RocketMQ实现顺序消息
实际开发中如果没有顺序消费的必要需求,不建议使用顺序消费,顺序消费是单线程的效率比较低。
代码示例
-------生产者
生产者主要设置队列选择器在里面写选择队列的逻辑,通过取模进行选取, rocketMQTemplate.sendOneWayOrderly(“orderlyTopicBoot”,msg,String.valueOf(step.getOrderId()));
/***生产者代码* 顺序消费*/@Testpublic void sendOlderlyMsg(){//设置队列选择器rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {String orderIdStr = (String) o;long orderId = Long.parseLong(orderIdStr);int index = (int) (orderId % list.size());return list.get(index);}});List<OrderStep> orderSteps = OrderUtil.buildOrders();for (OrderStep step : orderSteps) {Message<String> msg = MessageBuilder.withPayload(step.toString()).build();rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(step.getOrderId()));}}
-------消费者
只需要在@RocketMQMessageListener注解中指定一下消费模式为顺序消费即可开启单线程消费consumeMode = ConsumeMode.ORDERLY
/**** 顺序消费*/
@Component
@RocketMQMessageListener(consumerGroup = "olderConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
public class OlderlyTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("当前线程:"+Thread.currentThread()+"队列ID:"+messageExt.getQueueId()+",消息内容:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}