Spring Boot集成RocketMQ:真实项目应用场景

ops/2025/2/26 9:50:04/

第一部分:基础配置与简单示例

1. 项目初始化

使用Spring Boot创建一个项目,添加RocketMQ依赖。

  • POM依赖(Maven):

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>3.2.3</version>
    </dependency>
    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>
    </dependency>
    
  • application.yml 配置:

    rocketmq:name-server: localhost:9876producer:group: default-producer-groupconsumer:group: default-consumer-group
    
2. 简单生产者与消费者
  • 生产者

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;@RestController
    public class SimpleProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String sendMessage() {rocketMQTemplate.convertAndSend("SimpleTopic", "Hello, RocketMQ with Spring Boot!");return "Message sent!";}
    }
    
  • 消费者

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;@Service
    @RocketMQMessageListener(topic = "SimpleTopic", consumerGroup = "simple-consumer-group")
    public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);}
    }
    
  • 启动类

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
    public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
    }
    

启动项目后,访问 http://localhost:8080/send,消费者会打印消息。这是最基础的用法,面试中常被问到如何快速集成。


第二部分:真实项目应用场景

以下是RocketMQ在Spring Boot中的典型应用场景,涵盖面试常见问题。

1. 电商订单系统(异步消息)

场景:用户下单后,异步通知库存扣减和物流系统。

  • 生产者(订单服务):

    @RestController
    public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/place-order")public String placeOrder() {String orderJson = "{\"orderId\":\"12345\",\"item\":\"Laptop\",\"quantity\":1}";// 异步发送消息rocketMQTemplate.asyncSend("OrderTopic", orderJson, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("Order sent successfully: " + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {System.err.println("Order send failed: " + throwable.getMessage());}});return "Order placed!";}
    }
    
  • 消费者(库存服务):

    @Service
    @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "inventory-consumer-group", selectorExpression = "Inventory")
    public class InventoryConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String orderJson) {System.out.println("Processing inventory for: " + orderJson);// 假设这里调用库存扣减逻辑}
    }
    
  • 配置Tag(application.yml):

    rocketmq:name-server: localhost:9876producer:group: order-producer-group
    

面试问题:如何确保消息不丢失?

  • 回答:使用异步发送时,结合 SendCallback 检查发送结果;在生产者端开启 retries(默认3次重试);Broker端开启持久化。
2. 事务消息(支付系统)

场景:用户支付后,确保订单状态更新和消息发送一致。

  • 生产者(事务消息):

    @RestController
    public class PaymentController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderService orderService;@GetMapping("/pay")public String payOrder() {String orderId = "12345";rocketMQTemplate.sendMessageInTransaction("TransactionTopic", MessageBuilder.withPayload("Payment for " + orderId).build(), orderId);return "Payment processed!";}
    }@Service
    @RocketMQTransactionListener
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String orderId = (String) arg;try {orderService.updateOrderStatus(orderId, "PAID"); // 本地事务return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String orderId = new String(msg.getPayload()).split(" ")[2];String status = orderService.getOrderStatus(orderId);return "PAID".equals(status) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}
    }@Service
    public class OrderService {// 模拟数据库操作private Map<String, String> orderStatus = new HashMap<>();public void updateOrderStatus(String orderId, String status) {orderStatus.put(orderId, status);}public String getOrderStatus(String orderId) {return orderStatus.getOrDefault(orderId, "UNPAID");}
    }
    

面试问题:事务消息的实现原理?

  • 回答:分为两阶段提交。Producer先发送Half消息,执行本地事务后提交或回滚;Broker定时检查未决事务,调用 checkLocalTransaction 确认状态。
3. 日志收集(顺序消息)

场景:收集应用日志,确保按时间顺序处理。

  • 生产者

    @RestController
    public class LogController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/log")public String sendLog() {String log = "{\"timestamp\":\"2025-02-25 10:00:00\",\"message\":\"User login\"}";rocketMQTemplate.syncSendOrderly("LogTopic", log, "user123"); // 使用userId作为hashKey保证顺序return "Log sent!";}
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "LogTopic", consumerGroup = "log-consumer-group", messageModel = MessageModel.CLUSTERING)
    public class LogConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String log) {System.out.println("Processing log: " + log);}
    }
    

面试问题:如何保证消息顺序?

  • 回答:使用 syncSendOrderly,通过hashKey(如用户ID)将消息路由到同一队列,消费者单线程消费该队列。
4. 延迟消息(促销提醒)

场景:订单未支付30分钟后发送提醒。

  • 生产者

    @RestController
    public class ReminderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/remind")public String sendReminder() {String reminder = "Order 12345 unpaid";rocketMQTemplate.syncSend("ReminderTopic", MessageBuilder.withPayload(reminder).build(), 1000, 18); // 18代表30分钟延迟return "Reminder scheduled!";}
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "ReminderTopic", consumerGroup = "reminder-consumer-group")
    public class ReminderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String reminder) {System.out.println("Sending reminder: " + reminder);}
    }
    

