一、导入相关依赖
在项目中引入MQ客户端依赖,依赖版本最好和RocketMQ版本一致。
<!-- rocket客户端--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version></dependency>
二、消息发送
首先介绍一下消息发送的大致流程,当我们调用消息发送方法时该方法会先对待发送消息进行前置验证,如果消息主题和消息内容均没有问题的话,就会根据消息主题(Topic)去获取路由信息,即消息主题对应的队列,broker,broker的ip和端口信息,然后选择一条队列发送消息,成功的话返回发送成功,失败的话会根据我们设置的重试次数进行重新发送,单向消息发送不会进行失败重试。
1、RocketMQ初始化
RocketMQ配置类案例:
package com.example.framework.mq.config;import cn.hutool.core.thread.ThreadUtil;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Map;
import java.util.concurrent.ExecutorService;/*** MQ配置*/
@Configuration
public class MQConfig {@Value("${spring.application.name:application}")private String groupName;//集群名称,这边以应用名称作为集群名称/***************************消息消费者***************************/@Autowiredprivate Map<String, MQHandler> mqHandlerMap;// 消费者nameservice地址@Value("${rocketmq.consumer.namesrvAddr:127.0.0.1:9876}")private String cNamesrvAddr;// 最小线程数@Value("${rocketmq.consumer.consumeThreadMin:20}")private int consumeThreadMin;// 最大线程数@Value("${rocketmq.consumer.consumeThreadMax:64}")private int consumeThreadMax;// 消费者监听主题,多个主题以分号隔开(topic~tag;topic~tag)@Value("${rocketmq.consumer.topics:test~*}")private String topics;// 一次消费消息的条数,默认为1条@Value("${rocketmq.consumer.consumeMessageBatchMaxSize:1}")private int consumeMessageBatchMaxSize;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(cNamesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.registerMessageListener(getMessageListenerConcurrently());// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费,如果非第一次启动,那么按照上次消费的位置继续消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 设置消费模型,集群还是广播,默认为集群//consumer.setMessageModel(MessageModel.CLUSTERING);// 设置一次消费消息的条数,默认为1条consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);try {// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3String[] topicTagsArr = topics.split(";");for (String topicTags : topicTagsArr) {String[] topicTag = topicTags.split("~");consumer.subscribe(topicTag[0],topicTag[1]);}consumer.start();}catch (Exception e){throw new Exception(e);}return consumer;}// 并发消息侦听器(如果对顺序消费有需求则使用MessageListenerOrderly 有序消息侦听器)@Beanpublic MessageListenerConcurrently getMessageListenerConcurrently() {return new MQListenerConcurrently(mqHandlerMap);}/***************************消息生产者***************************/// 事务消息监听器@Autowiredprivate MQTransactionListener mqTransactionListener;// 生产者nameservice地址@Value("${rocketmq.producer.namesrvAddr:127.0.0.1:9876}")private String pNamesrvAddr;// 消息最大大小,默认4M@Value("${rocketmq.producer.maxMessageSize:4096}")private Integer maxMessageSize ;// 消息发送超时时间,默认3秒@Value("${rocketmq.producer.sendMsgTimeout:30000}")private Integer sendMsgTimeout;// 消息发送失败重试次数,默认2次@Value("${rocketmq.producer.retryTimesWhenSendFailed:2}")private Integer retryTimesWhenSendFailed;// 执行任务的线程池private static ExecutorService executor = ThreadUtil.newExecutor(32);//普通消息生产者@Bean("default")public DefaultMQProducer getDefaultMQProducer() {DefaultMQProducer producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}//事务消息生产者(rocketmq支持柔性事务)@Bean("transaction")public TransactionMQProducer getTransactionMQProducer() {//初始化事务消息基本与普通消息生产者一致TransactionMQProducer producer = new TransactionMQProducer("transaction_" + this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);//添加事务消息处理线程池producer.setExecutorService(executor);//添加事务消息监听producer.setTransactionListener(mqTransactionListener);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}
}
2、发送消息
消息发送根据消息功能主要分为普通消息、事务消息、顺序消息、延时消息等!特别说明一下事务消息多用于保证多服务模块间的事务一致性,事务消息发送后并不会直接通知消费者消费消息,而是会先生成一个半消息,会先进入事务消息监听器中,确保该消息事务提交成功后才会向broker发送消息,从而被消费者获取并进行消费。
根据发送方式可以分为同步消息,异步消息和单向消息等:
- 同步消息常用于比较重要的消息发送,需要等待broker响应告知消息发送状态。
- 异步消息的话常用于对响应时间敏感,需要快速返回的模块,我们会设置一个回调代码块去异步监听Borker的响应。
- 单向消息的话主要用于对发送结果不敏感,不会影响业务的模块,无需监听broker响应,常用于日志发送等模块。
下面代码演示四种消息的使用方式:
package com.example.order.service.impl;import com.alibaba.fastjson.JSON;
import com.example.framework.utils.SonwflakeUtils;
import com.example.order.entity.Order;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;/*** 服务实现类*/
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {@Qualifier("default")@Autowiredprivate DefaultMQProducer producer;@Autowiredprivate TransactionMQProducer transactionMQProducer;/*** 添加订单(发送消息积分模块同步添加积分)* @param order 订单信息* @return org.apache.rocketmq.client.producer.TransactionSendResult**/@Overridepublic Order addOder(Order order) {order.setOrderId(SonwflakeUtils.get().id());if (order.getMessageType() == 1) {//普通消息this.save(order);Message message = new Message("points", "default", JSON.toJSONString(order).getBytes());try {//同步消息SendResult sendResult = producer.send(message);System.out.println("发送状态:" + sendResult.getSendStatus() +",消息ID:" + sendResult.getMsgId() +",队列:" + sendResult.getMessageQueue().getQueueId());
// producer.sendOneway(message);//单向消息
//----------------------------异步消息-----------------------------------
// producer.send(message, new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
//
// }
//
// @Override
// public void onException(Throwable throwable) {
//
// }
// });} catch (RemotingException | MQBrokerException | InterruptedException | MQClientException e) {e.printStackTrace();}} else {//事务消息Message message = new Message("points", "transaction", JSON.toJSONString(order).getBytes());try {transactionMQProducer.sendMessageInTransaction(message, null);} catch (MQClientException e) {e.printStackTrace();}}return order;}
}
3、消息消费
边对MessageListenerConcurrently有进行一定封装,主要是为了在消息处理时通过注解定位消息Topic和tag而自动选择对应的消息处理类进行业务处理;封装代码如下:
package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.Map;/*** 并发消息监听器*/
public class MQListenerConcurrently implements MessageListenerConcurrently {@Autowiredprivate Map<String, MQHandler> mqHandlerMap;public MQListenerConcurrently(Map<String, MQHandler> mqHandlerMap) {this.mqHandlerMap = mqHandlerMap;}@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if(CollectionUtils.isEmpty(list)){System.out.println("接受到的消息为空,不处理,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);// 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)// 获取该消息重试次数int reconsume = messageExt.getReconsumeTimes();if(reconsume ==3){//消息已经重试了3次,需做告警处理,已经相关日志return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理对应的业务逻辑String topic = messageExt.getTopic();String tags = messageExt.getTags();System.out.println("接受到的消息主题为:" + topic + "; tag为:" + tags);MQHandler mqMsgHandler = null;//获取消息处理类中的topic和tag注解,根据topic和tag进行策略分发出来具体业务for (Map.Entry<String, MQHandler> entry : mqHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer == null) {//非消息处理类continue;}String annotationTopic = msgHandlerActualizer.topic();if (!StrUtil.equals(topic,annotationTopic)) {//非该主题处理类continue;}String[] annotationTags = msgHandlerActualizer.tags();if(StrUtil.equals(annotationTags[0],"*")){//获取该实例mqMsgHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解类中包含tag则获取该实例mqMsgHandler = entry.getValue();break;}}if (mqMsgHandler == null) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}ConsumeConcurrentlyStatus status = mqMsgHandler.handle(tags,messageExt);// 如果没有return success,consumer会重新消费该消息,直到return successreturn status;}
}
事务消息监听器封装:
package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQTransactionHandler;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Arrays;
import java.util.Map;public class MQTransactionListener implements TransactionListener {@Autowiredprivate Map<String, MQTransactionHandler> mqTransactionHandlerMap;@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {MQTransactionHandler mqTransactionHandler = getListenner(message.getTopic(),message.getTags());return mqTransactionHandler.executeLocalTransaction(message,o);}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {MQTransactionHandler mqTransactionHandler = getListenner(messageExt.getTopic(),messageExt.getTags());return mqTransactionHandler.checkLocalTransaction(messageExt);}private MQTransactionHandler getListenner(String topic,String tags) {MQTransactionHandler mqTransactionHandler = null;for (Map.Entry<String, MQTransactionHandler> entry : mqTransactionHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer != null) {String annotationTopic = msgHandlerActualizer.topic();String[] annotationTags = msgHandlerActualizer.tags();if (!StrUtil.equals(topic,annotationTopic)) {//非该主题处理类continue;}if(StrUtil.equals(annotationTags[0],"*")){//获取该实例mqTransactionHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解类中包含tag则获取该实例mqTransactionHandler = entry.getValue();break;}}}return mqTransactionHandler;}
}
使用注解@MQHandlerActualizer标明该消息处理类的主题,默认监听所有tag,如果需要对tag监听进行分类,后面加上tag即可。消息监听器在收到消息后会自动调用主题对应的处理类进行业务处理,示例如下:
package com.example.points.mqHandler;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import com.example.framework.utils.SonwflakeUtils;
import com.example.points.entity.Points;
import com.example.points.service.PointsService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/***/
@MQHandlerActualizer(topic = "points")
public class PointsMQHandler implements MQHandler {@Autowiredprivate PointsService pointsService;@Overridepublic ConsumeConcurrentlyStatus handle(String tag, MessageExt messageExt) {//消息监听String messageStr = new String(messageExt.getBody());Map orderMap = (Map) JSON.parse(messageStr);Points points = new Points();Long orderId = (Long) orderMap.get("orderId");System.out.println("消息tag为:" + tag);System.out.println("消息监听:" + "为订单" + orderId + "添加积分");//查询该订单是否已经生成对应积分(rocketMQ可能会重复发送消息,需实现幂等)QueryWrapper<Points> pointsQueryWrapper = new QueryWrapper<>();pointsQueryWrapper.lambda().eq(Points::getOrderId,orderId);Points tempPoints = pointsService.getOne(pointsQueryWrapper);if (tempPoints != null) {//该订单已经生成积分System.out.println(orderId + "已经生成积分");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}points.setPointsId(SonwflakeUtils.get().id());points.setOrderId(orderId);Integer orderAmout = (Integer) orderMap.get("orderAmout");points.setPoints(orderAmout * 10);pointsService.save(points);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}
使用消息队列注意事项
- RocketMQ确保所有消息至少传递一次。虽然大多数情况下,消息不会重复,但还是需要对重复消息做。
- 尽量减小消息的体积,例如选择轻量的协议,超过一定体积做压缩处理,就消息协议而言, 二进制协议 < 文本协议。而文本协议中 json < xml 等等。