RocketMQ开发实战篇

embedded/2025/3/14 8:48:26/

一、生产者开发指南

1. Java API使用详解

在使用RocketMQ进行消息生产时,首先需要引入相关的依赖。在Maven项目中,可以在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.0</version>
</dependency>

接下来,创建一个简单的生产者示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class ProducerExample {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动生产者producer.start();// 创建消息Message msg = new Message("TopicTest", // topic"TagA", // tag"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) // message body);// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}
}

2. 消息发送流程与代码示例

RocketMQ支持多种消息发送方式,包括同步发送、异步发送、单向发送等。

同步发送

同步发送方式在消息写入Broker后才返回结果,这种方式能够确保消息被可靠地发送到服务器端。

public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("SyncSendResult: %s%n", sendResult);}producer.shutdown();}
}

异步发送

异步发送方式虽然性能更高,但为了防止消息丢失,系统允许用户设置回调函数,在发送失败时进行重试。

public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {final int index = i;Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("AsyncSendSuccess: %s%n", sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("AsyncSendFailed: %s%n", index);}});}Thread.sleep(10000);producer.shutdown();}
}

单向发送

单向发送方式在发送消息后不等待服务器的响应,适用于对延迟要求不高的场景。

public class OnewayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.sendOneway(msg);}producer.shutdown();}
}

3. 高级特性(如批量发送、延迟消息)

批量发送

批量发送可以减少网络传输的次数,提高发送效率。

public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();MessageBatch messageBatch = new MessageBatch("TopicTest");for (int i = 0; i < 10; i++) {messageBatch.addMessage(new Message("TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)));}SendResult sendResult = producer.send(messageBatch);System.out.printf("BatchSendResult: %s%n", sendResult);producer.shutdown();}
}

延迟消息

延迟消息可以在指定的时间后被消费。

public class DelayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello Delay RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置延迟级别,1表示10s后投递,2表示30s后投递,3表示1min后投递,以此类推msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("DelaySendResult: %s%n", sendResult);producer.shutdown();}
}

二、消费者开发指南

1. 消费模式与API介绍

RocketMQ提供了两种消费模式:拉取模式和推送模式。

拉取模式

在拉取模式下,消费者主动从Broker拉取消息。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class PullConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.start();while (true) {MessageExt messageExt = consumer.pullBlocking(null, "TopicTest", "*", 1);if (messageExt != null) {System.out.printf("ReceiveNewMessages: %s%n", messageExt.getBody());}}}
}

推送模式

在推送模式下,Broker将消息推送给消费者。

public class PushConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.printf("ReceiveNewMessage: %s%n", msg.getBody());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

2. 消息拉取与处理策略

在消息拉取与处理过程中,需要注意以下几点:

  • 批量拉取:通过设置maxMsgs参数,可以一次拉取多条消息,提高拉取效率。
  • 消息过滤:通过设置subExpression参数,可以根据主题和标签筛选消息。
  • 消费确认:在消费完成后,需要向Broker发送消费确认,确保消息被可靠地处理。

3. 重试机制与死信队列

在消息消费过程中,可能会出现消费失败的情况。RocketMQ提供了重试机制和死信队列来处理这种情况。

重试机制

在推送模式下,如果消费失败,Broker会自动将消息重新推送给消费者。

public class RetryConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

死信队列

如果消息在多次重试后仍然消费失败,可以将其发送到死信队列,以便后续处理。

public class DeadLetterQueueExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());// 发送到死信队列Message deadLetterMsg = new Message("DLQ_TopicTest", "DLQ_TagA", msg.getBody());DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();producer.send(deadLetterMsg);producer.shutdown();return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

三、Spring Boot集成

1. Starter使用与配置

在Spring Boot项目中,可以使用rocketmq-spring-boot-starter来简化RocketMQ的集成。

首先,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

然后,在application.yml文件中进行配置:

rocketmq:name-server: localhost:9876producer:group: ProducerGroupconsumer:group: ConsumerGroup

2. 注解驱动开发

通过使用注解,可以简化消息生产和消费的代码。

生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String message) {rocketMQTemplate.convertAndSend("TopicTest", message);}
}