面试问题:延迟消息的实现机制?

  • 回答:RocketMQ内置18个延迟级别(1s到2h),消息先存储到延迟队列,到期后投递到目标队列。

第三部分:优化与高可用

1. 高可用性配置
  • 多NameServer

    rocketmq:name-server: localhost:9876;localhost:9877
    
  • 多消费者实例:ConsumerGroup内多实例负载均衡。

2. 性能优化
  • 批量发送

    List<Message> messages = Arrays.asList(MessageBuilder.withPayload("msg1").build(),MessageBuilder.withPayload("msg2").build()
    );
    rocketMQTemplate.syncSend("BatchTopic", messages);
    
  • 调整线程池

    rocketmq:consumer:pull-batch-size: 32consume-thread-max: 64
    
3. 异常处理
  • 消费者重试

    @Service
    @RocketMQMessageListener(topic = "RetryTopic", consumerGroup = "retry-consumer-group")
    public class RetryConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {if (true) { // 模拟失败throw new RuntimeException("Processing failed");}}
    }
    

    默认重试16次,可通过 maxReconsumeTimes 调整。


第四部分:面试常见问题与回答

  1. RocketMQ与Kafka的区别?

    • RocketMQ支持事务消息和延迟消息,Kafka不支持。
    • RocketMQ拉模式和推模式都支持,Kafka主要拉模式。
    • RocketMQ适合业务场景,Kafka更偏大数据处理。
  2. 如何处理消息重复消费?

    • 在消费者端实现幂等性(如数据库唯一约束或Redis去重)。
  3. 如何监控RocketMQ?

    • 使用RocketMQ Dashboard查看Topic、消费进度;集成Prometheus+Grafana监控性能。

http://www.ppmy.cn/ops/161383.html

相关文章

白帽黑客系列教程之Windows驱动开发(64位环境)入门教程(四)

为什么要写这篇文章呢&#xff1f; 作为一名白帽黑客&#xff0c;如果想要学习ROOTKIT攻防技术&#xff0c;就必须要有能力进行驱动开发&#xff01; 本文章仅提供学习&#xff0c;切勿将其用于不法手段&#xff01; 在Windows操作系统的64位环境中&#xff0c;进行ROOTKIT攻…

网络安全与措施

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 # 网络安全问题概述 1) 数据安全 访问&#xff08;授权访问&#xff09;&#xff1b;存储&#xff08;容灾、备份或异地备份等&#xff09; 2) 应用程序 不能…

2.25力扣-回溯组合总和

39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 一&#xff1a;Java class Solution {List<List<Integer>> ansnew LinkedList<>();List<Integer> tempnew LinkedList<>();int sum0;public List<List<Integer>> combinatio…

阶跃星辰的开源探索:Step-Video-T2V 与 Step-Audio 深度解析

一、引言 在人工智能&#xff08;AI&#xff09;技术迅猛发展的今天&#xff0c;多模态 AI 模型的开源成为行业发展的重要趋势。近年来&#xff0c;文本生成&#xff08;NLP&#xff09;、图像生成&#xff08;Diffusion Model&#xff09;以及语音合成&#xff08;TTS&#x…

详解:用Python OpenCV库来处理图像并测量物体的长度

1. 项目背景 智能卷尺测量系统通过传感器和算法实现自动测量&#xff0c;具备高精度、便携性和数据存储功能&#xff0c;适用于建筑、制造等领域。该系统的核心算法涉及到图像处理、计算机视觉和机器学习等技术。本文主要介绍Python OpenCV库的处理逻辑。 1. 安装所需的库 p…

akka现有的分布式定时任务框架总结

根据你的需求&#xff0c;以下是一些基于 Akka 实现的分布式定时任务框架&#xff0c;以及相关的 GitHub 项目推荐&#xff1a; 1. Openjob Openjob 是一个基于 Akka 架构的新一代分布式任务调度框架&#xff0c;支持多种定时任务、延时任务、工作流设计&#xff0c;采用无中…

【工欲善其事】2025 年实现听书自由——基于 Kokoro-82M 的开源 TTS 工具 audiblez 本地部署实战

文章目录 2025 年实现听书自由——基于 Kokoro-82M 的开源 TTS 工具 audiblez 本地部署实战1 前言2 本地部署准备工作3 具体安装步骤4 命令行启动测试5 GUI 界面启动测试6 相关资源一站式下载 2025 年实现听书自由——基于 Kokoro-82M 的开源 TTS 工具 audiblez 本地部署实战 …

Docker基础-常见命令

docker images -查看所有的本地镜像。 docker pull -把远端镜像拉取到本地。 docker rmi -删除镜像。 docker push -推到镜像仓库。 docker run -创建并运行容器&#xff08;自动化&#xff0c;如果发现镜像不存在会先去拉取&#xff0c; 拉取完了以后再去自动创建容器&am…