spring boot 集成rocketmq

news/2025/1/15 5:15:48/

集成Spring Boot和RocketMQ

在现代的微服务架构中,消息队列已经成为一种常见的异步处理模式,它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景。

本文将详细介绍如何在Spring Boot项目中集成RocketMQ,实现消息的生产和消费。

开发环境

  • JDK 1.8 或更高
  • RocketMQ 4.8.0 或更高
  • Spring Boot 2.3.1.RELEASE 或更高
  • Maven 3.0 或更高

RocketMQ服务器部署

首先,我们需要在本地或服务器上部署RocketMQ。具体的部署步骤可以参考RocketMQ官方文档。为了简化部署,我们可以使用Docker进行部署。

Spring Boot项目创建

我们使用Spring Initializr创建一个新的Spring Boot项目,选择Web、Lombok和RocketMQ Spring Boot Starter为项目依赖。

pom.xml示例:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

配置RocketMQ

application.properties文件中配置RocketMQ的服务器地址和其他相关参数。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

在这里,rocketmq.name-server是RocketMQ服务器的地址,rocketmq.producer.group是生产者的组名。

消息生产者

接下来,我们创建一个消息生产者。在Spring Boot项目中,我们可以使用RocketMQTemplate来发送消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String send(String message) {rocketMQTemplate.convertAndSend("test-topic", message);return "Message: '" + message + "' sent.";}
}

上述代码中,我们创建了一个RESTful接口/send,当接口被调用时,它将发送一个消息到test-topic主题。

消息消费者

接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("------- StringConsumer received: %s \n", message);}
}

上述代码中,我们定义了一个消息消费者,它将监听test-topic主题的消息,当有新的消息时,它将打印消息内容。

测试

至此,我们已经完成了Spring Boot集成RocketMQ的所有代码。接下来,我们就可以运行Spring Boot项目,并通过访问/send接口来发送消息,查看控制台的输出来验证消息消费者是否可以正常接收消息。

这就是Spring Boot集成RocketMQ的全过程。RocketMQ作为一款功能强大的消息中间件,不仅支持基本的消息生产和消费,还支持许多高级特性,如事务消息、顺序消息、延迟消息等。在实际的项目开发中,我们可以根据业务需求选择合适的消息模型,提高系统的可用性和可靠性。

事务消息

RocketMQ支持发送事务消息,也就是说,在发送消息的同时,我们可以执行本地的数据库操作,只有当本地的数据库操作成功时,消息才会真正被发送出去。

下面是一个发送事务消息的例子:

import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.*;@RestController
public class TransactionProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendTransaction")public String sendTransaction(String message) {ExecutorService executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(5000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = rocketMQTemplate.createAndStartTransactionMQProducer("transaction-group",transactionListener,executor);producer.sendMessageInTransaction("test-topic", "TagA", message, null);return "Transaction Message: '" + message + "' sent.";}
}

在上述代码中,我们创建了一个TransactionMQProducer,并设置了一个TransactionListener来处理事务的提交和回滚。当发送事务消息时,我们需要调用sendMessageInTransaction方法。

顺序消息

RocketMQ支持发送顺序消息,也就是说,消息会按照发送的顺序被消费。

下面是一个发送顺序消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;@RestController
public class OrderlyProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendOrderly")public String sendOrderly(String message) {for (int i = 0; i < 100; i++) {rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload(message + i).build(), "hashkey");}return "Orderly Message: '" + message + "' sent.";}
}

在上述代码中,我们调用syncSendOrderly方法发送顺序消息。该方法的第三个参数是hashkey,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。

延迟消息

RocketMQ支持发送延迟消息,也就是说,消息不会立即被消费,而是会在指定的时间后被消费。

下面是一个发送延迟消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;@RestController
public class DelayProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendDelay")public String sendDelay(String message) {rocketMQTemplate.syncSend("delay_topic", MessageBuilder.withPayload(message).build(), 1000, 4);return "Delay Message: '" + message + "' sent.";}
}

在上述代码中,我们调用syncSend方法发送延迟消息。该方法的第三个参数是延迟时间,第四个参数是延迟级别。

以上就是Spring Boot集成RocketMQ的详细步骤和示例代码,希望对大家有所帮助。


http://www.ppmy.cn/news/998909.html

相关文章

Spring 容器原始 Bean 是如何创建的?

以下内容基于 Spring6.0.4。 这个话题其实非常庞大&#xff0c;我本来想从 getBean 方法讲起&#xff0c;但一想这样讲完估计很多小伙伴就懵了&#xff0c;所以我们还是一步一步来&#xff0c;今天我主要是想和小伙伴们讲讲 Spring 容器创建 Bean 最最核心的 createBeanInstan…

3D Web轻量化渲染开发工具HOOPS Communicator是什么?

HOOPS Communicator是Tech Soft 3D旗下的主流产品之一&#xff0c;具有强大的、专用的高性能图形内核&#xff0c;是一款专注于基于Web端的高级3D工程应用程序。由HOOPS Server和HOOPS Web Viewer两大部分组成&#xff0c;提供了HOOPS Convertrer、Data Authoring的模型转换和编…

【JavaWeb】知识

1.JSP&#xff1a; 1) ${pageContext.request.contextPath}的理解和用法

ubuntu samba 配置常见问题

samba配置&#xff1a; sudo vi /etc/samba/smb.conf [xxx 共享文件名] comment share folder browseable yes writable yes guest ok yes path /workdir/code/favarite create mask 0777 directory mask 0777 sudo /etc/init.d/smbd restart 重启smb服务 以上操作…

网络安全知识点整理(作业2)

目录 一、js函数声明->function 第一种 第二种 第三种 二、this关键字 this使用场合 1.全局环境 2.构造函数 3.对象的方法 避免多层this 三、js的同步与异步 定时器 setTimeout和setInterval 同步与异步的例子 四、宏任务与微任务 分辨宏任务与微任务 一、js…

Kafka3.0.0版本——Broker(总体工作流程)

目录 一、Kafka中Broker总体工作流程图解二、Kafka中Broker总体工作流程步骤解析 一、Kafka中Broker总体工作流程图解 总体工作流程图解 二、Kafka中Broker总体工作流程步骤解析 1、broker启动后在zk中注册&#xff0c;如下图所示&#xff1a; 2、controller谁先注册&…

cc2652主协处理器分时控制同一个外设的问题

问题已提交TI论坛&#xff0c;我是提交到的中文论坛&#xff0c;然后fae给转到英文论坛了。 简单描述就是&#xff0c;怎么让这个单片机一会用主处理器控制SPI设备&#xff0c;一会再用协处理器控制同一个设备。 主处理器的spi配置使用 CCS studio配置的 协处理器使用Sensor Co…

Qt应用开发(基础篇)——数值微调输入框QAbstractSpinBox、QSpinBox、QDoubleSpinBox

目录 一、前言 二、QAbstractSpinBox类 1、accelerated 2、acceptableInput 3、alignment 4、buttonSymbols 5、correctionMode 6、frame 7、keyboardTracking 8、readOnly 9、showGroupSeparator 10、specialValueText 11、text 12、wrapping 13、信号 二、Q…