RocketMQ开发实战篇

news/2025/3/15 1:07:20/

一、生产者开发指南

1. Java API使用详解

在使用RocketMQ进行消息生产时,首先需要引入相关的依赖。在Maven项目中,可以在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.0</version>
</dependency>

接下来,创建一个简单的生产者示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class ProducerExample {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动生产者producer.start();// 创建消息Message msg = new Message("TopicTest", // topic"TagA", // tag"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) // message body);// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}
}

2. 消息发送流程与代码示例

RocketMQ支持多种消息发送方式,包括同步发送、异步发送、单向发送等。

同步发送

同步发送方式在消息写入Broker后才返回结果,这种方式能够确保消息被可靠地发送到服务器端。

public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("SyncSendResult: %s%n", sendResult);}producer.shutdown();}
}

异步发送

异步发送方式虽然性能更高,但为了防止消息丢失,系统允许用户设置回调函数,在发送失败时进行重试。

public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {final int index = i;Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("AsyncSendSuccess: %s%n", sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("AsyncSendFailed: %s%n", index);}});}Thread.sleep(10000);producer.shutdown();}
}

单向发送

单向发送方式在发送消息后不等待服务器的响应,适用于对延迟要求不高的场景。

public class OnewayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.sendOneway(msg);}producer.shutdown();}
}

3. 高级特性(如批量发送、延迟消息)

批量发送

批量发送可以减少网络传输的次数,提高发送效率。

public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();MessageBatch messageBatch = new MessageBatch("TopicTest");for (int i = 0; i < 10; i++) {messageBatch.addMessage(new Message("TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)));}SendResult sendResult = producer.send(messageBatch);System.out.printf("BatchSendResult: %s%n", sendResult);producer.shutdown();}
}

延迟消息

延迟消息可以在指定的时间后被消费。

public class DelayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello Delay RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置延迟级别,1表示10s后投递,2表示30s后投递,3表示1min后投递,以此类推msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("DelaySendResult: %s%n", sendResult);producer.shutdown();}
}

二、消费者开发指南

1. 消费模式与API介绍

RocketMQ提供了两种消费模式:拉取模式和推送模式。

拉取模式

在拉取模式下,消费者主动从Broker拉取消息。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class PullConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.start();while (true) {MessageExt messageExt = consumer.pullBlocking(null, "TopicTest", "*", 1);if (messageExt != null) {System.out.printf("ReceiveNewMessages: %s%n", messageExt.getBody());}}}
}

推送模式

在推送模式下,Broker将消息推送给消费者。

public class PushConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.printf("ReceiveNewMessage: %s%n", msg.getBody());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

2. 消息拉取与处理策略

在消息拉取与处理过程中,需要注意以下几点:

  • 批量拉取:通过设置maxMsgs参数,可以一次拉取多条消息,提高拉取效率。
  • 消息过滤:通过设置subExpression参数,可以根据主题和标签筛选消息。
  • 消费确认:在消费完成后,需要向Broker发送消费确认,确保消息被可靠地处理。

3. 重试机制与死信队列

在消息消费过程中,可能会出现消费失败的情况。RocketMQ提供了重试机制和死信队列来处理这种情况。

重试机制

在推送模式下,如果消费失败,Broker会自动将消息重新推送给消费者。

public class RetryConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

死信队列

如果消息在多次重试后仍然消费失败,可以将其发送到死信队列,以便后续处理。

public class DeadLetterQueueExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());// 发送到死信队列Message deadLetterMsg = new Message("DLQ_TopicTest", "DLQ_TagA", msg.getBody());DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();producer.send(deadLetterMsg);producer.shutdown();return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

三、Spring Boot集成

1. Starter使用与配置

在Spring Boot项目中,可以使用rocketmq-spring-boot-starter来简化RocketMQ的集成。

首先,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

然后,在application.yml文件中进行配置:

rocketmq:name-server: localhost:9876producer:group: ProducerGroupconsumer:group: ConsumerGroup

2. 注解驱动开发

通过使用注解,可以简化消息生产和消费的代码。

生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String message) {rocketMQTemplate.convertAndSend("TopicTest", message);}
}

