SpringBoot优雅的封装不同研发环境下(环境隔离)RocketMq自动ack和手动ack

devtools/2024/9/25 9:34:26/

1. RocketMq的maven依赖版本:

     <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version></dependency>

2.RocketMq的yml文件:

# 自定义属性
system:environment:# 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串name: dev# 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果# 默认为true,配置类:EnvironmentIsolationConfigisolation: true
rocketmq:# 多个NameServer,host:port;host:port,RocketMQPropertiesnameServer: 你的NameServerproducer:# 发o送同一类消息的设置为同一个grup,保证唯一group: logistics_group# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false# access-keyaccessKey: 你的access-key# secret-keysecretKey: 你的secret-key# 是否启用消息跟踪,默认falseenableMsgTrace: false# 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称customizedTraceTopic: RMQ_SYS_TRACE_TOPICconsumer:# 指定消费组group: logistics_group#广播消费模式 CLUSTERING(集群消费)、BROADCASTING(广播消费)messageModel: CLUSTERING#设置消费超时时间(分钟)consumeTimeout: 1# 最大重试次数,默认16maxReconsumeTimes: 3# 其他配置参考属性类

3 IsolationConfig读取yml文件配置

package com.logistics.common.rocketMq.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;/*** @author: 吴顺杰* @create: 2024-06-18 10:01* @Description:* RocketMQ多环境隔离配置* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉*/
@Configuration
@Data
public class IsolationConfig {@Value("${system.environment.isolation:true}")private boolean enabledIsolation;@Value("${system.environment.name:''}")private String environmentName;@Value("${rocketmq.nameServer:''}")private String nameServer;@Value("${rocketmq.consumer.group:''}")private String group;@Value("${rocketmq.consumer.messageModel:''}")private String messageModel;@Value("${rocketmq.consumer.consumeTimeout:''}")private int consumeTimeout;@Value("${rocketmq.consumer.maxReconsumeTimes:''}")private int maxReconsumeTimes;
}

4.RocketMQ序列化器处理RocketMqConfig文件

主要是为了解决RocketMQ Jackson不支持Java时间类型配置

