RabbitMQ工作模式-路由模式

news/2024/10/25 19:33:32/

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
在这里插入图片描述

使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

创建生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机,交换器和消息队列的绑定不需要在这里处理。channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);for (int i = 0; i < 30; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String dataMsg = "[" + level + "] 消息发送 :" + i;// 发送消息channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));}}
}

创建ERROR的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class ErrorConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.error",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.error", "ex.routing", "ERROR");// 通过chanel消费消息channel.basicConsume("log.error",(consumerTag, message) -> {System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建INFO级的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class InfoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识true,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.info",// 永久false,// 排他false,// 自动删除false,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.info", "ex.routing", "INFO");// 通过chanel消费消息channel.basicConsume("log.info",(consumerTag, message) -> {System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建WARN级别的消息者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class WarnConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.warn",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.warn", "ex.routing", "WARN");// 通过chanel消费消息channel.basicConsume("log.warn",(consumerTag, message) -> {System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

首先启动三个消费者:

查看队列及交换机情况

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.info         │ queue            │ log.info    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.error        │ queue            │ log.error   │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]# 

可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

启动消费者,查看消息通否被正常消费。

ERROR的消费者控制台输出

ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28

INFO的消费者控制台输出:

INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25

WARN的消费都控制台输出:

warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29

至此,验证已经完成。


http://www.ppmy.cn/news/1075977.html

相关文章

华为云软件精英实战营——感受软件改变世界,享受Coding乐趣

机器人已经在诸多领域显现出巨大的商业价值&#xff0c;华为云计算致力于以云助端的方式为机器人产业带来全新机会 如果您是开发爱好者&#xff0c;想了解华为云&#xff0c;想和其他自由开发者交流经验&#xff1b; 如果您是学生&#xff0c;想和正在从事软件开发行业的大佬…

黑客组织“Anonymous”进行网络攻击抗议日本排放核污水

概述 近期全球都在关注日本核污水排放&#xff0c;起因是日本政府宣布&#xff0c;福岛第一核电站核污染水8月24日开始排入海洋&#xff0c;计划排放30年。那么这件事为什么会引起全球关注呢&#xff0c;大家通过美国对日本投放原子弹后果&#xff0c;导致广岛长崎任然处于核辐…

【C++初阶】stack的常见操作和模拟实现

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

Python2022年09月Python二级 -- 编程题解析

第一题: 某航空公司对于托运行李有尺寸要求&#xff0c;必须满足以下条件:每件托运行李的长、宽、高三边之和须大于或等于60厘米&#xff0c;且小于或等于203厘米。(注意只是三边&#xff0c;不考虑立方体的整个周长&#xff0c;相当于只求长宽高三个数字的和&#xff0c;如&am…

大数据学习06-Spark分布式集群部署

Spark完全分布式部署 前期准备&#xff0c;每台服务器都需要配置安装Scala下载Scala安装包配置环境变量 安装spark解压配置环境修改配置 前期准备&#xff0c;每台服务器都需要配置 配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 TYPE"Ethernet" PROX…

linux————pxe网络批量装机

目录 一、概述 什么是pxe pxe组件 二、搭建交互式pxe装机 一、配置基础环境 二、配置vsftpd 三、配置tftp 四、准备pxelinx.0文件、引导文件、内核文件 一、准备pxelinux.0 二、准备引导文件、内核文件 五、配置dhcp 一、安装dhcp 二、配置dhcp 六、创建default文…

Python中30个常见的内置函数使用讲解(一)

摘要&#xff1a; Python作为一种强大的编程语言&#xff0c;提供了丰富的内置函数&#xff0c;用于各种常见操作&#xff0c;如数学运算、数据转换、迭代控制等。本文将从入门到精通&#xff0c;详细介绍Python中常见的内置函数的用法&#xff0c;通过代码示例和中文注释&…

大门设得好,财旺运也旺

大门作为我们家庭里一扇保护家人的屏障&#xff0c;可以说是非常重要的&#xff0c;它不仅能起到安全作用&#xff0c;在风水上也是非常关键的。大门在风水中是财运进气的门道&#xff0c;大门风水的好坏直接影响到房屋的整体风水&#xff0c;好的大门风水可以让主人旺财又旺运…