RoctetMQ使用(2):在项目中使用

embedded/2024/9/25 5:30:36/

一、导入相关依赖

        在项目中引入MQ客户端依赖,依赖版本最好和RocketMQ版本一致。

<!--            rocket客户端--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version></dependency>

二、消息发送

        首先介绍一下消息发送的大致流程,当我们调用消息发送方法时该方法会先对待发送消息进行前置验证,如果消息主题和消息内容均没有问题的话,就会根据消息主题(Topic)去获取路由信息,即消息主题对应的队列,broker,broker的ip和端口信息,然后选择一条队列发送消息,成功的话返回发送成功,失败的话会根据我们设置的重试次数进行重新发送,单向消息发送不会进行失败重试。

1、RocketMQ初始化

        RocketMQ配置类案例:

package com.example.framework.mq.config;import cn.hutool.core.thread.ThreadUtil;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Map;
import java.util.concurrent.ExecutorService;/*** MQ配置*/
@Configuration
public class MQConfig {@Value("${spring.application.name:application}")private String groupName;//集群名称,这边以应用名称作为集群名称/***************************消息消费者***************************/@Autowiredprivate Map<String, MQHandler> mqHandlerMap;// 消费者nameservice地址@Value("${rocketmq.consumer.namesrvAddr:127.0.0.1:9876}")private String cNamesrvAddr;// 最小线程数@Value("${rocketmq.consumer.consumeThreadMin:20}")private int consumeThreadMin;// 最大线程数@Value("${rocketmq.consumer.consumeThreadMax:64}")private int consumeThreadMax;// 消费者监听主题,多个主题以分号隔开(topic~tag;topic~tag)@Value("${rocketmq.consumer.topics:test~*}")private String topics;// 一次消费消息的条数,默认为1条@Value("${rocketmq.consumer.consumeMessageBatchMaxSize:1}")private int consumeMessageBatchMaxSize;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(cNamesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.registerMessageListener(getMessageListenerConcurrently());// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费,如果非第一次启动,那么按照上次消费的位置继续消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 设置消费模型,集群还是广播,默认为集群//consumer.setMessageModel(MessageModel.CLUSTERING);// 设置一次消费消息的条数,默认为1条consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);try {// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3String[] topicTagsArr = topics.split(";");for (String topicTags : topicTagsArr) {String[] topicTag = topicTags.split("~");consumer.subscribe(topicTag[0],topicTag[1]);}consumer.start();}catch (Exception e){throw new Exception(e);}return consumer;}// 并发消息侦听器(如果对顺序消费有需求则使用MessageListenerOrderly 有序消息侦听器)@Beanpublic MessageListenerConcurrently getMessageListenerConcurrently() {return new MQListenerConcurrently(mqHandlerMap);}/***************************消息生产者***************************/// 事务消息监听器@Autowiredprivate MQTransactionListener mqTransactionListener;// 生产者nameservice地址@Value("${rocketmq.producer.namesrvAddr:127.0.0.1:9876}")private String pNamesrvAddr;// 消息最大大小,默认4M@Value("${rocketmq.producer.maxMessageSize:4096}")private Integer maxMessageSize ;// 消息发送超时时间,默认3秒@Value("${rocketmq.producer.sendMsgTimeout:30000}")private Integer sendMsgTimeout;// 消息发送失败重试次数,默认2次@Value("${rocketmq.producer.retryTimesWhenSendFailed:2}")private Integer retryTimesWhenSendFailed;// 执行任务的线程池private static ExecutorService executor = ThreadUtil.newExecutor(32);//普通消息生产者@Bean("default")public DefaultMQProducer getDefaultMQProducer() {DefaultMQProducer producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}//事务消息生产者(rocketmq支持柔性事务)@Bean("transaction")public TransactionMQProducer getTransactionMQProducer() {//初始化事务消息基本与普通消息生产者一致TransactionMQProducer producer = new TransactionMQProducer("transaction_" + this.groupName);producer.setNamesrvAddr(this.pNamesrvAddr);producer.setMaxMessageSize(this.maxMessageSize);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);//添加事务消息处理线程池producer.setExecutorService(executor);//添加事务消息监听producer.setTransactionListener(mqTransactionListener);try {producer.start();} catch (MQClientException e) {System.out.println(e.getErrorMessage());}return producer;}
}

