rabbitmq 使用SAC队列实现顺序消息

ops/2024/11/20 7:14:47/

rabbitmq_SAC_0">rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {//消息idprivate String requestNo;//消息中顺序,1,2,3,4private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {public static final String EXCHANGE = "order-ex";public static final String RK_PREFIX = "rk-";public static final String ONE_QUEUE = "one-queue";public static final String TWO_QUEUE = "two-queue";public static final String THREE_QUEUE = "three-queue";public static final String FOUR_QUEUE = "four-queue";@Beanpublic DirectExchange exchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, false);}@Beanpublic Binding oneBinding() {return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);}@Beanpublic Binding twoBinding() {return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);}@Beanpublic Binding threeBinding() {return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Binding fourBinding() {return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 4);}@Beanpublic Queue oneQueue() {return createSacQueue(ONE_QUEUE);}@Beanpublic Queue twoQueue() {return createSacQueue(TWO_QUEUE);}@Beanpublic Queue threeQueue() {return createSacQueue(THREE_QUEUE);}@Beanpublic Queue fourQueue() {return createSacQueue(FOUR_QUEUE);}private static Queue createSacQueue(String queueName) {Map<String, Object> arguments = new HashMap<>(2);arguments.put("x-single-active-consumer", true);return new Queue(queueName, true, false, false, arguments);}}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))public void onMessage1(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", ONE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))public void onMessage2(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", TWO_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))public void onMessage3(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", THREE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))public void onMessage4(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", FOUR_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}}
生产者发送消息
@GetMapping("/send/seq/messqge")public String sendSeqMessage() throws JsonProcessingException {int cnt = 100;int mod = 4;int seqSize = 6;for (int i = 0; i < cnt; i++) {for (int j = 0; j < seqSize; j++) {int rk = i % mod + 1;SeqMessage seqMessage = new SeqMessage("seq-" + i, j);String s = objectMapper.writeValueAsString(seqMessage);log.info("routeKey: {}, send msg: {}", rk, s);rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);}}return "success";}

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!


http://www.ppmy.cn/ops/17224.html

相关文章

ZJGSU 1850 不同出栈情况

描述 假设有n个元素依次进栈&#xff0c;给出他们可能的不同的出栈情况。 输入 3 1 2 3 输出 1 2 3 1 3 2 2 1 3 2 3 1 3 2 1 输入样例 1 3 1 2 3 输出样例 1 1 2 3 1 3 2 2 1 3 2 3 1 3 2 1 #include <stdio.h>int tot, res, sta, n; int r[2005], s[2005…

vue项目中显示第三方外部链接的页面

1、打开窗口 window.open(URL, name, specs, replace);window.open()方法用于在浏览器中打开一个新的窗口或标签页。如果不指定第二个参数&#xff0c;则链接通常会在当前窗口中打开&#xff0c;这相当于_self。 name 窗口的名称&#xff0c;如果指定相同的名称&#xff0c;那…

天星金融消保课堂开讲,金融健康意识再提升

近年来&#xff0c;随着消费者对投资理财等金融服务需求的日益增长&#xff0c;金融广告成为消费者获取金融信息的重要途径。然而&#xff0c;一些不法分子通过投放非法金融广告&#xff0c;诱导消费者参与非法金融活动&#xff0c;给消费者的权益带来了严重威胁。为此&#xf…

还在为如何进行视频格式转换而烦恼?别担心,教你2招

在数字化时代&#xff0c;视频格式转换已经成为了我们日常生活中的常见需求。无论是为了在不同的设备、软件上播放&#xff0c;还是为了满足特定的编辑需求&#xff0c;视频格式转换都显得非常重要。然而&#xff0c;对于许多初学者来说&#xff0c;如何进行视频格式转换却成了…

CERLAB无人机自主框架: 2-动态目标检测与跟踪

前言&#xff1a;更多更新文章详见我的个人博客主页【MGodmonkeyの世界】 描述&#xff1a;欢迎来到CERLAB无人机自主框架&#xff0c;这是一个用于自主无人飞行器 (UAV) 的多功能模块化框架。该框架包括不同的组件 (模拟器&#xff0c;感知&#xff0c;映射&#xff0c;规划和…

【SAP ME 12】SAP NWDS(eclipse)下载、安装,配置

1、下载 1.1、描述 1.2、下载 2、安装 3、配置 3.1、域名映射

面向初学者的网络安全(二)

原文&#xff1a;annas-archive.org/md5/8570b4b9b47974c7302ce023e1eb9bc8 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 九、攻击和防御方法的演变 当我写下这本书的第一版时&#xff0c;网络安全只是网络安全专家和精明的高管感兴趣的话题。这本书是为了让任何人…

森林消防隔膜泵的应用与前景——恒峰智慧科技

随着全球气候变暖&#xff0c;森林火灾频发&#xff0c;给生态环境和人类安全带来严重威胁。为有效应对这一挑战&#xff0c;森林消防领域不断引入新技术、新装备。其中&#xff0c;隔膜泵作为一种高效、可靠的消防设备&#xff0c;正逐渐受到广泛关注。本文将探讨森林消防隔膜…