Idea+maven+springboot项目搭建系列--1 整合Rocketmq

news/2025/2/13 1:05:33/

前言:本文以maven+springboot 整合Rocketmq 完成消息的发送和接收。

1 Rocketmq 介绍:

1.1 Rocketmq 特性:
Apache RocketMQ是一款快速、可靠的分布式消息传递和流处理平台,具有可扩展性和高性能。它是一个分布式的、去中心化的消息队列,具有以下特性:

  • 分布式:RocketMQ允许将消息存储在多个Broker上并支持水平扩展,可以通过增加更多的Broker来扩展存储能力和吞吐量。

  • 异步传输:RocketMQ采用异步传输方式来提高性能,它的异步传输机制利用了Linux内核底层的零拷贝技术,从而实现了高吞吐量和低延迟。

  • 可靠性:RocketMQ采用了复制和故障转移机制来保证消息的可靠性。它可以配置多副本(通常是3个副本)来存储消息,当有一个Broker宕机时,系统可以自动将消息路由到其他副本上。

  • 灵活性:RocketMQ支持多种消息模式,包括点对点模式、发布/订阅模式和事务消息模式。它还支持多种消息协议,包括JMS、OpenMessaging和MQTT等。

  • 易于使用:RocketMQ使用简单,提供了丰富的客户端API和管理工具,使得开发人员可以快速地集成和使用它。

RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。

1.2 Rocketmq 主要组件:
Rocketmq 是一种基于发布-订阅(Pub/Sub)消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer)。而RocketMQ的基础消息模型就是一个简单的Pub/Sub模型。
在这里插入图片描述
RocketMQ主要由以下几个组件组成:

  • Nameserver:Nameserver是RocketMQ中的重要组件之一,它充当了命名服务和路由服务的角色。当Producer和Consumer要发送或者接收消息时,它们需要向NameServer请求获取Broker的信息,然后才能和Broker进行通信。Nameserver的作用类似于DNS服务器,用来维护RocketMQ中各个Broker的地址信息。
  • Broker:Broker是RocketMQ中的消息存储和传输核心组件。所有的消息都存储在Broker中,Producer向Broker发送消息,Consumer从Broker中订阅和接收消息。Broker的作用是接收、存储和转发消息,确保消息的可靠性和可扩展性。
  • Producer:Producer是创造和发送消息的客户端应用程序,它通过调用API将消息发送到Broker中。Producer可以按照不同的消息模式发送消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Consumer:Consumer是接收和处理消息的客户端应用程序。它通过从Broker中订阅和消费消息来实现消息的处理。Consumer可以按照不同的消息模式消费消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Message:Message是RocketMQ中最基本的消息单元,它包含了消息的内容和一些元数据,例如消息ID、消息主题、消息标签等。Producer将消息发送到Broker中,Consumer从Broker中订阅和接收消息。

Producer (生产者)和Consumer(消费者),一个向topic 发送消息,一个向topic 读取消息,消息的基本单元由Message 承接;
一般的消息组件对于消息的存储分发都只有一个组件处理,RocketMQ 中却使用了Nameserver和Broker 两个组件,那么这两个组件的关系是什么呢:
为了方便理解,这里使用图书馆进行类比:

  • 首先图书馆里存储了海量的图书,这些图书并不是杂乱无章的进行堆叠,而是按照一定的类型完成了分类存放;比如新闻类,医学类,生物类,文学类 等等,每种不同的分类下都有海量的图书;如果把每本图书看做是具体的一个个消息,那么图书的分类就是不同的topic;
  • 对于每种分类,为了统计的方便有可能需要为其在划分小类,如生物类,可以被划分为 植物类,动物类 等等,对于每个大类如果可以看做是topic ,那么大类下划分的小类就可以看做是 不同的 tag分类;
  • 显然每一种topic/tag分类的图书并不是杂乱无章的存放,而是会被整齐的放入到一排排的书架上,一排排的书架就可以看做是分区下的队列;
  • 显然书架作为了书籍最终的存放位置,那么可以将图书馆的书架看做是Broker,用户来借书和还书,最终都要来到书架上拿书和放书;
  • 显然图书馆里的书籍不仅需要分类存放,每层的图书管理人员,还需要熟悉自己负责楼层的书籍的位置信息,以及需要对书籍的维护;如果来借书的人需要的图书不在本楼层,图书管理人员也需要为其提供书籍正确的楼层位置信息,显然每层的图书管理人员,都需要掌握每层楼的图书信息,并且必要情况下,需要有可以顶替其他楼层管理员的能力;
  • 显然在rocketmq 中 ,Nameserver 的角色就和 每层的图书管理员相似;当每个用户来到本楼层还书(生产消息),楼层管理人员,需要告知还书的用户这本书,需要被正确归还的位置(消息的路由),从而帮助用户更好的还书;
  • 当用户来借书(消费消息),楼层管理人员,需要告知用户,想要的书籍正确楼层及详细位置信息(消息的路由);
  • 图书管理员怎么知道各个图书的分类以及位置信息,就需要不时的在自己的系统里动态维护数据的信息,以便于更好的服务借书的还书的人;
  • 显然rocketmq 中 最终存放数据的broker 组件需要和Nameserver 进行不时的交互,这样Nameserver 就可以实时的知晓数据的信息,当生产者投递消息时,先向Nameserver询问自己要投递的位置信息,然后在将数据进行投递到broker;当消费者消费消息时,也先向Nameserver询问自己想要消费数据的位置信息,然后在向具体的broker 获取消息;