2、发送消息

        消息发送根据消息功能主要分为普通消息、事务消息、顺序消息、延时消息等!特别说明一下事务消息多用于保证多服务模块间的事务一致性,事务消息发送后并不会直接通知消费者消费消息,而是会先生成一个半消息,会先进入事务消息监听器中,确保该消息事务提交成功后才会向broker发送消息,从而被消费者获取并进行消费。

        根据发送方式可以分为同步消息,异步消息和单向消息等:

  • 同步消息常用于比较重要的消息发送,需要等待broker响应告知消息发送状态。
  • 异步消息的话常用于对响应时间敏感,需要快速返回的模块,我们会设置一个回调代码块去异步监听Borker的响应。
  • 单向消息的话主要用于对发送结果不敏感,不会影响业务的模块,无需监听broker响应,常用于日志发送等模块。

        下面代码演示四种消息的使用方式:

package com.example.order.service.impl;import com.alibaba.fastjson.JSON;
import com.example.framework.utils.SonwflakeUtils;
import com.example.order.entity.Order;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;/*** 服务实现类*/
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {@Qualifier("default")@Autowiredprivate DefaultMQProducer producer;@Autowiredprivate TransactionMQProducer transactionMQProducer;/*** 添加订单(发送消息积分模块同步添加积分)* @param order 订单信息* @return org.apache.rocketmq.client.producer.TransactionSendResult**/@Overridepublic Order addOder(Order order) {order.setOrderId(SonwflakeUtils.get().id());if (order.getMessageType() == 1) {//普通消息this.save(order);Message message = new Message("points", "default", JSON.toJSONString(order).getBytes());try {//同步消息SendResult sendResult = producer.send(message);System.out.println("发送状态:" + sendResult.getSendStatus() +",消息ID:" + sendResult.getMsgId() +",队列:" + sendResult.getMessageQueue().getQueueId());
//                producer.sendOneway(message);//单向消息
//----------------------------异步消息-----------------------------------
//                producer.send(message, new SendCallback() {
//                    @Override
//                    public void onSuccess(SendResult sendResult) {
//
//                    }
//
//                    @Override
//                    public void onException(Throwable throwable) {
//
//                    }
//                });} catch (RemotingException | MQBrokerException | InterruptedException | MQClientException e) {e.printStackTrace();}} else {//事务消息Message message = new Message("points", "transaction", JSON.toJSONString(order).getBytes());try {transactionMQProducer.sendMessageInTransaction(message, null);} catch (MQClientException e) {e.printStackTrace();}}return order;}
}

3、消息消费

        边对MessageListenerConcurrently有进行一定封装,主要是为了在消息处理时通过注解定位消息Topic和tag而自动选择对应的消息处理类进行业务处理;封装代码如下:

package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.Map;/*** 并发消息监听器*/
public class MQListenerConcurrently implements MessageListenerConcurrently {@Autowiredprivate Map<String, MQHandler> mqHandlerMap;public MQListenerConcurrently(Map<String, MQHandler> mqHandlerMap) {this.mqHandlerMap = mqHandlerMap;}@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if(CollectionUtils.isEmpty(list)){System.out.println("接受到的消息为空,不处理,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);// 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)// 获取该消息重试次数int reconsume = messageExt.getReconsumeTimes();if(reconsume ==3){//消息已经重试了3次,需做告警处理,已经相关日志return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理对应的业务逻辑String topic = messageExt.getTopic();String tags  = messageExt.getTags();System.out.println("接受到的消息主题为:" + topic + "; tag为:" + tags);MQHandler mqMsgHandler = null;//获取消息处理类中的topic和tag注解,根据topic和tag进行策略分发出来具体业务for (Map.Entry<String, MQHandler> entry : mqHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer == null) {//非消息处理类continue;}String annotationTopic = msgHandlerActualizer.topic();if (!StrUtil.equals(topic,annotationTopic)) {//非该主题处理类continue;}String[] annotationTags = msgHandlerActualizer.tags();if(StrUtil.equals(annotationTags[0],"*")){//获取该实例mqMsgHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解类中包含tag则获取该实例mqMsgHandler = entry.getValue();break;}}if (mqMsgHandler == null) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}ConsumeConcurrentlyStatus status = mqMsgHandler.handle(tags,messageExt);// 如果没有return success,consumer会重新消费该消息,直到return successreturn status;}
}

        事务消息监听器封装:

package com.example.framework.mq.config;import cn.hutool.core.util.StrUtil;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQTransactionHandler;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Arrays;
import java.util.Map;public class MQTransactionListener implements TransactionListener {@Autowiredprivate Map<String, MQTransactionHandler> mqTransactionHandlerMap;@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {MQTransactionHandler mqTransactionHandler = getListenner(message.getTopic(),message.getTags());return mqTransactionHandler.executeLocalTransaction(message,o);}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {MQTransactionHandler mqTransactionHandler = getListenner(messageExt.getTopic(),messageExt.getTags());return mqTransactionHandler.checkLocalTransaction(messageExt);}private MQTransactionHandler getListenner(String topic,String tags) {MQTransactionHandler mqTransactionHandler = null;for (Map.Entry<String, MQTransactionHandler> entry : mqTransactionHandlerMap.entrySet()) {MQHandlerActualizer msgHandlerActualizer = entry.getValue().getClass().getAnnotation(MQHandlerActualizer.class);if (msgHandlerActualizer != null) {String annotationTopic  = msgHandlerActualizer.topic();String[] annotationTags = msgHandlerActualizer.tags();if (!StrUtil.equals(topic,annotationTopic)) {//非该主题处理类continue;}if(StrUtil.equals(annotationTags[0],"*")){//获取该实例mqTransactionHandler = entry.getValue();break;}boolean isContains = Arrays.asList(annotationTags).contains(tags);if(isContains){//注解类中包含tag则获取该实例mqTransactionHandler = entry.getValue();break;}}}return mqTransactionHandler;}
}

        使用注解@MQHandlerActualizer标明该消息处理类的主题,默认监听所有tag,如果需要对tag监听进行分类,后面加上tag即可。消息监听器在收到消息后会自动调用主题对应的处理类进行业务处理,示例如下:

package com.example.points.mqHandler;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.example.framework.mq.annotation.MQHandlerActualizer;
import com.example.framework.mq.handler.MQHandler;
import com.example.framework.utils.SonwflakeUtils;
import com.example.points.entity.Points;
import com.example.points.service.PointsService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/***/
@MQHandlerActualizer(topic = "points")
public class PointsMQHandler implements MQHandler {@Autowiredprivate PointsService pointsService;@Overridepublic ConsumeConcurrentlyStatus handle(String tag, MessageExt messageExt) {//消息监听String messageStr = new String(messageExt.getBody());Map orderMap = (Map) JSON.parse(messageStr);Points points = new Points();Long orderId = (Long) orderMap.get("orderId");System.out.println("消息tag为:" + tag);System.out.println("消息监听:"  + "为订单" + orderId + "添加积分");//查询该订单是否已经生成对应积分(rocketMQ可能会重复发送消息,需实现幂等)QueryWrapper<Points> pointsQueryWrapper = new QueryWrapper<>();pointsQueryWrapper.lambda().eq(Points::getOrderId,orderId);Points tempPoints = pointsService.getOne(pointsQueryWrapper);if (tempPoints != null) {//该订单已经生成积分System.out.println(orderId + "已经生成积分");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}points.setPointsId(SonwflakeUtils.get().id());points.setOrderId(orderId);Integer orderAmout = (Integer) orderMap.get("orderAmout");points.setPoints(orderAmout * 10);pointsService.save(points);return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

使用消息队列注意事项

  1. RocketMQ确保所有消息至少传递一次。虽然大多数情况下,消息不会重复,但还是需要对重复消息做。
  2. 尽量减小消息的体积,例如选择轻量的协议,超过一定体积做压缩处理,就消息协议而言, 二进制协议 < 文本协议。而文本协议中 json < xml 等等。

http://www.ppmy.cn/embedded/42197.html

相关文章

【MySQL精通之路】NDB存储引擎-与MEMORY存储引擎差异

希望部署使用MEMORY存储引擎存储重要、高可用或频繁更新数据的应用程序的开发人员应考虑NDB Cluster是否是更好的选择。 MEMORY引擎的典型用例包括以下特点&#xff1a; 涉及瞬态非关键数据的操作&#xff0c;如会话管理或缓存。当MySQL服务器停止或重新启动时&#xff0c;MEMO…

hcia datacom学习(8):静态NAT、动态NAT、NAPT、Easy IP、NAT server

1.私网地址 在现实环境中&#xff0c;企业、家庭使用的网络是私网地址&#xff08;内网&#xff09;&#xff0c;运营商维护的网络则是公网地址&#xff08;外网&#xff09;。私网地址是在局域网&#xff08;LAN&#xff09;内使用的&#xff0c;因此无法被路由&#xff0c;不…

世界电信日 | 紫光展锐以科技创新支撑数字经济可持续发展

专注科技创新&#xff0c;打造全球数字经济技术基石 紫光展锐坚持科技创新,为数字经济蓬勃发展提供基石力量。 面对5G-A技术的巨大潜力&#xff0c;紫光展锐与众多生态伙伴紧密合作&#xff0c;积极推动5G-A的商用进程。紫光展锐提出的两项R18 eRedCap演进方案已被3GPP标准采…

【Shell脚本】文本三剑客之sed编辑器

目录 一.sed编辑器的相关介绍及执行过程 1.sed介绍 2.sed编辑器的执行过程 二.sed命令格式 1.基本格式 2.在一个脚本文件里定义操作命令 3.常用操作 三.打印功能 1.默认打印方式 2.防止出现重复打印 2.1. 2.2. 2.3. 2.4. 3.使用地址打印 3.1.以数字形式打印行区…

cuda 内核启动

C 使用 __global__ 声明说明符定义内核&#xff0c;并使用新的 <<<...>>> 执行配置语法指定内核调用的 CUDA 线程数&#xff08;请参阅 C 语言扩展&#xff09;。 每个执行内核的线程都有一个唯一的线程 ID&#xff0c;可以通过内置变量在内核中访问。 示例…

教程:在 Apifox 中将消息通知集成到钉钉、飞书等应用

Apifox 支持将「消息通知」集成到第三方应用平台&#xff0c;包括企业微信、钉钉、飞书、Webhook 和 Jenkins。具体可在项目的【项目设置 -> 通知设置 -> 外部通知】里新建一个通知事件&#xff0c;然后在弹出的界面中配置即可。 在配置界面可以选择需要的触发事件&#…

SSH反向代理是什麼?有什麼用?

SSH反向代理&#xff0c;也被稱為SSH隧道&#xff0c;是一種利用SSH協議的端口轉發功能&#xff0c;將網路數據通過加密的SSH連接進行傳輸的技術。它的工作原理是&#xff0c;通過SSH連接將本地的一個端口與遠程伺服器的一個端口進行綁定&#xff0c;所有發往本地端口的數據都會…

E5063A是德科技e5063a网络分析仪

181-2461-8938产品概述&#xff1a; 简  述&#xff1a; E5063A 是低成本网络分析仪&#xff0c;可提供优化的性能和功能&#xff0c;适用于测试简单的无源器件&#xff0c;例如天线、电缆、滤波器和 PCB 等。它利用工业标准 ENA 系列始终如一的测量架构&#xff0c;能够极…