一、生产者开发指南
1. Java API使用详解
在使用RocketMQ进行消息生产时,首先需要引入相关的依赖。在Maven项目中,可以在pom.xml
文件中添加以下依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.0</version>
</dependency>
接下来,创建一个简单的生产者示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class ProducerExample {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动生产者producer.start();// 创建消息Message msg = new Message("TopicTest", // topic"TagA", // tag"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) // message body);// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}
}
2. 消息发送流程与代码示例
RocketMQ支持多种消息发送方式,包括同步发送、异步发送、单向发送等。
同步发送
同步发送方式在消息写入Broker后才返回结果,这种方式能够确保消息被可靠地发送到服务器端。
public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("SyncSendResult: %s%n", sendResult);}producer.shutdown();}
}
异步发送
异步发送方式虽然性能更高,但为了防止消息丢失,系统允许用户设置回调函数,在发送失败时进行重试。
public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {final int index = i;Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("AsyncSendSuccess: %s%n", sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("AsyncSendFailed: %s%n", index);}});}Thread.sleep(10000);producer.shutdown();}
}
单向发送
单向发送方式在发送消息后不等待服务器的响应,适用于对延迟要求不高的场景。
public class OnewayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.sendOneway(msg);}producer.shutdown();}
}
3. 高级特性(如批量发送、延迟消息)
批量发送
批量发送可以减少网络传输的次数,提高发送效率。
public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();MessageBatch messageBatch = new MessageBatch("TopicTest");for (int i = 0; i < 10; i++) {messageBatch.addMessage(new Message("TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)));}SendResult sendResult = producer.send(messageBatch);System.out.printf("BatchSendResult: %s%n", sendResult);producer.shutdown();}
}
延迟消息
延迟消息可以在指定的时间后被消费。
public class DelayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello Delay RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置延迟级别,1表示10s后投递,2表示30s后投递,3表示1min后投递,以此类推msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("DelaySendResult: %s%n", sendResult);producer.shutdown();}
}
二、消费者开发指南
1. 消费模式与API介绍
RocketMQ提供了两种消费模式:拉取模式和推送模式。
拉取模式
在拉取模式下,消费者主动从Broker拉取消息。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class PullConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.start();while (true) {MessageExt messageExt = consumer.pullBlocking(null, "TopicTest", "*", 1);if (messageExt != null) {System.out.printf("ReceiveNewMessages: %s%n", messageExt.getBody());}}}
}
推送模式
在推送模式下,Broker将消息推送给消费者。
public class PushConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.printf("ReceiveNewMessage: %s%n", msg.getBody());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
2. 消息拉取与处理策略
在消息拉取与处理过程中,需要注意以下几点:
- 批量拉取:通过设置
maxMsgs
参数,可以一次拉取多条消息,提高拉取效率。 - 消息过滤:通过设置
subExpression
参数,可以根据主题和标签筛选消息。 - 消费确认:在消费完成后,需要向Broker发送消费确认,确保消息被可靠地处理。
3. 重试机制与死信队列
在消息消费过程中,可能会出现消费失败的情况。RocketMQ提供了重试机制和死信队列来处理这种情况。
重试机制
在推送模式下,如果消费失败,Broker会自动将消息重新推送给消费者。
public class RetryConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
死信队列
如果消息在多次重试后仍然消费失败,可以将其发送到死信队列,以便后续处理。
public class DeadLetterQueueExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {try {// 模拟消费失败if (new Random().nextInt(10) > 5) {throw new Exception("Simulate consume failed");}System.out.printf("ConsumeMessageOk: %s%n", msg.getBody());} catch (Exception e) {System.out.printf("ConsumeMessageFailed: %s%n", msg.getBody());// 发送到死信队列Message deadLetterMsg = new Message("DLQ_TopicTest", "DLQ_TagA", msg.getBody());DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();producer.send(deadLetterMsg);producer.shutdown();return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
三、Spring Boot集成
1. Starter使用与配置
在Spring Boot项目中,可以使用rocketmq-spring-boot-starter
来简化RocketMQ的集成。
首先,在pom.xml
文件中添加以下依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
然后,在application.yml
文件中进行配置:
rocketmq:name-server: localhost:9876producer:group: ProducerGroupconsumer:group: ConsumerGroup
2. 注解驱动开发
通过使用注解,可以简化消息生产和消费的代码。
生产者
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String message) {rocketMQTemplate.convertAndSend("TopicTest", message);}
}
消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "ConsumerGroup")
public class RocketMQConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("ReceiveNewMessage: %s%n", message);}
}
3. 与微服务架构融合
在微服务架构中,可以将RocketMQ作为服务间通信的中间件,实现异步通信和解耦。
服务提供者
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/order")public String createOrder(@RequestBody Order order) {rocketMQTemplate.convertAndSend("OrderTopic", order);return "Order created successfully";}
}
服务消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "OrderConsumerGroup")
public class OrderConsumerService implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {System.out.printf("ReceiveNewOrder: %s%n", order);// 处理订单逻辑}
}
通过以上方式,可以将RocketMQ与Spring Boot微服务架构进行深度整合,实现高效、可靠的消息传递和处理。
以上内容结合了理论与实践,提供了完整的代码示例,帮助大家全面掌握RocketMQ的开发与应用。在实际项目中,可以根据具体需求对代码进行调整和优化。