package com.logistics.common.rocketMq.config;import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;import java.util.List;/*** RocketMQ序列化器处理** @author 吴顺杰* @since 2024/8/04*/
@Configuration
public class RocketMqConfig {/*** 解决RocketMQ Jackson不支持Java时间类型配置*/@Bean@Primarypublic RocketMQMessageConverter createRocketMQMessageConverter() {RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();for (MessageConverter messageConverter : messageConverterList) {if (messageConverter instanceof MappingJackson2MessageConverter) {MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();// 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTimeobjectMapper.registerModules(new JavaTimeModule());}}return converter;}
}

JSON工具类封装

package com.logistics.common.rocketMq.utils;import com.alibaba.fastjson.JSONObject;/*** JSON工具类* 像工具类这种,建议一定要二次封装,避免出现漏洞时可以快速替换** @author 吴顺杰* @since 2024/6/16*/
public class JsonUtil {private JsonUtil() {}public static String toJson(Object value) {return JSONObject.toJSONString(value);}public static <T> T toObject(String jsonStr, Class<T> clazz) {return JSONObject.parseObject(jsonStr, clazz);}
}

5.rocketMq生产者封装

调度任务生产者:LogisticsAddDispatchMqProducer

package com.logistics.business.rocketMq.producer;import com.logistics.common.exception.base.BaseException;
import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.template.RocketMqTemplate;
import com.logistics.common.utils.spring.SpringUtils;
import lombok.extern.slf4j.Slf4j;/*** 新增调度任务生产者MQ队列*/
@Slf4j
public class LogisticsAddDispatchMqProducer {private static RocketMqTemplate rocketMqTemplate = SpringUtils.getBean(RocketMqTemplate.class);public static void sendAddDispatchMqMessage(AddDispatchMqMessage message) {log.info("新增调度任务成功发送新增调度消息MQ,内容: {}", message);try {rocketMqTemplate.asyncSend(AddDispatchMqContant.ADD_DISPATCH_TOPIC, AddDispatchMqContant.ADD_DISPATCH_TAG, message);} catch (Exception e) {log.error("新增调度任务成功发送新增调度消息MQ失败,内容: {}", message, e);throw new BaseException("新增调度任务成功发送新增调度消息MQ失败,请联系管理员");}}
}

RocketMQ模板类

package com.logistics.common.rocketMq.template;import com.alibaba.fastjson.JSONObject;
import com.logistics.common.rocketMq.config.IsolationConfig;
import com.logistics.common.rocketMq.constant.RocketMqSysConstant;
import com.logistics.common.rocketMq.domain.BaseMqMessage;
import com.logistics.common.rocketMq.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;/*** RocketMQ模板类**/
@Component
@Slf4j
public class RocketMqTemplate {private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);@Resource(name = "rocketMQTemplate")private RocketMQTemplate template;@Resourceprivate IsolationConfig isolationConfig;/*** 获取模板,如果封装的方法不够提供原生的使用方式*/public RocketMQTemplate getTemplate() {return template;}/*** 构建目的地*/public String buildDestination(String topic, String tag) {return topic + RocketMqSysConstant.DELIMITER + tag;}/*** 发送同步消息*/public <T extends BaseMqMessage> SendResult syncSend(String topic, String tag, T message) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}// 设置业务键,此处根据公共的参数进行处理Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String buildDestination = buildDestination(topic, tag);SendResult sendResult = template.syncSend(buildDestination, sendMessage);// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集LOGGER.info("[{}]同步消息[{}]发送结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));return sendResult;}/*** 发送异步消息** @param topic* @param tag* @param message* @param <T>*/public <T extends BaseMqMessage> void asyncSend(String topic, String tag, T message) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}// 设置业务键,此处根据公共的参数进行处理Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String buildDestination = buildDestination(topic, tag);template.asyncSend(buildDestination, sendMessage, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOGGER.info("[{}]MQ异步消息[{}]发送成功结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));if (sendResult.getSendStatus() != SendStatus.SEND_OK) {//可以存入数据库做处理log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());}}@Overridepublic void onException(Throwable throwable) {LOGGER.info("[{}]MQ异步消息[{}]发送失败结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(throwable.getMessage()));//可以存入数据库做处理}});}/*** 发送延迟消息** @param message* @param delayLevel* @param <T>* @return*/public <T extends BaseMqMessage> SendResult syncDelaySend(String topic, String tag, T message, int delayLevel) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String destination = buildDestination(topic, tag);SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));return sendResult;}}

AddDispatchMqMessage消息实体

package com.logistics.common.rocketMq.domain;import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 新增调度任务发送队列消息体*/
@Data
@Builder
@NoArgsConstructor
public class AddDispatchMqMessage extends BaseMqMessage {/*** 调度id*/private Long dispatchId;public AddDispatchMqMessage(Long dispatchId) {this.dispatchId = dispatchId;}
}

AddDispatchMqContant类

package com.logistics.common.rocketMq.constant;/*** 新增调度任务MQ队列*/
public class AddDispatchMqContant {/*** 消费主题*/public static final String ADD_DISPATCH_TOPIC = "add_dispatch_topic";/*** 消费标签*/public static final String ADD_DISPATCH_TAG = "add_dispatch_tag";/*** 消费组*/public static final String ADD_DISPATCH_GROUP = "add_dispatch_group";
}

6.rocketMq消费者封装

ACK简介
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?

RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

1.手动ack封装

新增增调度任务消费者启动监听类

package com.logistics.business.rocketMq.listener;import com.logistics.business.rocketMq.comsumer.LogisticsAddDispatchMqComsumer;
import com.logistics.common.rocketMq.config.IsolationConfig;
import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** 新增调度任务消费者启动监听类*/
@Component
@Slf4j
public class RetryLogisticsAddDispatchListener {@Resourceprivate IsolationConfig isolationConfig;@Resourceprivate LogisticsAddDispatchMqComsumer logisticsAddDispatchMqComsumer;private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();@PostConstructpublic void start() {try {//启动环境隔离String topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC;if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP + "_" + isolationConfig.getEnvironmentName());topic = topic + "_" + isolationConfig.getEnvironmentName();} else {consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP);}consumer.setNamesrvAddr(isolationConfig.getNameServer());//设置集群消费模式consumer.setMessageModel(MessageModel.valueOf(isolationConfig.getMessageModel()));//设置消费超时时间(分钟)consumer.setConsumeTimeout(isolationConfig.getConsumeTimeout());//最大重试次数consumer.setMaxReconsumeTimes(isolationConfig.getMaxReconsumeTimes());//订阅主题consumer.subscribe(topic, AddDispatchMqContant.ADD_DISPATCH_TAG);//注册消息监听器consumer.registerMessageListener(logisticsAddDispatchMqComsumer);//启动消费端consumer.start();log.info("新增调度任务消费者MQ监听队列启动成功");} catch (MQClientException e) {e.printStackTrace();}}
}

LogisticsAddDispatchMqComsumer注册消息监听器

package com.logistics.business.rocketMq.comsumer;import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.nio.charset.StandardCharsets;
import java.util.List;/*** 新增调度任务消费者MQ队列*/
@Slf4j
@Component
public class LogisticsAddDispatchMqComsumer implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt message = msgs.get(0);try {String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);AddDispatchMqMessage addDispatchMqMessage = JsonUtil.toObject(messageBody, AddDispatchMqMessage.class);System.out.println("messageId: " + message.getMsgId() + ",topic: " +message.getTopic() + ",addDispatchMqMessage: " + addDispatchMqMessage);System.out.println(1 / 0);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,mq的偏移量才会下移。也就是手动ack,也只有手动返回CONSUME_SUCCESS,消息体才会偏移。

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; mq会默认重试16次,每次执行间隔

不等。最长好像是2个多小时,具体多少自己看官方文档,一般线上环境设置重试五次失败就进入死信队列了,我这里设置的是重试三次

onsumer Started.
date=Fri Aug 05 14:08:52 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:08:52 CST 2022
ReconsumeTimes=0                        '第一次处理'date=Fri Aug 05 14:09:02 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:09:02 CST 2022
ReconsumeTimes=1                       '第2次处理 与第一次间隔10s'date=Fri Aug 05 14:09:33 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:09:33 CST 2022
ReconsumeTimes=2						'第3次处理 与第2次间隔20s'date=Fri Aug 05 14:10:33 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:10:33 CST 2022
ReconsumeTimes=3                       '第4次处理 与第3次间隔1m'

2.自动ack封装

BaseMqMessageListener封装

package com.zhjt.rocketmq.listener;import com.zhjt.rocketmq.constant.RocketMqSysConstant;
import com.zhjt.rocketmq.domain.BaseMqMessage;
import com.zhjt.rocketmq.template.RocketMqTemplate;
import com.zhjt.rocketmq.utils.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;/*** 抽象消息监听器,封装了所有公共处理业务,如* 1、基础日志记录* 2、异常处理* 3、消息重试* 4、警告通知* 5、....** @author 吴顺杰* @since 2024/6/17*/
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {/*** 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化*/protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Resourceprivate RocketMqTemplate rocketMqTemplate;/*** 消息者名称** @return 消费者名称*/protected abstract String consumerName();/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void overMaxRetryTimesMessage(T message);/*** 是否过滤消息,例如某些** @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean isFilter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常** @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean isThrowException();/*** 最大重试此处** @return 最大重试次数,默认10次*/protected int maxRetryTimes() {return 10;}/*** isRetry开启时,重新入队延迟时间** @return -1:立即入队重试*/protected int retryDelayLevel() {return -1;}/*** 由父类来完成基础的日志和调配,下面的只是提供一个思路*/public void dispatchMessage(T message) {MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());// 基础日志记录被父类处理了logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));if (isFilter(message)) {logger.info("消息不满足消费条件,已过滤");return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > maxRetryTimes()) {overMaxRetryTimesMessage(message);return;}try {long start = Instant.now().toEpochMilli();handleMessage(message);long end = Instant.now().toEpochMilli();logger.info("消息消费成功,耗时[{}ms]", (end - start));} catch (Exception e) {logger.error("消息消费异常", e);// 是捕获异常还是抛出,由子类决定if (isThrowException()) {throw new RuntimeException(e);}if (isRetry()) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (Objects.nonNull(annotation)) {message.setSource(message.getSource() + "消息重试");message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());} catch (Exception ex) {throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}}}
}

IsolationConfigNew文件里面的方法:

package com.zhjt.rocketmq.config;/*** @author: 吴顺杰* @create: 2024-06-18 10:01* @Description:*/import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;/*** RocketMQ多环境隔离配置* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉** @author tianxincoord@163.com* @since 2022/5/18*/
@Configuration
public class IsolationConfigNew implements BeanPostProcessor {@Value("${system.environment.isolation:true}")private boolean enabledIsolation;@Value("${system.environment.name:''}")private String environmentName;@Overridepublic Object postProcessBeforeInitialization(@NonNull Object bean,@NonNull String beanName) throws BeansException {// DefaultRocketMQListenerContainer是监听器实现类if (bean instanceof DefaultRocketMQListenerContainer) {DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (enabledIsolation && StringUtils.hasText(environmentName)) {container.setTopic(String.join("_", container.getTopic(), environmentName));container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));}return container;}return bean;}
}

消费者监听实现LogisticsAddDispatchMqComsumer2

package com.logistics.business.rocketMq.comsumer;import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.listener.BaseMqMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 新增调度任务消费者MQ队列*/
@Slf4j
@Component
@RocketMQMessageListener(topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC,consumerGroup = AddDispatchMqContant.ADD_DISPATCH_GROUP,selectorExpression = AddDispatchMqContant.ADD_DISPATCH_GROUP,// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小consumeThreadMax = 21
)
public class LogisticsAddDispatchMqComsumer2 extends BaseMqMessageListener<AddDispatchMqMessage>implements RocketMQListener<AddDispatchMqMessage> {/*** 此处只是说明封装的思想,更多还是要根据业务操作决定* 内功心法有了,无论什么招式都可以发挥最大威力*/@Overrideprotected String consumerName() {return "RocketMq监听消息";}@Overridepublic void onMessage(AddDispatchMqMessage message) {// 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型super.dispatchMessage(message);}@Overrideprotected void handleMessage(AddDispatchMqMessage message) throws Exception {// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试// 业务异常直接抛出异常即可 否则捕获异常没有抛出 无法进行重试log.info("RocketMq监消息消费数据:{}", message);}@Overrideprotected void overMaxRetryTimesMessage(AddDispatchMqMessage message) {// 当超过指定重试次数消息时此处方法会被调用// 生产中可以进行回退或其他业务操作}@Overrideprotected boolean isRetry() {return true;}@Overrideprotected int maxRetryTimes() {// 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用return 5;}@Overrideprotected boolean isThrowException() {// 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常return false;}
}


http://www.ppmy.cn/devtools/98702.html

相关文章

Unity+Addressable

前期准备 下载一个hfs本地服务器&#xff0c;打开即可 HFS ~ HTTP 文件服务器 (rejetto.com) 1.安装Addressable插件 创建组 2.使用图片创建预制体 放入Addressable Groups内 3.右键 新建组 创建预制体t拖拽放入新建组里 新组命名为Gameobject 简化名称 4.创建一个测试脚本 …

【git bash编码错误解决方案】启动conda环境时报错,其他terminal却正常

&#x1f50e;嘿&#xff0c;这里是慰慰&#x1f469;&#x1f3fb;‍&#x1f393;&#xff0c;会发各种类型的文章&#xff0c;智能专业&#xff0c;从事前端&#x1f43e; &#x1f389;如果有帮助的话&#xff0c;就点个赞叭&#xff0c;让我开心一下&#xff01;&#x1f…

Python将Word文档转为PDF

使用python将word转pdf_py work转pdf-CSDN博客 掌握Python技巧&#xff1a;PDF文件的加密和水印处理-CSDN博客

UE5用蓝图实现物体A始终朝向物体B |Find Look at Rotation|

非常常用的蓝图节点 |Find Look at Rotation|&#xff1a;获取 物体A 到 物体B 的Rotator。 Tick中将算出的Rotator设置给物体A&#xff0c;即可实现永远朝向物体B

2024/8/25周报

摘要 Abstract 多目标优化算法 多目标优化&#xff08;Multi-Objective Optimization, MOO&#xff09;是优化领域的一个分支&#xff0c;它处理的是同时优化多个相互冲突的目标函数的问题。在实际应用中&#xff0c;很少有决策问题只涉及单一目标&#xff0c;通常需要在多个…

SolidityFoundry Merkle Airdrop

Merkle airdrop Merkle Tree&#xff0c;也叫默克尔树或哈希树&#xff0c;是区块链的底层加密技术&#xff0c;被比特币和以太坊区块链广泛采用。Merkle Tree允许对大型数据结构的内容进行有效和安全的验证&#xff08;Merkle Proof&#xff09;。对于有N个叶子结点的Merkle T…

在node.js环境中使用web服务器http-server运行html静态文件

http-server http-server是一个超轻量级web服务器&#xff0c;它可以将任何一个文件夹当作服务器的目录供自己使用。 当我们想要在服务器运行一些代码&#xff0c;但是又不会配置服务器的时候&#xff0c;就可以使用http-server就可以搞定了。 使用方法 因为http-server需要…

JAVA之MAC详解以及子线程MDC传递

MDC简介 MDC(Mapped Diagnostic Context)是用于分布式系统中跟踪和诊断日志的重要概念。是一个在Java项目中用于日志跟踪的工具&#xff0c;它允许你在多线程环境下关联和传递特定的上下文信息。 MDC是一个线程本地的、可维护的、可传递的上下文环境。在Java中&#xff0c;MDC…