消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "ConsumerGroup")
public class RocketMQConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("ReceiveNewMessage: %s%n", message);}
}

3. 与微服务架构融合

在微服务架构中,可以将RocketMQ作为服务间通信的中间件,实现异步通信和解耦。

服务提供者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/order")public String createOrder(@RequestBody Order order) {rocketMQTemplate.convertAndSend("OrderTopic", order);return "Order created successfully";}
}

服务消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "OrderConsumerGroup")
public class OrderConsumerService implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {System.out.printf("ReceiveNewOrder: %s%n", order);// 处理订单逻辑}
}

通过以上方式,可以将RocketMQ与Spring Boot微服务架构进行深度整合,实现高效、可靠的消息传递和处理。

以上内容结合了理论与实践,提供了完整的代码示例,帮助大家全面掌握RocketMQ的开发与应用。在实际项目中,可以根据具体需求对代码进行调整和优化。


http://www.ppmy.cn/news/1579186.html

相关文章

Redis 哨兵模式详解:实现高可用与自动故障转移

目录 引言 1. 什么是 Redis 哨兵模式&#xff1f; 1.1 定义 1.2 核心概念 2. Redis 哨兵模式的工作原理 2.1 监控 2.2 故障检测 2.3 故障转移 2.4 通知 3. Redis 哨兵模式的配置方法 3.1 配置文件 3.2 启动哨兵节点 4. Redis 哨兵模式的使用场景 4.1 高可用性 4.…

【Go语言圣经1.5】

目标 概念 要点&#xff08;案例&#xff09; 实现了一个简单的 HTTP 客户端程序&#xff0c;主要功能是&#xff1a; 读取命令行参数&#xff1a;程序从命令行获取一个或多个 URL。发送 HTTP GET 请求&#xff1a;使用 Go 内置的 net/http 包&#xff0c;通过 http.Get 函…

初一信息科技教程专用抓包软件1.4.2版本

tcp、udp、dns、https\http\ftp抓包&#xff0c;修订了SYN抓包&#xff0c;确保三次握手顺序 修订程序假死&#xff0c;原因是抓包太多&#xff0c;因此限制只抓取最多100个包。

云计算VS网络安全,应该怎么选?

运维是什么 运维&#xff0c;本质上是对网络、服务器、服务的生命周期各个阶段 的运营与维护&#xff0c;在成本、稳定性、效率上达成一致可接受的状态。 优势: 市场需求大:云计算的落地和应用加剧了对云计算运维人才的 需求薪资待遇:工作含金量较高&#xff0c;因而也拥有不…

vue3 使用docxtemplater 动态生成docx

模版文件docx放到vue工程public下 文件内容 vue文件 <template><div><button click"generateDocument">生成Word文档</button></div> </template><script> import PizZip from pizzip; import Docxtemplater from docx…

洛谷P10576 [蓝桥杯 2024 国 A] 儿童节快乐

设x^2n10120300500 , y^2n−10120300500&#xff0c;x>y>0 x^2-y^220,240,601,000 (x-y)(xy)20,240,601,000&#xff0c;枚举两个因数中较小的那个&#xff0c;也就是x-y&#xff0c;通过(x-y)和(xy)相加相减消元来解出x和y&#xff0c;但是通过消元解出的x和y不一定满…

Next.js提供api接口

看react官网在推Next.js,所以简单学习了解一下 DEMO 使用cna官方脚手架&#xff08;13版本&#xff09;初始化项目以后目录如下&#xff1a; 可以看出&#xff0c;初始项目只有一个根路由页面page.tsx,想要增加一个纯粹的api route可以在app/目录下创建api/xxx/route.ts。即可…

Qt开源控件库(qt-material-widgets)的编译及使用

项目简介 qt-material-widgets是一个基于 Qt 小部件的 Material Design 规范实现。 项目地址 项目地址&#xff1a;qt-material-widgets 本地构建环境 Win11 家庭中文版 VS2019 Qt5.15.2 (MSVC2019) 本地构建流程 克隆后的目录结构如图&#xff1a; 直接使用Qt Crea…