接口 V2 完善:基于责任链模式、Canal 监听 Binlog 实现数据库、缓存的库存最终一致性

server/2025/1/24 3:41:36/

🎯 本文介绍了一种使用Canal监听MySQL Binlog实现数据库缓存最终一致性的方案。文章首先讲解了如何修改Canal配置以适应订单表和时间段表的变化,然后详细描述了通过责任链模式优化消息处理逻辑的方法,确保能够灵活应对不同数据表的更新需求。最后,展示了如何利用RocketMQ消费Canal消息并通过责任链处理器同步更新缓存,从而保证数据的一致性。此方法有效提升了系统的可扩展性和维护效率。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)

文章目录

  • 前言
  • 修改Canal配置
  • 修改 Canal 消息消费逻辑
    • 识别修改哪个数据表
    • 如何实现根据表执行相应操作

前言

在上一篇文章中,使用令牌限流方式来实现时间段的预定。时间段令牌和时间段库存缓存是分离的,因此需要额外对库存缓存进行更新,如何实现数据库缓存数据一致性是一个常见问题。本文使用 Canal 监听 Binlog 实现数据库缓存的库存最终一致性。为什么使用这种方案?不了解的读者可以先阅读文章:https://zhuanlan.zhihu.com/p/408515044

不了解 MySQL Binlog 开启和 Canal 安装与配置的朋友请先阅读小白手把手教程:https://hellodam.blog.csdn.net/article/details/144483823

Canal_10">修改Canal配置

修改instance.properties的过滤规则为canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1])|time_period_([0-9]|1[0-5]))$。现在不仅需要考虑订单表time_period_order,还要考虑时间段表time_period,因为现在要保证时间段库存和空闲场号的数据库缓存一致性

在这里插入图片描述

Canal__17">修改 Canal 消息消费逻辑

之前的实现如下

