【RocketMQ】RocketMQ入门

news/2024/12/5 11:46:31/

【RocketMQ】RocketMQ入门

文章目录

  • 【RocketMQ】RocketMQ入门
    • 1. 消费模式
    • 2. 发送/消费 消息
      • 2.1 同步消息
      • 2.2 异步消息
      • 2.3 单向消息
      • 2.4 延迟消息
      • 2.5 批量消息
      • 2.6 顺序消息

1. 消费模式

MQ的消费模式大致分为两种,一种是推Push,一种是拉pull。

Push模式:

  1. 优点:
    • 及时性较好
  2. 缺点:
    • 客户端没有做好流控的话容易导致客户端消息堆积甚至崩溃。

Pull模式:

  1. 优点:
    • 客户端可以根据自己的消费能力进行消费
  2. 缺点:
    • 拉取频率不好控制,频繁容易造成客户端压力过大,拉取间隔长容易造成消费不及时。

Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式


2. 发送/消费 消息

参考文档:RocketMQ官方文档

以下代码采用的都是rocketmq的原生api

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>

2.1 同步消息

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

image-20230525153823533

生产者发送消息代码如下:

@Test
public void simpleProducer() throws Exception {//创建一个生产者 (制定一个组名)DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");//连接namesrvproducer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//启动producer.start();for (int i = 1; i <= 10; i++) {//创建消息Message message = new Message("testTopic", ("我是一个简单的消息" + i).getBytes());//发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());}//关闭生产者producer.shutdown();
}

消费者消费信息代码如下:

@Test
public void simpleConsumer() throws Exception {//创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");//连接namesrvconsumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//订阅一个主题 * 标识订阅这个主题中所有消息,后期会有消息过滤consumer.subscribe("testTopic", "*");//设置一个监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {//这个就是消费的方法(业务处理)System.out.println("我是消费者");System.out.println(list.get(0).toString());System.out.println("消息内容:" + new String(list.get(0).getBody()));System.out.println("消费上下文" + consumeConcurrentlyContext);//返回值 CONSUME_SUCCESS 成功,消息会从mq出队// RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//TimeUnit.SECONDS.sleep(100);//挂起当前jvmSystem.in.read();
}

2.2 异步消息

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

image-20230525154600960

注:异步发送生产者需要实现异步发送回调接口。

生产者发送消息代码如下:

@Test
public void asyncProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");//连接producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//启动producer.start();Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败" + throwable.getMessage());}});System.out.println("我先执行");System.in.read();
}

消费者代码基本和同步消息的相同,不展示。


2.3 单向消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集

image-20230525154748027

生产者发送消息代码如下:

@Test
public void onewayProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("onewayTopic", "这是一条单向消息".getBytes());producer.sendOneway(message);System.out.println("成功");producer.shutdown();
}

2.4 延迟消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。应用场景是外卖15分钟未支付则取消订单。

RokcketMQ一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

生产者发送消息代码如下:

@Test
public void msProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("orderMsTopic", "订单消息".getBytes());//设置延迟等级//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"message.setDelayTimeLevel(3);//10sproducer.send(message);System.out.println("发送事件:" + new Date());producer.shutdown();
}

2.5 批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

image-20230525160131302

注:批量消息大小不能超过1MIB(1024*1024),同一批的 topic 必须相同

生产者发送消息代码如下:

@Test
public void testBatchProducer() throws Exception{// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 启动实例producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopic", "我是一组消息的A消息".getBytes()),new Message("batchTopic", "我是一组消息的B消息".getBytes()),new Message("batchTopic", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}

消费者消费信息代码如下:

@Test
public void msConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bathc-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("batchTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到消息了:" + new Date());System.out.println(list.size());System.out.println("消息体是:" + new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

2.6 顺序消息

待续。。。


http://www.ppmy.cn/news/83129.html

相关文章

Install Prometheus Monitoring On Kubernetes Cluster

目录 Node & Software & Docker Images Lists ​Prometheus introduction Download Kubernetes Prometheus Manifest Files Install Prometheus Monitoring Kubernetes Create a Namespace Create a Cluster Role And Binding It Create a Config Map Create…

R语言实践——rWCVP 的函数清单

rWCVP 的函数清单 1. get_area_name()用法参数值详介例子 2. get_wgsrpd3_codes()用法参数值详介例子 3. powo_map()用法参数值 4. powo_pal(), scale_color_powo(), scale_colour_powo(), scale_fill_powo()用法参数值 5. redlist_example用法格式资源 6. taxonomic_mapping用…

1139 First Contact (PAT甲级)

这道题柳婼有个很巧妙的方法&#xff0c;就是如果a和b是朋友&#xff08;a, b都是四位数字id&#xff09;&#xff0c;那就把a * 10000 b和b * 10000 a都map到1&#xff0c;那就很容易判断两个人是否朋友了。 #include <cstdio> #include <iostream> #include &…

盛元广通疾病预防控制中心检测管理信息系统

近些年&#xff0c;在疾病预防控制领域&#xff0c;公共卫生事件的发生都是通过信息化手段在日常工作中加以应用以及广泛深入的探索&#xff0c;加快疾控实验室信息化建设进程&#xff0c;可以有效把控不同类型检测任务中的每个节点&#xff0c;严防不同系统填报多次出现信息误…

YOLO V3 SPP ultralytics 第四节:YOLO V3 SPP网络的搭建

目录 1. 介绍 2. 代码介绍 2.1 create_modules 部分 2.1.1 不同层的处理 2.1.2 信息的融合 2.1.3 yolo 层的处理 2.2 get_yolo_layers 2.3 前向传播 3. 完整代码 1. 介绍 根据 上一节 解析的cfg文件&#xff0c;本章将配置文件cfg 搭建YOLO V3 SPP网络 本章的代码经过…

设计模式-简单例子理解适配器模式、装饰器模式

文章目录 一、适配器模式1. 要点2. Demo 二、装饰器模式1. 要点2. Demo 三、区别 本文参考&#xff1a; 基本原理&#xff1a;装饰器模式 | 菜鸟教程 (runoob.com) 基本原理&#xff1a;适配器模式 | 菜鸟教程 (runoob.com) 优缺点和区别&#xff0c;装饰模式&#xff1a;适配器…

Vue中的nextTick是用来做什么的,解决什么问题的-M

Vue中的nextTick是用来做什么的&#xff0c;解决什么问题的 Vue 的 nextTick 其本质是对 JavaScript 执行原理&#xff0c;时间循环机制 EventLoop 的一种应用。 $nextTick接收一个函数作为参数&#xff0c;在下一个时间片执行该函数的方法。或者说在浏览器初次加载或因为某些…

二叉排序树的查找、插入、删除

目录 二叉排序树的定义 二叉排序树的查找 二叉排序树的插入 二叉排序树的定义 二叉排序树的定义 二叉排序树&#xff08;Binary Sort Tree&#xff0c; BST&#xff09;&#xff0c;也称二叉查找树。 二叉排序树或者是一棵空树&#xff0c;或者是一棵具有下列特性的非空二叉…