【RocketMQ】RocketMQ入门
文章目录
- 【RocketMQ】RocketMQ入门
- 1. 消费模式
- 2. 发送/消费 消息
- 2.1 同步消息
- 2.2 异步消息
- 2.3 单向消息
- 2.4 延迟消息
- 2.5 批量消息
- 2.6 顺序消息
1. 消费模式
MQ的消费模式大致分为两种,一种是推Push,一种是拉pull。
Push模式:
- 优点:
- 及时性较好
- 缺点:
- 客户端没有做好流控的话容易导致客户端消息堆积甚至崩溃。
Pull模式:
- 优点:
- 客户端可以根据自己的消费能力进行消费
- 缺点:
- 拉取频率不好控制,频繁容易造成客户端压力过大,拉取间隔长容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
2. 发送/消费 消息
参考文档:RocketMQ官方文档
以下代码采用的都是rocketmq的原生api
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
2.1 同步消息
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
生产者发送消息代码如下:
@Test
public void simpleProducer() throws Exception {//创建一个生产者 (制定一个组名)DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");//连接namesrvproducer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//启动producer.start();for (int i = 1; i <= 10; i++) {//创建消息Message message = new Message("testTopic", ("我是一个简单的消息" + i).getBytes());//发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());}//关闭生产者producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void simpleConsumer() throws Exception {//创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");//连接namesrvconsumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//订阅一个主题 * 标识订阅这个主题中所有消息,后期会有消息过滤consumer.subscribe("testTopic", "*");//设置一个监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {//这个就是消费的方法(业务处理)System.out.println("我是消费者");System.out.println(list.get(0).toString());System.out.println("消息内容:" + new String(list.get(0).getBody()));System.out.println("消费上下文" + consumeConcurrentlyContext);//返回值 CONSUME_SUCCESS 成功,消息会从mq出队// RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//TimeUnit.SECONDS.sleep(100);//挂起当前jvmSystem.in.read();
}
2.2 异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
注:异步发送生产者需要实现异步发送回调接口。
生产者发送消息代码如下:
@Test
public void asyncProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");//连接producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//启动producer.start();Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败" + throwable.getMessage());}});System.out.println("我先执行");System.in.read();
}
消费者代码基本和同步消息的相同,不展示。
2.3 单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
生产者发送消息代码如下:
@Test
public void onewayProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("onewayTopic", "这是一条单向消息".getBytes());producer.sendOneway(message);System.out.println("成功");producer.shutdown();
}
2.4 延迟消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。应用场景是外卖15分钟未支付则取消订单。
RokcketMQ一共支持18个等级的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
生产者发送消息代码如下:
@Test
public void msProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("orderMsTopic", "订单消息".getBytes());//设置延迟等级//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"message.setDelayTimeLevel(3);//10sproducer.send(message);System.out.println("发送事件:" + new Date());producer.shutdown();
}
2.5 批量消息
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
注:批量消息大小不能超过1MIB(1024*1024),同一批的 topic 必须相同
生产者发送消息代码如下:
@Test
public void testBatchProducer() throws Exception{// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 启动实例producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopic", "我是一组消息的A消息".getBytes()),new Message("batchTopic", "我是一组消息的B消息".getBytes()),new Message("batchTopic", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void msConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bathc-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("batchTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到消息了:" + new Date());System.out.println(list.size());System.out.println("消息体是:" + new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
2.6 顺序消息
待续。。。