第一部分:基础配置与简单示例
1. 项目初始化
使用Spring Boot创建一个项目,添加RocketMQ依赖。
-
POM依赖(Maven):
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>3.2.3</version> </dependency> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version> </dependency>
-
application.yml 配置:
rocketmq:name-server: localhost:9876producer:group: default-producer-groupconsumer:group: default-consumer-group
2. 简单生产者与消费者
-
生产者:
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;@RestController public class SimpleProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String sendMessage() {rocketMQTemplate.convertAndSend("SimpleTopic", "Hello, RocketMQ with Spring Boot!");return "Message sent!";} }
-
消费者:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service;@Service @RocketMQMessageListener(topic = "SimpleTopic", consumerGroup = "simple-consumer-group") public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);} }
-
启动类:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);} }
启动项目后,访问 http://localhost:8080/send
,消费者会打印消息。这是最基础的用法,面试中常被问到如何快速集成。
第二部分:真实项目应用场景
以下是RocketMQ在Spring Boot中的典型应用场景,涵盖面试常见问题。
1. 电商订单系统(异步消息)
场景:用户下单后,异步通知库存扣减和物流系统。
-
生产者(订单服务):
@RestController public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/place-order")public String placeOrder() {String orderJson = "{\"orderId\":\"12345\",\"item\":\"Laptop\",\"quantity\":1}";// 异步发送消息rocketMQTemplate.asyncSend("OrderTopic", orderJson, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("Order sent successfully: " + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {System.err.println("Order send failed: " + throwable.getMessage());}});return "Order placed!";} }
-
消费者(库存服务):
@Service @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "inventory-consumer-group", selectorExpression = "Inventory") public class InventoryConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String orderJson) {System.out.println("Processing inventory for: " + orderJson);// 假设这里调用库存扣减逻辑} }
-
配置Tag(application.yml):
rocketmq:name-server: localhost:9876producer:group: order-producer-group
面试问题:如何确保消息不丢失?
- 回答:使用异步发送时,结合
SendCallback
检查发送结果;在生产者端开启retries
(默认3次重试);Broker端开启持久化。
2. 事务消息(支付系统)
场景:用户支付后,确保订单状态更新和消息发送一致。
-
生产者(事务消息):
@RestController public class PaymentController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderService orderService;@GetMapping("/pay")public String payOrder() {String orderId = "12345";rocketMQTemplate.sendMessageInTransaction("TransactionTopic", MessageBuilder.withPayload("Payment for " + orderId).build(), orderId);return "Payment processed!";} }@Service @RocketMQTransactionListener public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String orderId = (String) arg;try {orderService.updateOrderStatus(orderId, "PAID"); // 本地事务return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String orderId = new String(msg.getPayload()).split(" ")[2];String status = orderService.getOrderStatus(orderId);return "PAID".equals(status) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;} }@Service public class OrderService {// 模拟数据库操作private Map<String, String> orderStatus = new HashMap<>();public void updateOrderStatus(String orderId, String status) {orderStatus.put(orderId, status);}public String getOrderStatus(String orderId) {return orderStatus.getOrDefault(orderId, "UNPAID");} }
面试问题:事务消息的实现原理?
- 回答:分为两阶段提交。Producer先发送Half消息,执行本地事务后提交或回滚;Broker定时检查未决事务,调用
checkLocalTransaction
确认状态。
3. 日志收集(顺序消息)
场景:收集应用日志,确保按时间顺序处理。
-
生产者:
@RestController public class LogController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/log")public String sendLog() {String log = "{\"timestamp\":\"2025-02-25 10:00:00\",\"message\":\"User login\"}";rocketMQTemplate.syncSendOrderly("LogTopic", log, "user123"); // 使用userId作为hashKey保证顺序return "Log sent!";} }
-
消费者:
@Service @RocketMQMessageListener(topic = "LogTopic", consumerGroup = "log-consumer-group", messageModel = MessageModel.CLUSTERING) public class LogConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String log) {System.out.println("Processing log: " + log);} }
面试问题:如何保证消息顺序?
- 回答:使用
syncSendOrderly
,通过hashKey(如用户ID)将消息路由到同一队列,消费者单线程消费该队列。
4. 延迟消息(促销提醒)
场景:订单未支付30分钟后发送提醒。
-
生产者:
@RestController public class ReminderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/remind")public String sendReminder() {String reminder = "Order 12345 unpaid";rocketMQTemplate.syncSend("ReminderTopic", MessageBuilder.withPayload(reminder).build(), 1000, 18); // 18代表30分钟延迟return "Reminder scheduled!";} }
-
消费者:
@Service @RocketMQMessageListener(topic = "ReminderTopic", consumerGroup = "reminder-consumer-group") public class ReminderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String reminder) {System.out.println("Sending reminder: " + reminder);} }
面试问题:延迟消息的实现机制?
- 回答:RocketMQ内置18个延迟级别(1s到2h),消息先存储到延迟队列,到期后投递到目标队列。
第三部分:优化与高可用
1. 高可用性配置
-
多NameServer:
rocketmq:name-server: localhost:9876;localhost:9877
-
多消费者实例:ConsumerGroup内多实例负载均衡。
2. 性能优化
-
批量发送:
List<Message> messages = Arrays.asList(MessageBuilder.withPayload("msg1").build(),MessageBuilder.withPayload("msg2").build() ); rocketMQTemplate.syncSend("BatchTopic", messages);
-
调整线程池:
rocketmq:consumer:pull-batch-size: 32consume-thread-max: 64
3. 异常处理
-
消费者重试:
@Service @RocketMQMessageListener(topic = "RetryTopic", consumerGroup = "retry-consumer-group") public class RetryConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {if (true) { // 模拟失败throw new RuntimeException("Processing failed");}} }
默认重试16次,可通过
maxReconsumeTimes
调整。
第四部分:面试常见问题与回答
-
RocketMQ与Kafka的区别?
- RocketMQ支持事务消息和延迟消息,Kafka不支持。
- RocketMQ拉模式和推模式都支持,Kafka主要拉模式。
- RocketMQ适合业务场景,Kafka更偏大数据处理。
-
如何处理消息重复消费?
- 在消费者端实现幂等性(如数据库唯一约束或Redis去重)。
-
如何监控RocketMQ?
- 使用RocketMQ Dashboard查看Topic、消费进度;集成Prometheus+Grafana监控性能。