前言:
RocketMQ阿里开源的,一款分布式的消息中间件,它经过阿里的生产环境的高并发、高吞吐的考验,同时,还支持分布式事务等场景。RocketMQ使用Java语言进行开发,方便Java开发者学习源码。但是,RocketMQ设计相对复杂,官方文档不是很完善,不太适合中小公司引用。技多不压身,作为一个好的Coder,应该多学习一下优秀的框架。本篇主要介绍一下,RocketMQ的基础用法:
正文:
这里我主要通过代码来介绍一下RocketMQ的使用,首先介绍一下RocketMQ的原生写法,然后介绍基于Springboot体系下,RocketMQ生产者的消息发送、消费者的消息接受等写法。
一、Java原生的消息发送与接收写法:
1. 生产者:
/*** 〈一句话功能简述〉<br>* 〈〉** @author hanxinghua* @create 2022/9/30* @since 1.0.0*/
@Slf4j
public class Producer {public static void main(String[] args) throws Exception {// 实例化消息生产者DefaultMQProducer producer = new DefaultMQProducer(RocketConstant.ORIGINAL_PRODUCER_GROUP);// 设置NameServer的地址producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 启动Producer实例producer.start();// 创建消息,设置 Topic、Tag、keys、flag、消息体等Message message = new Message(RocketConstant.ORIGINAL_TOPIC, ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息到一个BrokerSendResult sendResult = producer.send(message);// 通过sendResult返回消息是否成功送达log.info("消息发送成功:{}", sendResult);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}}
2. 消费者:
/*** 〈一句话功能简述〉<br>* 〈〉** @author hanxinghua* @create 2022/9/30* @since 1.0.0*/
@Slf4j
public class Consumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ORIGINAL_CONSUMER_GROUP);// 设置NameServer的地址consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 订阅一个或多个Topic,用Tag来过滤需要消费的消息,这里指定*表示接收所有Tag的消息consumer.subscribe(RocketConstant.ORIGINAL_TOPIC, "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {log.info("{}收到消息:{}", this.getClass().getSimpleName(), messageExts);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();log.info("消费者启动成功!");}
}
二、基于Springboot体系的消息发送写法:
1.普通消息发送:
具体有三种形式,主要包括:同步、异步、单向。
/*** 发送普通同步消息** @return*/@GetMapping("/sendMqBySync")@ResponseBodypublic Object sendMqBySync() {RocketMessage message = RocketMessage.builder().name("普通同步消息" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);return sendResult;}/*** 发送异步消息** @return*/@GetMapping("/sendMqByAsync")@ResponseBodypublic Object sendMqByAsync() {RocketMessage message = RocketMessage.builder().name("异步消息" + LocalDateTime.now()).build();// asyncSendrocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理相应的业务log.info("发送成功:{}", JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {// 处理相应的业务log.info("发送异常:{}", throwable);}});return null;}/*** 发送单向消息* <p>* 这种方式主要用在不特别关心发送结果的场景,例如日志发送** @return*/@GetMapping("/sendMqByOneWay")@ResponseBodypublic Object sendMqByOneWay() {RocketMessage message = RocketMessage.builder().name("单向消息" + LocalDateTime.now()).build();// sendOneWayrocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);return null;}
2.顺序消息发送:
具体有两种形式,主要包括:普通顺序、严格顺序。
/*** 发送普通顺序消息** @return*/@GetMapping("/sendMqByOrder")@ResponseBodypublic Object sendMqByOrder() {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("普通顺序消息" + LocalDateTime.now() + i).build();// syncSendOrderlySendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, "hashkey");results.add(sendResult);}return results;}/*** 发送严格顺序消息* <p>* 概念:* 顺序消息是一种对消息发送和消费顺序有严格要求的消息* <p>* 生产顺序性:* RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:* 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。* 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。** @return*/@GetMapping("/sendMqByStrictOrder")@ResponseBodypublic Object sendMqByStrictOrder() {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("严格顺序消息" + LocalDateTime.now() + i).build();// syncSendOrderlySendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, "hashkey");results.add(sendResult);}return results;}
3. 延迟消息发送:
/*** 发送延时消息* <p>* 延时消息的使用限制* private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"* 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18,* 消息消费失败会进入延时消息队列** @return*/@GetMapping("/sendMqByDelay")@ResponseBodypublic Object sendMqByDelay() {RocketMessage message = RocketMessage.builder().name("延时消息" + LocalDateTime.now()).build();// syncSend(... int delayLevel)SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);return sendResult;}
4. 批量消息发送:
/*** 批量发送消息* <p>* 在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批以后进行发送,* 可以增加吞吐率,并减少API和网络调用次数** @return*/@GetMapping("/sendMqByBatch")@ResponseBodypublic Object sendMqByBatch() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {RocketMessage message = RocketMessage.builder().name("批量消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);return sendResult;}
5. 事务消息发送:
/*** 发送事务消息(半消息)* <p>* 仅仅只是保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,* 并无法保证消费者一定能消费成功* <p>** @return*/@GetMapping("/sendMqByTx")@ResponseBodypublic Object sendMqByTx(Integer type, Integer msgKey) {String transactionId = UUID.randomUUID().toString();No6LocalTransactionOriginalSyntaxListener transactionListener = new No6LocalTransactionOriginalSyntaxListener();TransactionMQProducer producer = new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);try {producer.setTransactionListener(transactionListener);producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);producer.start();log.info("transactionId is {}", transactionId);org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,("事务消息" + LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);msg.getProperties().put("type", String.valueOf(type));msg.getProperties().put("msgKey", String.valueOf(msgKey));SendResult sendResult = producer.sendMessageInTransaction(msg, null);return sendResult;} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}return null;}
6. 带Tag消息发送:
/*** 发送带Tag消息* <p>* Tag(标签)可以看作子主题,它是消息的第二级类型* 通过 RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可** @return*/@GetMapping("/sendMqWithTag")@ResponseBodypublic Object sendMqWithTag() {RocketMessage message = RocketMessage.builder().name("tag消息" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);return sendResult;}
三、基于Springboot体系的消息接收写法:
1. MessageModel(消息模型):
消息模型有两种,主要包括:集群消费与广播消费
/*** 发送集群或广播消息* <p>* 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。** @param MessageModel 0:CLUSTERING 1:BROADCASTING* @return*/@GetMapping("/sendMqByMessageModel")@ResponseBodypublic Object sendMqByMessageModel(@RequestParam Integer MessageModel) {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {if (MessageModel == 0) {RocketMessage message = RocketMessage.builder().name("消息模型-集群消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message = RocketMessage.builder().name("消息模型-广播消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);results.add(sendResult);}}return results;}
2. ConsumeMode(消费模型):
消费模型有两种,主要包括:并发消费与顺序消费
/*** 并发消费与顺序消费** @param consumeMode 0:CONCURRENTLY 1:ORDERLY* @return*/@GetMapping("/sendMqByConsumeMode")@ResponseBodypublic Object sendMqByConsumeMode(Integer consumeMode) {List<SendResult> results = new ArrayList<>();for (int i = 0; i < 10; i++) {if (consumeMode == 0) {RocketMessage message = RocketMessage.builder().name("消费模型-并发消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message = RocketMessage.builder().name("消费模型-顺序消费" + LocalDateTime.now() + i).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);results.add(sendResult);}}return results;}
3. 消息过滤:
消息过滤有两种方式,主要包括:Tag过滤与SQL92过滤
/*** Tag过滤与SQL92过滤* <p>* Tag过滤:* 消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。* SQL92过滤:* 发送者设置Tag或自定义消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。* 开启对SQL语法的支持(broker.conf):* enablePropertyFilter = true** @param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性* @return*/@GetMapping("/sendMqByFilterMode")@ResponseBodypublic Object sendMqByFilterMode(Integer filterMode) {if (filterMode == 0) {RocketMessage message = RocketMessage.builder().name("消费过滤-Tag过滤" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);return sendResult;} else if (filterMode == 1) {RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤Tag" + LocalDateTime.now()).build();SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC + ":" + RocketConstant.SQL92_TAG_EXPRESSION, message);return sendResult;} else {// Message msg = new Message("topic", "tagA", "Hello MQ".getBytes())// 设置自定义属性A,属性值为1。-> msg.putUserProperties("a", "1")// RocketMQTemplate 目前好像不支持这种写法RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤自定义消息属性" + LocalDateTime.now()).build();Map<String, Object> map = new HashMap<>();map.put("a", 1);MessageHeaders messageHeaders = new MessageHeaders(map);SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));return sendResult;}}
4. 消息重试与死信队列:
/*** 消息重试与死信队列* <p>* 1. 消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,* 失败消息不再重试,继续消费新的消息。* 2. 一条消息初次消费失败后,会自动进行消息重试,达到最大重试次数后,将其发送到该消费者对应的死信队列,* 这类消息称为死信消息(Dead-Letter Message)。死信队列是死信Topic下,分区数唯一的单独队列。* 3. 如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,* 死信队列的消息将不会再被消费。* 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。** @return*/@GetMapping("/sendMqByRetry")@ResponseBodypublic Object sendMqByRetry() {RocketMessage message = RocketMessage.builder().name("消息重试" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);return sendResult;}
5. 消息应答:
/*** 消息应答** @return*/@GetMapping("/sendByReply")@ResponseBodypublic Object sendByReply() {RocketMessage message = RocketMessage.builder().name("消息应答" + LocalDateTime.now()).build();RocketMessage receiveMessage = rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);return receiveMessage;}
6. Pull消费:
Pull消费包括两种方式,主要包括:原始Pull Consumer与Lite Pull Consumer)
/*** 原始Pull Consumer的消息发送** @return*/@GetMapping("/sendByOriginalPull")@ResponseBodypublic Object sendByOriginalPull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("原始pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);return sendResult;}@GetMapping("/pullByOriginal")@ResponseBodypublic void pullByOriginal() {pullMgsOriginal.pull(0, 2);}/*** 使用rocketMQTemplate拉取消息** @return*/@GetMapping("/sendByTemplatePull")@ResponseBodypublic Object sendByTemplatePull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("template pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);return sendResult;}/*** LitePullSubscribe** @return*/@GetMapping("/sendBySubscribePull")@ResponseBodypublic Object sendBySubscribePull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("subscribe pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);return sendResult;}@GetMapping("/pullBySubscribe")@ResponseBodypublic void pullBySubscribe() {litePullSubscribeMsg.pull(20);}/*** LitePullAssign** @return*/@GetMapping("/sendByAssignPull")@ResponseBodypublic Object sendByAssignPull() {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100; i++) {RocketMessage message = RocketMessage.builder().name("assign pull消息" + LocalDateTime.now() + i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);return sendResult;}@GetMapping("/pullByAssign")@ResponseBodypublic void pullByAssign() {litePullAssignMsg.pull();}
7. 设置消费点位:
/*** 消费点类型设置成:* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP** @return*/@GetMapping("/sendMqByConsumePoint")@ResponseBodypublic Object sendMqByConsumePoint() {RocketMessage message = RocketMessage.builder().name("设置消费点位" + LocalDateTime.now()).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);return sendResult;}
8. 消费者手动应答:
/*** @param mgs 0:会抛空指针,重试三次。* @return*/@GetMapping("/sendMqByManualConfirm")@ResponseBodypublic Object sendMqByManualConfirm(Integer mgs) {RocketMessage message = RocketMessage.builder().name(mgs == null ? "消费者手动应答" + LocalDateTime.now() : String.valueOf(mgs)).build();// syncSendSendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);return sendResult;}