RocketMQ基础学习

news/2024/11/20 23:34:20/

前言:

        RocketMQ阿里开源的,一款分布式的消息中间件,它经过阿里的生产环境的高并发、高吞吐的考验,同时,还支持分布式事务等场景。RocketMQ使用Java语言进行开发,方便Java开发者学习源码。但是,RocketMQ设计相对复杂,官方文档不是很完善,不太适合中小公司引用。技多不压身,作为一个好的Coder,应该多学习一下优秀的框架。本篇主要介绍一下,RocketMQ的基础用法:

正文:

        这里我主要通过代码来介绍一下RocketMQ的使用,首先介绍一下RocketMQ的原生写法,然后介绍基于Springboot体系下,RocketMQ生产者的消息发送、消费者的消息接受等写法。

一、Java原生的消息发送与接收写法:

1. 生产者:

/*** 〈一句话功能简述〉<br>* 〈〉** @author hanxinghua* @create 2022/9/30* @since 1.0.0*/
@Slf4j
public class Producer {public static void main(String[] args) throws Exception {// 实例化消息生产者DefaultMQProducer producer = new DefaultMQProducer(RocketConstant.ORIGINAL_PRODUCER_GROUP);// 设置NameServer的地址producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 启动Producer实例producer.start();// 创建消息,设置 Topic、Tag、keys、flag、消息体等Message message = new Message(RocketConstant.ORIGINAL_TOPIC, ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息到一个BrokerSendResult sendResult = producer.send(message);// 通过sendResult返回消息是否成功送达log.info("消息发送成功:{}", sendResult);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}}

2. 消费者:

/*** 〈一句话功能简述〉<br>* 〈〉** @author hanxinghua* @create 2022/9/30* @since 1.0.0*/
@Slf4j
public class Consumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ORIGINAL_CONSUMER_GROUP);// 设置NameServer的地址consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 订阅一个或多个Topic,用Tag来过滤需要消费的消息,这里指定*表示接收所有Tag的消息consumer.subscribe(RocketConstant.ORIGINAL_TOPIC, "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {log.info("{}收到消息:{}", this.getClass().getSimpleName(), messageExts);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();log.info("消费者启动成功!");}
}

二、基于Springboot体系的消息发送写法:

1.普通消息发送:

        具体有三种形式,主要包括:同步、异步、单向。

    /*** 发送普通同步消息** @return*/@GetMapping("/sendMqBySync")@ResponseBodypublic Object sendMqBySync() {RocketMessage message = RocketMessage.builder().name("普通同步消息" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);return sendResult;}/*** 发送异步消息** @return*/@GetMapping("/sendMqByAsync")@ResponseBodypublic Object sendMqByAsync() {RocketMessage message = RocketMessage.builder().name("异步消息" + LocalDateTime.now()).build();// asyncSendrocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理相应的业务log.info("发送成功:{}", JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {// 处理相应的业务log.info("发送异常:{}", throwable);}});return null;}/*** 发送单向消息* <p>* 这种方式主要用在不特别关心发送结果的场景,例如日志发送** @return*/@GetMapping("/sendMqByOneWay")@ResponseBodypublic Object sendMqByOneWay() {RocketMessage message = RocketMessage.builder().name("单向消息" + LocalDateTime.now()).build();// sendOneWayrocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);return null;}

2.顺序消息发送:

        具体有两种形式,主要包括:普通顺序、严格顺序。

    /*** 发送普通顺序消息** @return*/@GetMapping("/sendMqByOrder")@ResponseBodypublic Object sendMqByOrder() {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("普通顺序消息" + LocalDateTime.now() + i).build();// syncSendOrderlySendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, "hashkey");results.add(sendResult);}return results;}/*** 发送严格顺序消息* <p>* 概念:* 顺序消息是一种对消息发送和消费顺序有严格要求的消息* <p>* 生产顺序性:* RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:* 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。* 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。** @return*/@GetMapping("/sendMqByStrictOrder")@ResponseBodypublic Object sendMqByStrictOrder() {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("严格顺序消息" + LocalDateTime.now() + i).build();// syncSendOrderlySendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, "hashkey");results.add(sendResult);}return results;}

3. 延迟消息发送:

    /*** 发送延时消息* <p>* 延时消息的使用限制* private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"* 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18,* 消息消费失败会进入延时消息队列** @return*/@GetMapping("/sendMqByDelay")@ResponseBodypublic Object sendMqByDelay() {RocketMessage message = RocketMessage.builder().name("延时消息" + LocalDateTime.now()).build();// syncSend(... int delayLevel)SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);return sendResult;}

4. 批量消息发送:

    /*** 批量发送消息* <p>* 在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批以后进行发送,* 可以增加吞吐率,并减少API和网络调用次数** @return*/@GetMapping("/sendMqByBatch")@ResponseBodypublic Object sendMqByBatch() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("批量消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);return sendResult;}

 5. 事务消息发送:

   /*** 发送事务消息(半消息)* <p>* 仅仅只是保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,* 并无法保证消费者一定能消费成功* <p>** @return*/@GetMapping("/sendMqByTx")@ResponseBodypublic Object sendMqByTx(Integer type, Integer msgKey) {String transactionId = UUID.randomUUID().toString();No6LocalTransactionOriginalSyntaxListener transactionListener = new No6LocalTransactionOriginalSyntaxListener();TransactionMQProducer producer = new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);try {producer.setTransactionListener(transactionListener);producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);producer.start();log.info("transactionId is {}", transactionId);org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,("事务消息" + LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);msg.getProperties().put("type", String.valueOf(type));msg.getProperties().put("msgKey", String.valueOf(msgKey));SendResult sendResult = producer.sendMessageInTransaction(msg, null);return sendResult;} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}return null;}

6. 带Tag消息发送:

    /*** 发送带Tag消息* <p>* Tag(标签)可以看作子主题,它是消息的第二级类型* 通过 RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可** @return*/@GetMapping("/sendMqWithTag")@ResponseBodypublic Object sendMqWithTag() {RocketMessage message = RocketMessage.builder().name("tag消息" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);return sendResult;}

三、基于Springboot体系的消息接收写法:

1. MessageModel(消息模型):

        消息模型有两种,主要包括:集群消费与广播消费

    /*** 发送集群或广播消息* <p>* 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。** @param MessageModel 0:CLUSTERING   1:BROADCASTING* @return*/@GetMapping("/sendMqByMessageModel")@ResponseBodypublic Object sendMqByMessageModel(@RequestParam Integer MessageModel) {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {if (MessageModel == 0) {RocketMessage message = RocketMessage.builder().name("消息模型-集群消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message = RocketMessage.builder().name("消息模型-广播消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);results.add(sendResult);}}return results;}

2. ConsumeMode(消费模型):

        消费模型有两种,主要包括:并发消费与顺序消费

    /*** 并发消费与顺序消费** @param consumeMode 0:CONCURRENTLY  1:ORDERLY* @return*/@GetMapping("/sendMqByConsumeMode")@ResponseBodypublic Object sendMqByConsumeMode(Integer consumeMode) {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {if (consumeMode == 0) {RocketMessage message = RocketMessage.builder().name("消费模型-并发消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message = RocketMessage.builder().name("消费模型-顺序消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);results.add(sendResult);}}return results;}

3. 消息过滤:

        消息过滤有两种方式,主要包括:Tag过滤与SQL92过滤

   /*** Tag过滤与SQL92过滤* <p>* Tag过滤:* 消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。* SQL92过滤:* 发送者设置Tag或自定义消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。* 开启对SQL语法的支持(broker.conf):* enablePropertyFilter = true** @param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性* @return*/@GetMapping("/sendMqByFilterMode")@ResponseBodypublic Object sendMqByFilterMode(Integer filterMode) {if (filterMode == 0) {RocketMessage message = RocketMessage.builder().name("消费过滤-Tag过滤" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);return sendResult;} else if (filterMode == 1) {RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤Tag" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC + ":" + RocketConstant.SQL92_TAG_EXPRESSION, message);return sendResult;} else {// Message msg = new Message("topic", "tagA", "Hello MQ".getBytes())// 设置自定义属性A,属性值为1。-> msg.putUserProperties("a", "1")//  RocketMQTemplate 目前好像不支持这种写法RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤自定义消息属性" + LocalDateTime.now()).build();Map<String, Object> map = new HashMap<>();map.put("a", 1);MessageHeaders messageHeaders = new MessageHeaders(map);SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));return sendResult;}}

4. 消息重试与死信队列:

  /*** 消息重试与死信队列* <p>* 1. 消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,* 失败消息不再重试,继续消费新的消息。* 2. 一条消息初次消费失败后,会自动进行消息重试,达到最大重试次数后,将其发送到该消费者对应的死信队列,* 这类消息称为死信消息(Dead-Letter Message)。死信队列是死信Topic下,分区数唯一的单独队列。* 3. 如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,* 死信队列的消息将不会再被消费。* 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。** @return*/@GetMapping("/sendMqByRetry")@ResponseBodypublic Object sendMqByRetry() {RocketMessage message = RocketMessage.builder().name("消息重试" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);return sendResult;}

5. 消息应答:

   /*** 消息应答** @return*/@GetMapping("/sendByReply")@ResponseBodypublic Object sendByReply() {RocketMessage message = RocketMessage.builder().name("消息应答" + LocalDateTime.now()).build();RocketMessage receiveMessage = rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);return receiveMessage;}

