一、案例:首页广告缓存更新
缓存更新 基于canal 完成首页广告缓存数据更新
需求分析:当tb_ad(广告)表的数据发生变化时,更新redis中的广告数据。
二、首页广告缓存更新-实现步骤
(1)修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到rabbitmq
(2)从rabbitmq中提取消息,通过OkHttpClient调用ad_update来实现对广告缓存数据的更新。
三、首页广告缓存更新-代码实现
3.1 发送消息到MQ
步骤一:在rabbitmq管理后台创建队列 ad_update_queue ,用于接收广告更新通知
步骤二:引入rabbitmq起步依赖
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>
步骤三:配置文件application.properties 添加内容
spring.rabbitmq.host=192.168.200.128
步骤四:新增rabbitMQ配置类
@Configuration
public class RabbitMQConfig {
//定义队列名称public static final String AD_UPDATE_QUEUE="ad_update_queue";//声明队列@Beanpublic Queue queue(){return new Queue(AD_UPDATE_QUEUE);}
}
步骤五:修改BusinessListener类
@CanalEventListener //声明当前的类是canal的监听类
public class BusinessListener {
@Autowiredprivate RabbitTemplate rabbitTemplate;
/**** @param eventType 当前操作数据库的类型* @param rowData 当前操作数据库的数据*/@ListenPoint(schema = "shangcheng_business",table = "tb_ad")public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){System.out.println("广告表数据发生改变");
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {if ("position".equals(column.getName())){System.out.println("发送最新的数据到MQ:"+column.getValue());
//发送消息rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column.getValue());}}}
}
步骤六:测试,运行数据监控微服务canal,新增、修改或删除tb_ad表数据,修改后观察控制台输出和rabbitmq管理界面中ad_update_queue是否接收到消息
3.2 从MQ中提取消息执行更新
步骤一:shangcheng_service_business工程pom.xml引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>3.9.0</version>
</dependency>
步骤二:在spring节点下添加rabbitmq配置
rabbitmq:host: 192.168.200.128
步骤三:com.shangcheng.business包下创建listener包,包下创建类
@Component
public class AdListener {
@RabbitListener(queues = "ad_update_queue")public void receiveMessage(String message){System.out.println("接收到的消息为:"+message);
//发起远程调用OkHttpClient okHttpClient = new OkHttpClient();String url = "http://192.168.200.128/ad_update?position="+message;Request request = new Request.Builder().url(url).build();Call call = okHttpClient.newCall(request);call.enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {//请求失败e.printStackTrace();}
@Overridepublic void onResponse(Call call, Response response) throws IOException {//请求成功System.out.println("请求成功:"+response.message());}});}
}
步骤四:测试,启动eureka和business微服务,观察控制台输出和数据同步效果。