import cn.hutool.core.util.ObjectUtil;
import com.vrs.annotation.Idempotent;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {private final TimePeriodService timePeriodService;/*** 消费消息的方法* 方法报错就会拒收消息** @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "canal_binlog_common:",key = "#canalBinlogDTO.getId()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(CanalBinlogDTO canalBinlogDTO) {if (canalBinlogDTO.getOld() == null) {return;}Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {log.info("[消费者] 消费canal的消息,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));Long userId = Long.parseLong(alterDataMap.get("user_id").toString());Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Long courtIndex;if (alterDataMap.containsKey("partition_index")) {courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());} else {courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());}Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {// 恢复库存timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder().userId(userId).courtIndex(courtIndex).timePeriodId(timePeriodId).partitionId(partitionId).build());}}}
}

因为之前只需要处理订单表即可,现在还需要处理时间段表的更改。所以需要做两件事:

  • 识别是哪个数据表更新了
  • 根据所识别的表,执行相应的业务逻辑

识别修改哪个数据表

首先需要了解 Canal 发送的消息内容格式,从下图可以看到有数据库名,但没有表名。因此为了识别数据表,只能通过表的独有字段名来识别了

在这里插入图片描述

如何实现根据表执行相应操作

一种方式是,直接在onMessage方法中,识别完数据表类型之后,调用相应的方法来处理。这种方式实现简单,但后续如果要处理新的表,需要修改代码,违反了开闭原则。

为了解决这个问题,本文使用责任链模式,即封装多个处理器到责任链上,每个处理器负责识别一个表,并进行相应的业务逻辑。后续使用时就依次调用链上的处理器,如果处理器发现是自己的职责,就执行逻辑,否则直接返回,调用下一个处理器。

责任链模式框架

【抽象处理器】

package com.vrs.chain_of_responsibility;/*** @Author dam* @create 2024/12/11 19:18*/
public interface AbstractChainHandler<T> {/*** 由实现类来实现具体的处理方法*/boolean handle(T param);/*** 名称,用来区分不同的责任链*/String name();/*** 处理器的排序*/int order();
}
  • handler:由具体的处理器来实现,用来实现业务逻辑
  • name:用来标识责任链,返回相同名字的处理器被归到一个责任链中
  • order:用来给同一责任链的处理器排序

【责任链上下文】

该类用来管理不同的责任链。

  • 当Spring启动时,执行run方法,通过获取AbstractChainHandler的实现类来初始化所有责任链,即将处理器按照name划分到不同的责任链中,后面可以通过容器chainContainer来获取。最后对同一链上的处理器按照sort升序排序。
  • 当调用handler方法时,会根据name获取责任链,然后依次调用链上的处理器来进行业务处理,若有任意处理器的handle方法返回 true ,责任链就会中断。如果想要依次执行所有处理器,那所有处理器都返回 false 即可
package com.vrs.chain_of_responsibility;import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.*;/*** @Author dam* @create 2024/12/11 19:20*/
@Component
public class ChainContext<T> implements ApplicationContextAware, CommandLineRunner {/*** 通过 Spring IOC 获取 Bean 实例*/private ApplicationContext applicationContext;/*** key:责任链名称* value:责任链*/private final Map<String, List<AbstractChainHandler>> chainContainer = new HashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void run(String... args) {// 从 Spring IOC 容器中获取接口对应的 Spring Bean 集合Map<String, AbstractChainHandler> chainFilterMap = applicationContext.getBeansOfType(AbstractChainHandler.class);chainFilterMap.forEach((beanName, bean) -> {// 判断 name 是否已经存在抽象责任链容器中// 如果已经存在直接向集合新增// 如果不存在,创建对应的集合List<AbstractChainHandler> abstractChainHandlers = chainContainer.getOrDefault(bean.name(), new ArrayList<>());abstractChainHandlers.add(bean);chainContainer.put(bean.name(), abstractChainHandlers);});chainContainer.forEach((mark, unsortedChainHandlers) -> {// 对每个责任链的实现类根据order升序排序Collections.sort(unsortedChainHandlers, ((o1, o2) -> {return Integer.compare(o1.order(), o2.order());}));});}/*** 责任链组件执行** @param name         责任链组件标识* @param requestParam 请求参数*/public void handler(String name, T requestParam) {// 根据 name 从责任链容器中获取对应的责任链List<AbstractChainHandler> abstractChainHandlers = chainContainer.get(name);if (CollectionUtils.isEmpty(abstractChainHandlers)) {throw new RuntimeException(name + "对应的责任链不存在");}// 遍历责任链处理器for (AbstractChainHandler handler : abstractChainHandlers) {if (handler.handle(requestParam)) {// --if-- 如果处理器返回 true,表示已经处理完成,退出责任链return;}}}
}

常量

package com.vrs.constant;/*** Redis缓存Key常量类*/
public class ChainConstant {public static final String RESERVE_CHAIN_NAME = "reserve_chain";public static final String CANAL_CHAIN_NAME = "canal_chain";}

具体处理器

由于修改的要么是订单表,要么是时间段表,因此责任链上面只要有任一处理器成功处理,即返回 true ,就无须调用余下的其他处理器

【时间段库存修改处理器】

数据库中的库存修改之后,同步修改缓存中的库存

package com.vrs.service.chainHander.canal;import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.service.PartitionService;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.Map;/*** 订单超时关闭处理逻辑** @Author dam* @create 2024/12/11 19:43*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TimePeriodStockChangeHandler implements AbstractChainHandler<CanalBinlogDTO> {private final StringRedisTemplate redisTemplate;private final TimePeriodService timePeriodService;private final PartitionService partitionService;@Overridepublic boolean handle(CanalBinlogDTO canalBinlogDTO) {Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("stock")) {// --if-- 如果是修改操作,且修改了stocklog.info("[消费者] 消费canal的消息,时间段库存修改,同步修改缓存的库存,时间段ID:{}", alterDataMap.get("id"));Long timePeriodId = Long.parseLong(alterDataMap.get("id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Integer stock = Integer.parseInt(alterDataMap.get("stock").toString());Long bookedSlots = Long.parseLong(alterDataMap.get("booked_slots").toString());// 更新库存redisTemplate.opsForValue().set(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId), stock.toString());// 更新位图timePeriodService.initializeFreeIndexBitmap(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),partitionService.getPartitionDOById(partitionId).getNum(),bookedSlots,24 * 3600);return true;}return false;}@Overridepublic String name() {return ChainConstant.CANAL_CHAIN_NAME;}@Overridepublic int order() {return 10;}
}

【订单超时关闭处理器】

package com.vrs.service.chainHander.canal;import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.service.TimePeriodService;
import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;import java.util.Map;/*** 订单超时关闭处理逻辑** @Author dam* @create 2024/12/11 19:43*/
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseHandler implements AbstractChainHandler<CanalBinlogDTO> {private final TimePeriodService timePeriodService;@Overridepublic boolean handle(CanalBinlogDTO canalBinlogDTO) {Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {log.info("[消费者] 消费canal的消息,订单超时关闭,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));Long userId = Long.parseLong(alterDataMap.get("user_id").toString());Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Long courtIndex;if (alterDataMap.containsKey("partition_index")) {courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());} else {courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());}Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {// 恢复库存timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder().userId(userId).courtIndex(courtIndex).timePeriodId(timePeriodId).partitionId(partitionId).build());}return true;}return false;}@Overridepublic String name() {return ChainConstant.CANAL_CHAIN_NAME;}@Overridepublic int order() {return 0;}
}

MQ 消费者调用责任链

使用非常简单,直接调用chainContext.handler(ChainConstant.CANAL_CHAIN_NAME, canalBinlogDTO);即可

package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.chain_of_responsibility.ChainContext;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.enums.IdempotentSceneEnum;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {private final ChainContext chainContext;/*** 消费消息的方法* 方法报错就会拒收消息** @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "canal_binlog_common:",key = "#canalBinlogDTO.getId()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(CanalBinlogDTO canalBinlogDTO) {if (canalBinlogDTO.getOld() == null) {// --if-- 如果不是修改数据,快速退出,因为我们现在的业务逻辑都是识别出数据修改才有下面的操作return;}// 调用责任链来消费 canal 消息chainContext.handler(ChainConstant.CANAL_CHAIN_NAME, canalBinlogDTO);}
}

http://www.ppmy.cn/server/160923.html

相关文章

ssm-mybatisPlus学习笔记

注意&#xff01;mybatisPlus只能够进行单表操作&#xff0c;其他的仍需要mybatis 1.快速入门 编写启动类 MapperScan("com.atguigu.mapper") SpringBootApplication public class MainApplication {public static void main(String[] args) {SpringApplication.r…

从零搭建一套远程手机的桌面操控和文件传输的小工具

从零搭建一套远程手机的桌面操控和文件传输的小工具 --ADB连接专题 一、前言 前面的篇章中&#xff0c;我们确定了通过基于TCP连接的ADB控制远程手机的操作思路。本篇中我们将进行实际的ADB桥接的具体链路搭建工作&#xff0c;从原理和实际部署和操作层面上&#xff0c;从零…

Excel 实现文本拼接方法

1. 使用 & 运算符 这是最常见和简单的拼接方法。你只需使用 & 来连接多个文本单元格或文本字符串。 示例公式&#xff1a; A1 & B1这个公式会将 A1 和 B1 单元格中的文本合并为一个字符串。 如果你希望在文本之间加入分隔符&#xff08;如空格、逗号等&#xf…

verilog笔记1

1. 阻塞赋值 阻塞赋值&#xff0c;顾名思义即在一个 always 块中&#xff0c;后面的语句会受到前语句的影响&#xff0c;具体来说就是在同一个always 中&#xff0c;一条阻塞赋值语句如果没有执行结束&#xff0c;那么该语句后面的语句就不能被执行&#xff0c;即被“阻塞”。也…

Linux——线程条件变量(同步)

Linux——多线程的控制-CSDN博客 文章目录 目录 文章目录 前言 一、条件变量是什么&#xff1f; 1、死锁的必要条件 1. 互斥条件&#xff08;Mutual Exclusion&#xff09; 2. 请求和保持条件&#xff08;Hold and Wait&#xff09; 3. 不可剥夺条件&#xff08;No Preemption&…

Nginx 反向代理与负载均衡配置实践

一、引言 在当今互联网架构中&#xff0c;Nginx作为一款高性能的HTTP和反向代理服务器&#xff0c;广泛应用于各种场景&#xff0c;为众多网站和应用提供了强大的支持。它能够高效地处理大量并发请求&#xff0c;实现反向代理与负载均衡功能&#xff0c;显著提升系统的性能、可…

【vitePress】基于github快速添加评论功能(giscus)

一.添加评论插件 使用giscus来做vitepress 的评论模块&#xff0c;使用也非常的简单&#xff0c;具体可以参考&#xff1a;giscus 文档&#xff0c;首先安装giscus npm i giscus/vue 二.giscus操作 打开giscus 文档&#xff0c;如下图所示&#xff0c;填入你的 github 用户…

c++ vector

1. vector 构造函数 功能描述&#xff1a; vector 构造函数 函数原型 vector v; // 默认构造函数 vector(n, elem); // 将n个elem拷贝给本身 vector(const vector& v); // 拷贝构造函数 vector(v.begin(), v.end()); // 将v[begin(), end())区间中的元素拷贝给本身 #incl…