6. Pull消费:

        Pull消费包括两种方式,主要包括:原始Pull Consumer与Lite Pull Consumer)

   /*** 原始Pull Consumer的消息发送** @return*/@GetMapping("/sendByOriginalPull")@ResponseBodypublic Object sendByOriginalPull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("原始pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);return sendResult;}@GetMapping("/pullByOriginal")@ResponseBodypublic void pullByOriginal() {pullMgsOriginal.pull(0, 2);}/*** 使用rocketMQTemplate拉取消息** @return*/@GetMapping("/sendByTemplatePull")@ResponseBodypublic Object sendByTemplatePull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("template pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);return sendResult;}/*** LitePullSubscribe** @return*/@GetMapping("/sendBySubscribePull")@ResponseBodypublic Object sendBySubscribePull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("subscribe pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);return sendResult;}@GetMapping("/pullBySubscribe")@ResponseBodypublic void pullBySubscribe() {litePullSubscribeMsg.pull(20);}/*** LitePullAssign** @return*/@GetMapping("/sendByAssignPull")@ResponseBodypublic Object sendByAssignPull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("assign pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);return sendResult;}@GetMapping("/pullByAssign")@ResponseBodypublic void pullByAssign() {litePullAssignMsg.pull();}

7. 设置消费点位:

    /*** 消费点类型设置成:* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP** @return*/@GetMapping("/sendMqByConsumePoint")@ResponseBodypublic Object sendMqByConsumePoint() {RocketMessage message = RocketMessage.builder().name("设置消费点位" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);return sendResult;}

8. 消费者手动应答:

   /*** @param mgs 0:会抛空指针,重试三次。* @return*/@GetMapping("/sendMqByManualConfirm")@ResponseBodypublic Object sendMqByManualConfirm(Integer mgs) {RocketMessage message = RocketMessage.builder().name(mgs == null ? "消费者手动应答" + LocalDateTime.now() : String.valueOf(mgs)).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);return sendResult;}


http://www.ppmy.cn/news/24113.html

相关文章

2.11知识点整理(关于pycharm,python,pytorch,conda)

pycharm 设置anaconda环境&#xff1a; File -> Settings->选择左侧的project xxx再选择打开Project Interpreter页->选择add添加解释器->添加Anaconda中Python解释器&#xff08;Anaconda安装目录下的python.exe&#xff09; (选择existing environment &#xff…

4.SpringWeb

一、创建项目LomBok:辅助开发工具&#xff0c;减少代码编写Spring Web:带上Spring MVC,可以做Web开发了Thymleaf: Web开发末班引擎&#xff08;不常用&#xff09;创建好&#xff0c;如下&#xff1a;static/ 放置静态资源的根目录templates/ 放置模板文件的根目录 二、资源配置…

Java学习记录day6

书接上回 类与对象 static关键字 static的作用&#xff1a; 修饰一个属性&#xff1a;声明为static的变量实质上就是一个全局变量,其生命周期为从类被加载开始一直到程序结束&#xff1b;修饰方法&#xff1a;无须本类的对象也可以调用该方法&#xff1b;修饰一个类&#x…

STL——list

一、list介绍及使用 1. list文档介绍 &#xff08;1&#xff09;list是可以在常数范围内&#xff0c;在任意位置进行插入、删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 &#xff08;2&#xff09;list的底层是带头结点的双向循环链表&#xff0c;其中每个元素…

JUC并发编程Ⅰ -- Java中的线程

文章目录线程与进程并行与并发进程与线程应用应用之异步调用应用之提高效率线程的创建方法一&#xff1a;通过继承Thread类创建方法二&#xff1a;使用Runnable配合Thread方法三&#xff1a;使用FutureTask与Thread结合创建查看进程和线程的方法线程运行的原理栈与栈帧线程上下…

不停服更新应用的方案:蓝绿发布、滚动发布、灰度发布

原文网址&#xff1a;不停服更新应用的方案&#xff1a;蓝绿发布、滚动发布、灰度发布_IT利刃出鞘的博客-CSDN博客 简介 本文介绍不停服更新应用的方案&#xff1a;蓝绿发布、滚动发布、灰度发布。 升级服务器的应用时&#xff0c;要停止掉老版本服务&#xff0c;将程序上传…

计算机组成与设计04——处理器

系列文章目录 本系列博客重点在深圳大学计算机系统&#xff08;3&#xff09;课程的核心内容梳理&#xff0c;参考书目《计算机组成与设计》&#xff08;有问题欢迎在评论区讨论指出&#xff0c;或直接私信联系我&#xff09;。 第一章 计算机组成与设计01——计算机概要与技…

【计算机网络】Linux环境中的TCP网络编程

文章目录前言一、TCP Socket API1. socket2. bind3. listen4. accept5. connect二、封装TCPSocket三、服务端的实现1. 封装TCP通用服务器2. 封装任务对象3. 实现转换功能的服务器四、客户端的实现1. 封装TCP通用客户端2. 实现转换功能的客户端五、结果演示六、多进程版服务器七…