消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "ConsumerGroup")
public class RocketMQConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("ReceiveNewMessage: %s%n", message);}
}

3. 与微服务架构融合

在微服务架构中,可以将RocketMQ作为服务间通信的中间件,实现异步通信和解耦。

服务提供者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/order")public String createOrder(@RequestBody Order order) {rocketMQTemplate.convertAndSend("OrderTopic", order);return "Order created successfully";}
}

服务消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "OrderConsumerGroup")
public class OrderConsumerService implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {System.out.printf("ReceiveNewOrder: %s%n", order);// 处理订单逻辑}
}

通过以上方式,可以将RocketMQ与Spring Boot微服务架构进行深度整合,实现高效、可靠的消息传递和处理。

以上内容结合了理论与实践,提供了完整的代码示例,帮助大家全面掌握RocketMQ的开发与应用。在实际项目中,可以根据具体需求对代码进行调整和优化。


http://www.ppmy.cn/embedded/172451.html

相关文章

记录一下返修

1.对复杂度的分析还不够&#xff1b; 2.融合两种指标的解释还不够&#xff0c;审稿人认为这两种指标存在冲突&#xff0c;不能同时优化&#xff0c;但其实我们考虑的是公平性保证整个调度周期内用户分配到了更加平均的sum-rate,而se是为了追求每个调度时刻都尽可能找到信道条件…

条款1:理解模版性别推导

目录 问题引出 情况1&#xff1a;ParamType是个指针或引用&#xff0c;但不是个万能引用。 情况2&#xff1a;ParamType是个万能引用 情况3&#xff1a;ParamType既非指针也非引用 问题引出 函数模板大致形如&#xff1a; template<typename T> void f(ParamType p…

EngineerCMS完整版支持OnlyOffice8.2文档协作

这次从OO5.3那个时代的接口&#xff0c;改到支持8.2接口&#xff0c;颇费周折。centos升级和docker升级 - Powered by MinDoc (itdos.net) 1. 首先是升级centos 手动升级centos7内核&#xff08;版本自行选择&#xff0c;亲测内核下载链接有效&#xff09;_centos内核下载-CS…

微信小程序防止弹框下面穿透滚动

‌使用catchtouchmove属性‌&#xff1a;在需要防止穿透滚动的元素上添加catchtouchmove"true"属性。这样&#xff0c;当用户在该元素上进行滚动操作时&#xff0c;不会触发下层的滚动事件&#xff0c;从而防止穿透滚动。 例如&#xff1a; 修改前&#xff0c;在弹框…

JAVA面试_进阶部分_Java JVM:垃圾回收(GC 在什么时候,对什么东西,做了什么事情)

在什么时候&#xff1a; 首先需要知道&#xff0c;GC又分为minor GC 和 Full GC&#xff08;major GC&#xff09;。Java堆内存分为新生代和老年代&#xff0c;新生代 中又分为1个eden区和两个Survior区域。 一般情况下&#xff0c;新创建的对象都会被分配到eden区&#xff…

【算法day9】字符串转换整数 (atoi);请你来实现一个 myAtoi(string s) 函数,使其能将字符串转换成一个 32 位有符号整数。

字符串转换整数 (atoi) https://leetcode.cn/problems/string-to-integer-atoi/description/ 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数。 函数 myAtoi(string s) 的算法如下&#xff1a; 空格&#xff1a;读入字符串并丢…

FiddlerScript学习笔记

参考官方文档&#xff1a;https://www.fiddlerbook.com/fiddler/dev/scriptsamples.asp json // 反序列化 static function jsonDecode(str : String){return Fiddler.WebFormats.JSON.JsonDecode(str).JSONObject; } // 序列化 static function jsonEncode(jsonObject : Obje…

Doris 数据划分:分区与分桶策略全解析

在 Doris 的分布式架构里&#xff0c;数据划分策略是实现高效存储和查询的关键所在。它主要依靠分区&#xff08;Partition&#xff09;和分桶&#xff08;Bucket&#xff09;这两层逻辑划分&#xff0c;对数据的分布进行精细化管理。本文将深入探讨这两种策略的设计思路、实际…