2 springboot 整合:

2.1 引入jar:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

2.2 配置rocketmq:

# name-server地址
rocketmq.name-server=localhost:9876
# 配置消费组
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=30000
# 设置日志级别
logging.level.root=debug

2.3 生产者 消息发送工具类:

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.stream.Collectors;/*** 生产者*/
@Slf4j
@Component
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;/*** 发送普通消息** @param topic* @param tag* @param msgBody*/public void sendMsg(String topic, String tag, Object msgBody) {if (StringUtils.isNotBlank(tag)) {topic = topic.concat(":") + tag;}rocketMQTemplate.convertAndSend(topic, msgBody);}/*** 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)* sendResult为返回的发送结果*/public <T> SendResult sendMsg(String topic, T msg) {Message<T> message = MessageBuilder.withPayload(msg).build();SendResult sendResult = rocketMQTemplate.syncSend(topic, message);log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}/*** 发送异步消息** @param topic* @param tag* @param msgBody* @param callback*/public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {if (StringUtils.isNotBlank(tag)) {topic = topic.concat(":") + tag;}rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);}/*** 发送异步消息** @param topic         消息Topic* @param message       消息实体* @param sendCallback  回调函数* @param timeout       超时时间*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);}/*** 发送延时消息* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h** @param topic* @param tag* @param msgBody* @param timeout* @param delayLevel 值的有效范围1至18*/public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {if (StringUtils.isNotBlank(tag)) {topic = topic.concat(":") + tag;}if (timeout != null) {messageTimeOut = timeout.intValue();}rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);}/*** 发送异步延迟消息** @param topic        消息Topic* @param message      消息实体* @param sendCallback 回调函数* @param timeout      超时时间* @param delayLevel   延迟消息的级别*/public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}/*** 发送异步延迟消息** @param topic      消息Topic* @param message    消息实体* @param timeout    超时时间* @param delayLevel 延迟消息的级别*/public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("topic:{}消息---发送MQ成功---", topic);}@Overridepublic void onException(Throwable throwable) {log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());}}, timeout, delayLevel);}/*** 单向消息* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答* 此方式发送消息的过程耗时非常短,一般在微秒级别* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集** @param topic 消息主题* @param msg   消息体* @param <T>   消息泛型*/public <T> void sendOneWayMsg(String topic, T msg) {Message<T> message = MessageBuilder.withPayload(msg).build();rocketMQTemplate.sendOneWay(topic, message);}/*** 发送批量消息** @param topic   消息主题* @param msgList 消息体集合* @param <T>     消息泛型* @return*/public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {List<Message<T>> messageList = msgList.stream().map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());return rocketMQTemplate.syncSend(topic, messageList);}/*** 发送顺序消息** @param topic     消息主题* @param msg       消息体* @param hashKey   确定消息发送到哪个队列中* @param <T>       消息泛型*/public <T> void syncSendOrderly(String topic, T msg, String hashKey) {Message<T> message = MessageBuilder.withPayload(msg).build();log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);rocketMQTemplate.syncSendOrderly(topic, message, hashKey);}/*** 发送顺序消息** @param topic     消息主题* @param msg       消息体* @param hashKey   确定消息发送到哪个队列中* @param timeout   超时时间*/public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {Message<T> message = MessageBuilder.withPayload(msg).build();log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}}

2.4 消费者:


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",topic = "test_topic",selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String msg = new String(body);log.debug("监听到消息:message:{}", msg);}
}

2.5 测试消息发送:


import com.example.springrocket.config.RocketMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class SpringRocketApplicationTests {@Autowiredprivate RocketMQProducer rocketMQProducer;@Testvoid contextLoads() {}@Testvoid sendMQMessage(){SendResult sendResult = rocketMQProducer.sendMsg("test_topic","hello test 123");System.out.println(sendResult);}}

消息获取:
在这里插入图片描述

3 整合遇到的问题参考:

3.1 提示RocketMQTemplate bean 没有被找到:

  • 检查nameServer 和Broker 服务,是否正常启动;
  • 检查10911,10909,10912 端口是否正常暴露;
  • 检查生产者的group 分组是否配置:rocketmq.producer.group
  • 如果springboot 的版本为3.x 则可以降低2.x 的版本,因为3.x 的版本不会进行rocketmq 的自动装配;

3.2 如果提示xxx.xx.xx.xx:10911 连接失败或者决绝:

  • 检查broker 的启动配置文件broker.conf 的brokerIP1 是否为公网ip 如果不是,则需要修改为公网ip;

4 参考:

4.1 Apache RocketMQ


http://www.ppmy.cn/news/106039.html

相关文章

《深入理解计算机系统(CSAPP)》第3章 程序的机器级表示 - 学习笔记

写在前面的话&#xff1a;此系列文章为笔者学习CSAPP时的个人笔记&#xff0c;分享出来与大家学习交流&#xff0c;目录大体与《深入理解计算机系统》书本一致。因是初次预习时写的笔记&#xff0c;在复习回看时发现部分内容存在一些小问题&#xff0c;因时间紧张来不及再次整理…

相对于Vue3,Vue4都做了哪些改进

经过长时间的开发和测试&#xff0c;Vue Router 4 带来了许多改进和新功能&#xff0c;为Vue 3应用程序提供了一致性的改进。本文将介绍Vue Router 4 相对于Vue 3的改进和新特性。 1. 项目结构优化&#xff1a; Vue Router 4 进行了项目结构优化&#xff0c;将其分为三个模块…

java基于springboot自来水收费缴费系统+jsp

本次设计拟采用JAVA技术&#xff0c;对乡镇自来水收费系统的功能需求进行了全面分析&#xff0c;从模块功能定义、前后端交互技术、数据库及编程语言的选择、系统调试及测试、功能完善和改进等方面进行设计&#xff0c;解决了从用户新装、抄表、计费、收费、复查、换表、发票管…

提升你的SAP MM技能:推荐官方教材助力SAP顾问(附SAP SD最新官方教材目录)

SAP SD模块是SAP系统中负责销售与分销业务的重要模块。它涵盖了从销售流程管理到订单处理再到发货和发票等各个环节。对于SAP顾问来说&#xff0c;熟悉和掌握SAP SD模块是至关重要的&#xff0c;因为它直接关系到企业的销售业务和客户关系管理。 在学习SAP SD模块时&#xff0…

HEVC环路后处理核心介绍

介绍 为什么需要环路后处理技术 hevc采用基于快的混合编码框架&#xff0c;方块效应、振铃效应、颜色偏差、图像模糊等失真效应依旧存在&#xff0c;为了降低此类失真影响&#xff0c;需要进行环路滤波技术&#xff1b; 采用的技术 去方块滤波DF&#xff0c;为了降低块效应…

IO多路复用详解

文章目录 基本概念select系统调用详解select函数定义select的底层原理select的优缺点 poll系统调用详解poll函数定义调用poll的底层原理poll的优缺点 epoll系统调用详解epoll相关的函数定义epoll的底层原理epoll的优缺点 ET vs LT基本概念epoll_ctl模式设置 应用场景基于IO多路…

生态系统服务(InVEST模型)土壤保持、水源涵养、氮磷输出、生态保护、生物多样性、碳固

白老师&#xff08;研究员&#xff09;&#xff1a;长期从事生态系统结构-格局-过程-功能-服务的变化与响应关系等研究工作&#xff1b;重点围绕生物多样性、生态系统服务与价值等&#xff0c;构建生物地球化学模型和评价指标体系&#xff0c;为城市、区域和自然保护区的可持续…

【回顾经典AI神作】卷积神经网络CNN架构系列:LeNet,AlexNet,VGG,GoogLeNet,ResNet

卷积神经网络(CNN或ConvNet)是一种特殊的多层神经网络,旨在以最少的预处理直接从像素图像中识别视觉模式。ImageNet项目是一个大型视觉数据库,设计用于视觉对象识别软件研究。ImageNet 项目举办年度软件竞赛,即 ImageNet 大规模视觉识别挑战赛 (ILSVRC),软件程序竞相正…