RabbitMQ可靠性消息发送(java实现)

news/2024/11/23 9:38:18/

本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客

一、整体架构

step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性;

step2:Producer发送消息到MQ Broker;

step3:Producer收到 broker 返回的确认消息;

step4:更改消息记录库的状态(定义三种状态:0待确认、1已确认、2确认失败);

step5:定时任务获取长时间处于待确认状态的消息;

step6:Producer重试发送消息;

step7:重试次数超过3次,将消息状态更新为确认失败,后续根据具体业务再处理确认失败的消息;

二、消息记录的增删改查

1. 当前项目名为 rabbit-core-producer,为了实现消息记录入库,需要跟数据库打交道,这里首先添加依赖:

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.1.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency> 

 2. 消息记录的建表语句 rabbit-producer-message-schema.sql

-- 表 broker_message.broker_message 结构
CREATE TABLE IF NOT EXISTS `broker_message` (`message_id` varchar(128) NOT NULL,`message` varchar(4000),`try_count` int(4) DEFAULT 0,`status` varchar(10) DEFAULT '',`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

将 rabbit-producer-message-schema.sql 放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message-schema.sql, rabbit-core-producer项目 在 RabbitMQ基础组件封装—整体结构 有具体说明(当前博客是 RabbitMQ基础组件封装—整体结构 的其中一个章节)。

3. 数据源的配置文件 rabbit-producer-message.properties

rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

同样需要将该文件放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message.properties。

因为上面配置中有用到数据库 broker_message,所以需要自己提前建好一个数据库 broker_message。

4. BrokerMessage.java

public class BrokerMessage implements Serializable {private static final long serialVersionUID = 7447792462810110841L;private String messageId;private Message message;private Integer tryCount = 0;private String status;private Date nextRetry;private Date createTime;private Date updateTime;// getter、setter方法省略
}

5. BrokerMessageMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" ><resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" ><id column="message_id" property="messageId" jdbcType="VARCHAR" /><result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" /><result column="try_count" property="tryCount" jdbcType="INTEGER" /><result column="status" property="status" jdbcType="VARCHAR" /><result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" /><result column="create_time" property="createTime" jdbcType="TIMESTAMP" /><result column="update_time" property="updateTime" jdbcType="TIMESTAMP" /></resultMap><sql id="Base_Column_List" >message_id, message, try_count, status, next_retry, create_time, update_time</sql><select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >select <include refid="Base_Column_List" />from broker_messagewhere message_id = #{messageId,jdbcType=VARCHAR}</select><delete id="deleteByPrimaryKey" parameterType="java.lang.String" >delete from broker_messagewhere message_id = #{messageId,jdbcType=VARCHAR}</delete><insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >insert into broker_message (message_id, message, try_count, status, next_retry, create_time, update_time)values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER}, #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP})</insert><insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >insert into broker_message<trim prefix="(" suffix=")" suffixOverrides="," ><if test="messageId != null" >message_id,</if><if test="message != null" >message,</if><if test="tryCount != null" >try_count,</if><if test="status != null" >status,</if><if test="nextRetry != null" >next_retry,</if><if test="createTime != null" >create_time,</if><if test="updateTime != null" >update_time,</if></trim><trim prefix="values (" suffix=")" suffixOverrides="," ><if test="messageId != null" >#{messageId,jdbcType=VARCHAR},</if><if test="message != null" >#{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},</if><if test="tryCount != null" >#{tryCount,jdbcType=INTEGER},</if><if test="status != null" >#{status,jdbcType=VARCHAR},</if><if test="nextRetry != null" >#{nextRetry,jdbcType=TIMESTAMP},</if><if test="createTime != null" >#{createTime,jdbcType=TIMESTAMP},</if><if test="updateTime != null" >#{updateTime,jdbcType=TIMESTAMP},</if></trim></insert><update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >update broker_message<set ><if test="message != null" >message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},</if><if test="tryCount != null" >try_count = #{tryCount,jdbcType=INTEGER},</if><if test="status != null" >status = #{status,jdbcType=VARCHAR},</if><if test="nextRetry != null" >next_retry = #{nextRetry,jdbcType=TIMESTAMP},</if><if test="createTime != null" >create_time = #{createTime,jdbcType=TIMESTAMP},</if><if test="updateTime != null" >update_time = #{updateTime,jdbcType=TIMESTAMP},</if></set>where message_id = #{messageId,jdbcType=VARCHAR}</update><update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >update broker_messageset message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},try_count = #{tryCount,jdbcType=INTEGER},status = #{status,jdbcType=VARCHAR},next_retry = #{nextRetry,jdbcType=TIMESTAMP},create_time = #{createTime,jdbcType=TIMESTAMP},update_time = #{updateTime,jdbcType=TIMESTAMP}where message_id = #{messageId,jdbcType=VARCHAR}</update><update id="changeBrokerMessageStatus" >update broker_message bmset bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},bm.update_time = #{updateTime, jdbcType=TIMESTAMP}where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}  </update><select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" ><![CDATA[select message_id, message, try_count, status, next_retry, create_time, update_timefrom broker_message bmwhere bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	and bm.next_retry < sysdate()]]></select><select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >select message_id, message, try_count, status, next_retry, create_time, update_timefrom broker_message bmwhere bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	</select><update id="update4TryCount" >update broker_message bmset bm.try_count = bm.try_count + 1,bm.update_time = #{updateTime,jdbcType=TIMESTAMP}where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}</update></mapper>

6. BrokerMessageMapper.java

@Mapper
public interface BrokerMessageMapper {int deleteByPrimaryKey(String messageId);int insert(BrokerMessage record);int insertSelective(BrokerMessage record);BrokerMessage selectByPrimaryKey(String messageId);int updateByPrimaryKeySelective(BrokerMessage record);int updateByPrimaryKeyWithBLOBs(BrokerMessage record);int updateByPrimaryKey(BrokerMessage record);void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);}

7. MessageStoreService.java(这里不加接口类了,直接在MessageStoreService.java中写具体逻辑实现)

@Service
public class MessageStoreService {@Autowiredprivate BrokerMessageMapper brokerMessageMapper;public int insert(BrokerMessage brokerMessage) {return this.brokerMessageMapper.insert(brokerMessage);}public BrokerMessage selectByMessageId(String messageId) {return this.brokerMessageMapper.selectByPrimaryKey(messageId);}public void succuess(String messageId) {this.brokerMessageMapper.changeBrokerMessageStatus(messageId,BrokerMessageStatus.SEND_OK.getCode(),new Date());}public void failure(String messageId) {this.brokerMessageMapper.changeBrokerMessageStatus(messageId,BrokerMessageStatus.SEND_FAIL.getCode(),new Date());}public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());}public int updateTryCount(String brokerMessageId) {return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());}
}

三、整合数据源

1. 读取配置文件,生成数据源,RabbitProducerDataSourceConfiguration.java

@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);@Value("${rabbit.producer.druid.type}")private Class<? extends DataSource> dataSourceType;@Bean(name = "rabbitProducerDataSource")@Primary// 以这个rabbit.producer.druid.jdbc为前缀的属性值都会注入到DataSource中@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")public DataSource rabbitProducerDataSource() throws SQLException {DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);return rabbitProducerDataSource;}public DataSourceProperties primaryDataSourceProperties(){return new DataSourceProperties();}public DataSource primaryDataSource(){return primaryDataSourceProperties().initializeDataSourceBuilder().build();}}

2. 执行指定的sql脚本 ,BrokerMessageConfiguration.java

/*** 	$BrokerMessageConfiguration * 	帮我执行SQL脚本* 	帮我进行数据库表结构的创建**/
@Configuration
public class BrokerMessageConfiguration {@Autowiredprivate DataSource rabbitProducerDataSource;/*** 加载 rabbit-producer-message-schema.sql 脚本(这是一个建表语句)*/@Value("classpath:rabbit-producer-message-schema.sql")private Resource schemaScript;@Beanpublic DataSourceInitializer initDataSourceInitializer() {System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);final DataSourceInitializer initializer = new DataSourceInitializer();// 设置之前生成的数据源initializer.setDataSource(rabbitProducerDataSource);// 执行指定的sql脚本initializer.setDatabasePopulator(databasePopulator());return initializer;}/*** 执行指定的sql脚本* @return*/private DatabasePopulator databasePopulator() {final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();populator.addScript(schemaScript);return populator;}
}

3. 接下来是和 Mybatis 配置相关的文件:RabbitProducerMyBatisConfiguration.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {@Resource(name= "rabbitProducerDataSource")private DataSource rabbitProducerDataSource;@Bean(name="rabbitProducerSqlSessionFactory")public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {SqlSessionFactoryBean bean = new SqlSessionFactoryBean();bean.setDataSource(rabbitProducerDataSource);ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();try {// mapper.xml文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));SqlSessionFactory sqlSessionFactory = bean.getObject();sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);return sqlSessionFactory;} catch (Exception e) {throw new RuntimeException(e);}}@Bean(name="rabbitProducerSqlSessionTemplate")public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {return new SqlSessionTemplate(sqlSessionFactory);}}

4. Mapper扫描配置相关的文件: RabbitProducerMybatisMapperScanerConfig.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {@Bean(name="rabbitProducerMapperScannerConfigurer")public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {// mapper.java文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");return mapperScannerConfigurer;}}

四、可靠性发送消息代码实现

/*** 	$RabbitBrokerImpl 真正的发送不同类型的消息实现类**/
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {@Autowiredprivate RabbitTemplateContainer rabbitTemplateContainer;@Autowiredprivate MessageStoreService messageStoreService;/*** 可靠性消息发送*/@Overridepublic void reliantSend(Message message) {message.setMessageType(MessageType.RELIANT);BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());if(bm == null) {//1. 把数据库的消息发送日志先记录好Date now = new Date();BrokerMessage brokerMessage = new BrokerMessage();brokerMessage.setMessageId(message.getMessageId());brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());//tryCount默认等于0 所以在最开始发送的时候不需要进行设置brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));brokerMessage.setCreateTime(now);brokerMessage.setUpdateTime(now);brokerMessage.setMessage(message);messageStoreService.insert(brokerMessage);			}//2. 执行真正的发送消息逻辑sendKernel(message);}@Overridepublic void rapidSend(Message message) {// 省略...}/*** 	$sendKernel 发送消息的核心方法 使用异步线程池进行发送消息* @param message*/private void sendKernel(Message message) {AsyncBaseQueue.submit((Runnable) () -> {CorrelationData correlationData =// 回调函数confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中new CorrelationData(String.format("%s#%s#%s",message.getMessageId(),System.currentTimeMillis(),message.getMessageType()));String topic = message.getTopic();String routingKey = message.getRoutingKey();RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());			});}@Overridepublic void confirmSend(Message message) {// 省略...}@Overridepublic void sendMessages() {// 省略...}}

并且在回调函数中,也要添加相应的逻辑:

/*** 	$RabbitTemplateContainer池化封装* 	每一个topic 对应一个RabbitTemplate*	1.	提高发送的效率* 	2. 	可以根据不同的需求制定化不同的RabbitTemplate, 比如每一个topic 都有自己的routingKey规则*/
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();private Splitter splitter = Splitter.on("#");private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;@Autowiredprivate ConnectionFactory connectionFactory;@Autowiredprivate MessageStoreService messageStoreService;public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {// 省略...}/*** 	无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 	具体的消息应答List<String> strings = splitter.splitToList(correlationData.getId());String messageId = strings.get(0);long sendTime = Long.parseLong(strings.get(1));String messageType = strings.get(2);if(ack) {//	当Broker 返回ACK成功时, 就是更新一下日志表里对应的消息发送状态为 SEND_OK// 	如果当前消息类型为reliant 我们就去数据库查找并进行更新if(MessageType.RELIANT.endsWith(messageType)) {this.messageStoreService.succuess(messageId);}log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);} else {log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);}}
}

上面大部分代码都是在实现迅速类型的消息发送时已经编写了,只是在 confirm()方法中添加了:

            // 	如果当前消息类型为reliant 我们就去数据库查找并进行更新if(MessageType.RELIANT.endsWith(messageType)) {this.messageStoreService.succuess(messageId);}

五、定时任务获取长时间处于待确认状态的消息并重新发送

1. 实现分布式定时任务

这里的定时任务是使用 ElasticJob,并对其进行封装,封装在项目 rabbit-task中,封装成为了两个注解 @EnableElasticJob 和 @ElasticJobConfig 。

具体的 ElasticJob 的使用和封装过程可参考教程:ElasticJob使用与封装

2.  将封装好的项目 rabbit-task 添加到 当前项目中并使用

(1)引入 rabbit-task 的依赖

        <dependency><groupId>com.bfxy.base.rabbit</groupId><artifactId>rabbit-task</artifactId> <version>0.0.1-SNAPSHOT</version>			</dependency>

(2)使用注解@EnableElasticJob

在当前项目 rabbit-core-producer 中的 自动装配类 中添加注解 @EnableElasticJob,使得当 应用程序启动的时候,就能对 ZooKeeper注册中心进行初始化,以及 ElasticJob的定时任务解析类 ElasticJobConfParser 的初始化。

/*** 	$RabbitProducerAutoConfiguration 自动装配 **/
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {}

(3)实现定时任务的具体处理逻辑并在类上加注解@EnableElasticJob

这里为了消息的可靠性发送,我们需要抓取 超时却仍处于待确认状态 的消息,进行重新发送消息。这里使用 ElasticJob 的流式定时任务 DataFlowJob。

@Component
@ElasticJobConfig(name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",cron= "0/10 * * * * ?",description = "可靠性投递消息补偿任务",overwrite = true,shardingTotalCount = 1)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{@Autowiredprivate MessageStoreService messageStoreService;@Autowiredprivate RabbitBroker rabbitBroker;private static final int MAX_RETRY_COUNT = 3;@Overridepublic List<BrokerMessage> fetchData(ShardingContext shardingContext) {// 抓取状态为未确认,而且 next_retry 小于当前时间的这些消息,为了确定百分百能发送成功,需要再进行重发List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);log.info("--------@@@@@ 抓取数据集合, 数量:	{} 	@@@@@@-----------" , list.size());return list;}@Overridepublic void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {dataList.forEach( brokerMessage -> {String messageId = brokerMessage.getMessageId();if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {// 重试次数大于3,就不再进行重发了,直接认为发送失败,更改标记为失败this.messageStoreService.failure(messageId);log.warn(" -----消息设置为最终失败,消息ID: {} -------", messageId);} else {//	每次重发的时候要更新一下try_count和next_retry字段this.messageStoreService.updateTryCount(messageId);// 	重发消息this.rabbitBroker.reliantSend(brokerMessage.getMessage());}});}
}

上面的代码中加入了注解

@ElasticJobConfig(name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",cron= "0/10 * * * * ?",description = "可靠性投递消息补偿任务",overwrite = true,shardingTotalCount = 1)

则该类中的逻辑会定时执行。

对于重发消息的代码 this.rabbitBroker.reliantSend(brokerMessage.getMessage());,之前已经做过说明,这里不再赘述。


http://www.ppmy.cn/news/62681.html

相关文章

Hudi的介绍与安装编译

Hudi的介绍 安装Maven 编译Hudi 执行编译 Hudi的介绍 Hudi简介 Hudi&#xff08;Hadoop Upserts Delete and Incremental&#xff09;是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取…

RSA加密为什么能保证安全

问题&#xff1a;我们都知道RSA加密是安全的&#xff0c;但是我们在使用的使用&#xff0c;怎么使用才能保证数据的安全传输呢&#xff1f; 一、原则&#xff1a;公钥机密、私钥解密、私钥签名、公钥验签 公钥私钥都可以加密和解密数据&#xff0c;但是因为持有公钥和私钥的人…

将webrtc的音频模式改为共享模式

修改音频设备模式:打开文件modules/audio_device/include/audio_device.h,将AudioDeviceModule::kPlatformDefaultAudioProcessing为true改为false。这将禁用默认的音频处理,使得可以修改音频设备模式。 修改音频设备模式的初始化:打开文件modules/audio_device/audio_dev…

域适应 Domain adaption(1)

一、定义 1、无监督域自适应 经典机器学习假设训练集和测试集来自相同的分布。 然而&#xff0c;这个假设在现实世界的应用程序中可能并不总是成立&#xff0c;例如&#xff0c;数据来源不同。 这种情况下&#xff0c;域分布之间会存在差异&#xff0c;直接将训练好的模型应…

UICollectionView 实现整页翻动(每页3个cell)

提示&#xff1a;页面架构是通过UICollectionView做的分页&#xff0c;分页点PageControl使用的是<SDCycleScrollView/TAPageControl.h> &#xff0c;布局架构使用的是Masonry 前言 为了实现UICollectionView无限翻动&#xff0c;连续滑动&#xff0c;主要是利用pagingE…

MySQL数据库的备份与恢复

一、数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中&#xff0c;数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因&#xff1a; 程序错误人为操作错误运算错误磁盘故障灾难&#xff08;如火灾、地震&#xff09;和盗窃 二、数据…

触控笔和电容笔哪个好用?ipad第三方电容笔了解下

和最早出现的那一些触控笔相比&#xff0c;现在电容笔最大的不同之处在于&#xff0c;这些电容笔具备了防误触、倾斜可以随意调整笔迹粗细的特性。苹果的Pencil&#xff0c;现在的价格&#xff0c;也是非常高的。所以&#xff0c;对于预算不足的人群来说&#xff0c;平替电容笔…

【Vue学习笔记3】使用Vite开启一个Vue3工程项目

1. 什么是Vite? Vite是一个web开发构建工具。Vite 的竞品是 Webpack&#xff0c;而且按照现在的趋势看&#xff0c;使用率超过 Webpack 也是早晚的事。 Vite 主要提升的是开发的体验&#xff0c;Webpack启动调试环境需要 3 分钟都很常见&#xff0c;Vite大大缩短了这个时间。…