Springboot整合RocketMq

devtools/2024/9/23 20:28:09/

RocketMq相较于rabbitMq的优点(https://www.jianshu.com/p/56686af7eedd):
RocketMQ的优点:

  • 性能优越:RocketMQ在处理大量消息时,性能优于RabbitMQ。当面临每秒数万到数十万条消息的处理需求时,RocketMQ能够提供更好的性能。
  • 灵活的路由配置:RocketMQ在生产者和队列之间增加了一个Exchange模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中,这提供了更大的灵活性。
  • 对在线业务的响应时延做了很多优化:RocketMQ对在线业务的响应时延做了很多优化,大多数情况下可以做到毫秒级响应。
  • 中文社区活跃:对于中文用户来说,RocketMQ的中文社区比较活跃,源代码易读,方便二次开发。

RocketMQ的缺点:

  • 大量消息堆积时,会导致性能急剧下降。
  • 和其它两种消息队列产品相比,性能是最差的。因此,如果业务对性能要求特别高,就不要选用RocketMQ。
  • Java开发,虽然学习成本相对较低,但仍然需要学习相关的开发技术。

RabbitMQ的优点:

  • 高并发、高吞吐量:由于使用了Erlang语言,RabbitMQ在消息处理性能和吞吐量方面表现优秀。它可以处理大量的并发消息,并保证高吞吐量。
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全:RabbitMQ被认为是非常健壮、稳定和易用的消息队列产品。它支持多种编程语言,并且提供了丰富的文档和社区支持。
  • 开源提供的管理界面非常棒,用起来很好用:RabbitMQ提供了直观而易于使用的开源管理界面,使得管理和监控消息队列变得非常方便。
  • 社区活跃度高:RabbitMQ的社区非常活跃,有大量的用户和开发者在使用和贡献代码。这使得RabbitMQ具有很好的生态系统,便于获取支持和解决问题。

RabbitMQ的缺点:

  • Erlang开发,很难去看懂源码:对于一些开发者来说,Erlang语言可能比较陌生,学习成本较高,而且源代码可能难以理解。这可能会对二次开发和维护造成一定的困难。
  • 需要学习比较复杂的接口和协议:RabbitMQ使用了一些相对复杂的接口和协议,学习和维护成本较高。这可能会增加开发和维护的难度。
  • 相比其他消息队列产品,性能稍逊一筹:虽然RabbitMQ在性能方面表现不错,但相比一些其他消息队列产品(如RocketMQ),其性能可能稍逊一筹。如果业务对性能要求特别高,可能需要考虑其他选择。

综上所述,RocketMQ和RabbitMQ各有优缺点,需要根据具体业务需求进行选择。如果业务需要高性能、灵活的路由配置和对在线业务的响应时延有较高要求,可以考虑使用RocketMQ;如果业务需要高并发、高吞吐量、健壮稳定且易于使用和管理的消息队列产品,可以考虑使用RabbitMQ。

1.maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.0</version>
</dependency>

2.配置文件

rocketmq:name-server: xx.xx.xx.xx:9876    // 你的rocketmq服务器及端口producer:group: sms_producer_group

3.配置类
这里只是对RocketMQTemplate中的一些方法进行了封装,更方便调用,其他方法可直接查看RocketMQTemplate类

package net.trueland.smart.marketing.config;import org.springframework.context.annotation.Configuration;/*** RocketMq** @author sxd* @date 2023/9/8 10:30*/
@Configuration
public class RocketMqConfig {public final static String AI_CALL_CALLBACK_TOPIC = "tcloud_di_marketing_delay_passageway_topic";public final static String AI_CALL_CALLBACK_GROUP = "tcloud_di_marketing_delay_passageway_group";}

4.工具类

package net.trueland.smart.marketing.util;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** RocketMqUtil* @author sxd* @date 2023/9/8 10:52*/
@Component
@Slf4j
public class RocketMqUtil {@Autowiredprivate RocketMQTemplate rocketmqTemplate;/*** 同步发送* @param topic* @param content* @return*/public SendResult syncSend(String topic, String content) {SendResult sendResult = rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(content).build());log.info("RocketMqUtil-syncSend,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));return sendResult;}/*** 异步发送* @param topic* @param content* @return*/public void aSyncSend(String topic, String content) {SendCallback callback = new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("RocketMqUtil-aSyncSend-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {log.info("RocketMqUtil-aSyncSend-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage());}};rocketmqTemplate.asyncSend(topic, content, callback);}/*** 异步发送--延迟队列* @param topic 主题* @param content 内容* @param delayTimeLevel 延迟等级*/public void aSyncSendDelay(String topic, String content, Integer delayTimeLevel) {SendCallback callback = new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("RocketMqUtil-aSyncSendDelay-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {log.info("RocketMqUtil-aSyncSendDelay-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage());}};// 3000表示发送超时时间rocketmqTemplate.asyncSend(topic, MessageBuilder.withPayload(content).build(), callback, 3000L, delayTimeLevel);}}

5.生产者

// 这里以异步延迟消息为例
// 发送rocketmq。延迟等级使用14,即10分钟
rocketMqUtil.aSyncSendDelay(RocketMqConfig.AI_CALL_CALLBACK_TOPIC, JSON.toJSONString(req), 14);

6.消费者

package net.trueland.smart.marketing.mq;import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import net.trueland.smart.marketing.config.RocketMqConfig;
import net.trueland.smart.marketing.model.aicall.CreateTaskRequest;
import net.trueland.smart.marketing.util.AICallUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/**** @author sxd* @date 2023/9/8 14:44*/
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP)
public class AICallCallbackConsumer implements RocketMQListener<String> {@ResourceAICallUtils aiCallUtils;@Overridepublic void onMessage(String data) {log.info("AICallCallbackConsumer-onMessage-收到延迟队列消息:{}",  data);try{// 使用实体类解析消息(根据自己的消息来创建自己的实体类)CreateTaskRequest createTaskRequest = JSON.parseObject(data, CreateTaskRequest.class);// 业务逻辑。。。}catch (Exception e){log.error("AICallCallbackConsumer-onMessage-异常", e);}}
}

解释一下消费者中的代码
@RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP)
@RocketMQMessageListener表示这是一个rocketmq的消费者,该注解只能用在类上面 topic:指定需要消费的队列
consumerGroup:定义消费组,消费组是指处理某一类消息的消费者的集合

注意事项
1.多个监听器使用同一个group,会报错Caused by: org.apache.rocketmq.client.exception.MQClientException: The consu
解决办法:https://blog.csdn.net/LLittleF/article/details/130689449
2.可视化平台:rocketmq服务器地址:18080


http://www.ppmy.cn/devtools/86183.html

相关文章

PHP表单验证邮件和URL

在PHP中验证表单中的电子邮件地址和URL地址是确保用户输入数据正确性的重要步骤。下面是一个详细的教程&#xff0c;介绍如何使用PHP来验证电子邮件和URL地址。 一、验证电子邮件地址 电子邮件地址的验证通常涉及检查字符串是否符合电子邮件的标准格式。虽然完全通过正则表达…

一条 java web 请求的历程(一)

以一个web请求的历程&#xff0c;将java web的相关知识进行总结&#xff1b; 1、HTTP协议和TCP/IP协议的区别&#xff1f; 答&#xff1a;TCP/IP协议是传输层协议&#xff0c;主要解决数据如何在网络中传输。而HTTP是应用层协议&#xff0c;主要解决如何包装数据。Web使用HTTP协…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第六十三章 输入子系统实验

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

第十章 计算机网络——应用层

域名系统 DNS 域名系统&#xff08;DNS&#xff0c;Domain Name System&#xff09;是互联网中的一项核心服务&#xff0c;它充当了人类可读的域名与机器可识别的IP地址之间的“翻译官”。以下是DNS的详细概述&#xff1a; DNS的定义 DNS是一个分布式命名系统&#xff0c;用…

LeetCode-day24-2766. 重新放置石块

LeetCode-day24-2766. 重新放置石块 题目描述示例示例1&#xff1a;示例2&#xff1a; 思路代码 题目描述 给你一个下标从 0 开始的整数数组 nums &#xff0c;表示一些石块的初始位置。再给你两个长度 相等 下标从 0 开始的整数数组 moveFrom 和 moveTo 。 在 moveFrom.leng…

IP 泄露: 原因与避免方法

始终关注您的IP信息&#xff01; 您的IP地址不仅显示您的位置&#xff0c;它包含几乎所有的互联网活动信息&#xff01; 如果出现IP泄漏&#xff0c;几乎所有的信息都会被捕获甚至非法利用&#xff01; 那么&#xff0c;网站究竟如何追踪您的IP地址&#xff1f;您又如何有效…

简单的数据结构:栈

1.栈的基本概念 1.1栈的定义 栈是一种线性表&#xff0c;只能在一端进行数据的插入或删除&#xff0c;可以用数组或链表来实现&#xff0c;这里以数组为例进行说明 栈顶 &#xff1a;数据出入的那一端&#xff0c;通常用Top表示 栈底 :相对于栈顶的另一端&#xff0c;也是固…

git分布式版本控制系统及在码云上创建项目并pull和push

一、分支概念 每次提交&#xff0c;Git都把它们串成一条时间线&#xff0c;这条时间线就是一个分支。截止 到目前&#xff0c;只有一条时间线&#xff0c;在Git里&#xff0c;这个分支叫主分支&#xff0c;即master分支。 HEAD 严格来说不是指向提交&#xff0c;而是指向mas…