一、基本概念
- 分组 group
- 主题 topic
- 消息队列 message queue:一般一个消息队列服务一个消费者,提高并发度。
- 标签 tag:生产消息、消费消息可以指定tag,实现消息消费的过滤。
- 偏移量 offset:
- broker中某个队列的偏移量 - 当前队列中有多少消息。
- 消费者消费到哪个消息了。
二、下载与安装
1、Linux安装
下载地址:https://rocketmq.apache.org/download
解压进入bin目录:
启动nameServer sh mqnamesrv
如果出现内存空间不足的异常,就修改runserver.sh文件的初始堆空间大小
启动broker sh mqbroker -c ../conf/broker.conf -n 127.0.0.1:9876 autoCreateTopicEnable=true指定配置文件:-c ../conf/broker.conf 指定nameserver:-n 127.0.0.1:9876 开启自动创建主题:autoCreateTopicEnable=true
如果出现内存空间不足的异常,就修改runbroker.sh文件的初始堆空间大小
2、控制台安装
下载代码:rocketmq-dashboard.git
修改配置文件,打成jar包:
启用服务:
3、开放相关端口
9876,10909,10911等常用端口
# 检查已开放的端口
firewall-cmd --list-ports# 新增开放端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent# 重启防火墙
systemctl restart firewalld.service
三、发送普通消息
1、同步发送
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 普通消息 - 同步发送* 发送消息后同步响应发送结果* 适用于短信通知、消息通知*/
public class SyncProducer {public static void main(String[] args) {// 创建生产者DefaultMQProducer producer = new DefaultMQProducer("message-group-1");// 设置nameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者try {producer.start();// 发送消息for (int i = 0; i < 10; i++) {// 创建消息实体Message message = new Message("MyTopic", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息并接收响应SendResult result = producer.send(message);// 打印响应System.out.println(result);}} catch (Exception e) {throw new RuntimeException(e);} finally {// 停止生产者producer.shutdown();}}
}
响应内容:
控制台信息:每个队列分别有5条消息。
2、异步发送
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 普通消息 - 异步发送* 通过监听获取发送消息的响应,不会阻塞发送消息的线程* 适用于对处理速度要求较高的场景,让生产者可以连续发送消息,而不用阻塞*/
public class AsyncProducer {public static void main(String[] args) {// 创建生产者DefaultMQProducer producer = new DefaultMQProducer("message-group-2");// 设置nameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者try {producer.start();// 发送消息for (int i = 0; i < 10; i++) {// 创建消息实体Message message = new Message("message-topic-2", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息并接收响应producer.send(message, new SendCallback() {// 异步监听,通过回调处理发送消息响应@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功!");System.out.println(sendResult);}@Overridepublic void onException(Throwable e) {System.out.println(e.getMessage());}});}Thread.sleep(10000);} catch (Exception e) {throw new RuntimeException(e);} finally {// 停止生产者producer.shutdown();}}
}
控制台结果
3、单向发送
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 普通消息-单向发送* 不关心消息是否被接收,仅发送* 适用于耗时较短,不关心数据准确的场景 - 日志收集*/
public class OnewayProducer {public static void main(String[] args) {// 创建生产者DefaultMQProducer producer = new DefaultMQProducer("message-group-3");// 设置nameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者try {producer.start();// 发送消息for (int i = 0; i < 10; i++) {// 创建消息实体Message message = new Message("message-topic-3", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息-单向发送producer.sendOneway(message);}} catch (Exception e) {throw new RuntimeException(e);} finally {// 停止生产者producer.shutdown();}}
}
四、消费普通消息
1、集群消费模式 - 默认模式
在一个消费者组内,多个消费者协同消费同一个topic中的消息,每个消费者负责一部分消息。
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;import java.util.List;/*** 集群模式消费者*/
public class BalanceConsumer {public static void main(String[] args) throws Exception {// 创建消费者对象 - 指定群组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");// 指定nameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅topicconsumer.subscribe("message-topic-1", "*");// 指定集群消费模式 - 默认consumer.setMessageModel(MessageModel.CLUSTERING);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {try {for (MessageExt messageExt : messageExtList) {System.out.println("topic:" + messageExt.getTopic() + ",message:" + new String(messageExt.getBody(), "UTF-8"));}} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println("消费者已启动");}
}
2、广播消费模式
在一个消费者群组内,每一条消息都被每一个消费者消费一遍。
消费进度不会由broker维护,而是由消费者维护。
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;import java.util.List;/*** 集群模式消费者*/
public class BroadcastConsumer {public static void main(String[] args) throws Exception {// 创建消费者对象 - 指定群组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-2");// 指定nameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅topicconsumer.subscribe("message-topic-3", "*");// 指定集群消费模式 - 默认consumer.setMessageModel(MessageModel.BROADCASTING);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {try {for (MessageExt messageExt : messageExtList) {System.out.println("topic:" + messageExt.getTopic() + ",message:" + new String(messageExt.getBody(), "UTF-8"));}} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println("消费者已启动");}
}
五、顺序消息
1、全局顺序消息
消息的生产、消费全局有序,只有一个消息队列。
2、部分顺序消息
消息的生产、消费在仅在一个消息队列内有序,可以有多个消息队列。
3、顺序消息生产
发送消息方法中指定队列选择器规则,规则参数即可。
package org.apache.rocketmq.example.quickstart;import jdk.nashorn.internal.objects.annotations.Getter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.ArrayList;
import java.util.List;/*** 部分顺序消息生产者*/
public class InOrderProducer {public static void main(String[] args) {// 创建生产者DefaultMQProducer producer = new DefaultMQProducer("message-group-4");// 设置nameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者try {producer.start();// 发送消息for (Order order : buildOrderList()) {// 创建消息实体Message message = new Message("message-topic-5", null, order.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息-单向发送// 指定消息体,队列选择器,选择器参数(订单id)producer.sendOneway(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 通过取模确定相同id的订单数据发送到同一个消息队列中return mqs.get((Integer) arg % mqs.size());}}, order.getOrderId());}} catch (Exception e) {throw new RuntimeException(e);} finally {// 停止生产者producer.shutdown();}}// 创建Order类private static class Order {private final Integer orderId;private final String step;public Order(Integer orderId, String step) {this.orderId = orderId;this.step = step;}public Integer getOrderId() {return orderId;}public String getStep() {return step;}}// 构造消息列表// 5个订单 3个步骤private static List<Order> buildOrderList() {List<Order> orderList = new ArrayList<>();orderList.add(new Order(1, "创建"));orderList.add(new Order(2, "创建"));orderList.add(new Order(3, "创建"));orderList.add(new Order(4, "创建"));orderList.add(new Order(5, "创建"));orderList.add(new Order(4, "支付"));orderList.add(new Order(2, "支付"));orderList.add(new Order(3, "支付"));orderList.add(new Order(1, "支付"));orderList.add(new Order(5, "支付"));orderList.add(new Order(2, "发货"));orderList.add(new Order(4, "发货"));orderList.add(new Order(1, "发货"));orderList.add(new Order(3, "发货"));orderList.add(new Order(5, "发货"));return orderList;}
}

4、顺序消息消费
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 顺序消费者*/
public class InOrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-3");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("message-topic-6", "*");// 指定消费偏移量consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 指定消费监听器consumer.registerMessageListener(new RegisterMessageListener());// 启动消费者consumer.start();System.out.println("消费者已启动");}// 构建消费监听器 - 顺序监听器private static class RegisterMessageListener implements MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt messageExt : messageExtList) {try {System.out.println("QueueId:" + messageExt.getQueueId() + " , body:" + new String(messageExt.getBody(), "UTF-8"));} catch (Exception e) {System.out.println("消费异常:" + e);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}
}
六、延时消息
设定消息需要在多久之后才能被消费。
使用场景:下单后未支付,定时取消订单。下单后向队列中写入一条延时消息,消费者拿到消息后先判断是否已支付,已支付就忽略,没支付就取消。
Message message = new Message("message-topic-3", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延时等级 一共有18个等级,等级越大延迟越久
message.setDelayTimeLevel(1);
七、批量消息
为了提高发送消息的性能,可以将多条消息打包成一份批量发送,操作的位置在发送端。
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.List;/*** 批量消息生产者*/
public class BatchProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();// 批量发送消息try {// 如果消息总大小超过4M,需要手动切分producer.send(buildBatchMessage());} catch (Exception e) {throw new RuntimeException(e);} finally {producer.shutdown();}}private static List<Message> buildBatchMessage() {List<Message> messageList = new ArrayList<>();messageList.add(new Message("message-topic-7", null, "message-1".getBytes()));messageList.add(new Message("message-topic-7", null, "message-2".getBytes()));messageList.add(new Message("message-topic-7", null, "message-3".getBytes()));messageList.add(new Message("message-topic-7", null, "message-4".getBytes()));messageList.add(new Message("message-topic-7", null, "message-5".getBytes()));messageList.add(new Message("message-topic-7", null, "message-6".getBytes()));messageList.add(new Message("message-topic-7", null, "message-7".getBytes()));return messageList;}
}
八、过滤消息
进行消息生产的时候,设置一些标签或属性,在消费时使用这些标签和属性实现过滤。
1、tag过滤
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.Arrays;
import java.util.List;/*** tag过滤生产者*/
public class TagFilterProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new BaseProducer().getInstance();producer.start();List<String> list = Arrays.asList("tagA", "tagB", "tagC");for (String tag : list) {Message message = new Message("topic-8", tag, ("message-" + tag).getBytes());try {SendResult send = producer.send(message);System.out.println(send);} catch (Exception e) {throw new RuntimeException(e);}}producer.shutdown();}
}
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** tag过滤消费者*/
public class TagFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("topic-8", "tagA || tagB");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExtList) {System.out.println(messageExt.getTags() + "---" + new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
2、SQL过滤
package org.apache.rocketmq.example.quickstart.filter;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.example.quickstart.base.BaseProducer;import java.util.Arrays;
import java.util.List;/*** tag过滤生产者*/
public class SQLFilterProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new BaseProducer().getInstance();producer.start();List<String> list = Arrays.asList("tagA", "tagB", "tagC", "tagD");for (int i = 0; i < list.size(); i++) {String tag = list.get(i);Message message = new Message("topic-9", tag, ("message-" + tag).getBytes());// 每一个消息都单独指定一个参数 - amessage.putUserProperty("a", String.valueOf(i));try {SendResult send = producer.send(message);System.out.println(send);} catch (Exception e) {throw new RuntimeException(e);}}producer.shutdown();}
}
package org.apache.rocketmq.example.quickstart.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** tag过滤消费者*/
public class SQLFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");consumer.setNamesrvAddr("127.0.0.1:9876");// 配置sql过滤规则,TAGS-默认属性 a-自定义属性consumer.subscribe("topic-9",MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'tagB')) and (a is not null and a between 0 and 3)"));consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExtList) {System.out.println(messageExt.getTags() + "---" + new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
九、发送与消费的重要方法、属性
1、发送
方法、属性 | 说明 | 使用场景 |
producerGroup | 生产者组 | 适用于事务消息的回查机制,保证事务消息的高可用。普通消息不用关注。 |
defaultTopicQueueNums | 消息队列数量 | 默认主题在每一个broker中的队列数量,对于新创建的主题有效 |
sendMsgTimeout | 发送消息阻塞时间 | 默认3s |
compressMsgBodyOverHowmuch | 启用压缩的阈值 | 默认4k,如果消息体的大小超过设定的大小,就自动压缩消息 |
retryTimesWhenSendFailed | 同步发送的重试次数 | 默认2次 |
retryTimesWhenSendAsyncFailed | 异步发送的重试次数 | 默认2次 |
retryAnotherBrokerWhenNotStoreOK | 是否换一个broker重试发送消息 | 默认false |
maxMessageSize | 允许发送消息的最大长度 | 默认4M = 1024 * 1024 *4 |
sendOneway(final Message msg) | 单向发送 | 消息发出去就不管了 |
sendOneway(Message msg, | 单向发送,自定义队列选择 | 消息发出去就不管了 |
send(final Message msg) | 同步发送 | |
send(final Message msg, final long timeout) | 同步发送指定阻塞时间 | |
send(final Message msg, final SendCallback sendCallback) | 异步发送,指定回调规则 | |
send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) | 异步发送消息指定队列、回调规则 |
2、消费
方法、属性 | 说明 | 使用场景 |
consumerGroup | 消费者组 | 必填,指定消费者组 |
namesrvAddr | nameServer | 指定消息队列集群 |
messageModel | 消费方式 | CLUSTERING - 集群消费 BROADCASTING - 广播消费 |
consumeFromWhere | 消费开始时的偏移量 | ConsumeFromWhere枚举 |
consumeThreadMin | 最小线程数 | |
consumeThreadMax | 最大线程数 | |
pullInterval | 推模式的时间间隔 | 推模式相当于不断地轮询拉取消息,默认时间间隔为0 |
pullBatchSize | 推模式的每一次条数 | 每次拉取地消息条数,默认32条 |
maxReconsumeTimes | 消息消费的最大重试次数 | 默认16次,超出重试次数还失败就进入死心消息 |
consumeTimeout | 消费地阻塞时间 | 在消费消息时可能会阻塞主线程,所以设定最大阻塞时间 |
fetchSubscribeMessageQueues(String topic) | 获取对当前主题分配的队列 | |
subscribe(String topic, String subExpression) | 订阅主题 | 指定主题和过滤正则表达式 |
subscribe(final String topic, final MessageSelector messageSelector) | 订阅主题并指定过滤规则 | 过滤规则包含 byTag和 bySql |
unsubscribe(String topic) | 取消订阅主题 | |
registerMessageListener(MessageListenerConcurrently messageListener) | 注册事件监听器 | 包含并发监听器和顺序监听器 |
ConsumeConcurrentlyStatus.RECONSUME_LATER | 放入重试队列 | |
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT | 稍等片刻再消费消息 | 保证消息的顺序性 |