4 路由模式

news/2024/9/16 11:39:17/ 标签: rabbitmq

路由模式

逻辑图

image-20210811143722455

如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解

一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的

一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的

如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】

如果有一条 info 的日志,它应当只发送给【所有】

如果有一条 warning 的日志,它应当只发送给【所有】

如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class RoutingProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* ----------------* 创建交换机* 创建队列* 交换机绑定到队列* <p>* 发送消息*///定义交换机名称private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";//定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";//定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机,使用路由模式的交换机channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);//创建队列channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);//绑定交换机/*** String queue                 :队列名称* String exchange              :交换机名称* String routingKey            :路由键,fanout 广播模式不需要路由键* Map<String, Object> arguments:参数*/channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");//发送短信String[] keys = {"error", "info", "warning"};int errorCount = 0;int infoCount = 0;int warningCount = 0;for (int i = 0; i < 30; i++) {int random = (int) (Math.random() * (3 - 1 + 1)) + 0;   //生成0,1,2随机数String logLevel = keys[random];String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("发送消息:\t" + str);channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());if (random == 0) {errorCount++;} else if (random == 1) {infoCount++;} else if (random == 2) {warningCount++;}}System.out.println("error\t共计: " + errorCount + "条");System.out.println("info\t共计: " + infoCount + "条");System.out.println("warning\t共计: " + warningCount + "条");// 关闭资源channel.close();connection.close();}
}

消费者

error

  • 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ErrorRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【error消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

error info warning

  • 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AllRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【all 消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

测试

  • 启动生产者,查看 RabbitMQ 网页控制条

  • 启动 error 消费者

  • 启动 all 消费者

  • 再次启动生产者

image-20210811155433733

image-20210811155448009image-20210811155457070

SpringBoot 整合

小结


http://www.ppmy.cn/news/1522509.html

相关文章

简说目前市面上最流行的“AI Agentic”

背景 当吴恩达在布道完著名的Agent设计模式后 他于不久后又引领了AI界的开发们开始关注另一种高级开发模式&#xff0c;即"Agentic"&#xff0c;吴恩达多次反复强调&#xff1a;“Agentic是比Agent更具未来”。 那么什么是Agentic呢&#xff1f; 什么是AI Agentic…

新换了电脑,电脑里常用的6款软件,下载回来继续用

新换了电脑&#xff0c;准备把之前电脑里常用的几款软件都下载回来继续用&#xff0c;独乐乐不如众乐乐&#xff0c;分享一下~ 1、Listen 1 一款开源、免费的音乐播放器&#xff0c;它能够整合多个主流音乐平台的资源&#xff0c;包括网易云音乐、QQ音乐、酷狗音乐、酷我音乐、…

[SWPUCTF 2021 新生赛]web方向(一到六题) 解题思路,实操解析,解题软件使用,解题方法教程

题目来源 NSSCTF | 在线CTF平台因为热爱&#xff0c;所以长远&#xff01;NSSCTF平台秉承着开放、自由、共享的精神&#xff0c;欢迎每一个CTFer使用。https://www.nssctf.cn/problem [SWPUCTF 2021 新生赛]gift_F12 这个题目简单打开后是一个网页 我们一般按F12或者是右键查…

WorkPlus安全即时通讯:端到端加密开启信息保密新时代

在数字化时代&#xff0c;信息的保密性和安全性变得越发重要。企业和个人需要确保他们的敏感信息和机密通讯不会落入黑客或第三方的手中。为了满足这一需求&#xff0c;WorkPlus安全即时通讯平台应运而生。作为一款拥有端到端加密功能的通讯平台&#xff0c;WorkPlus着重于保护…

小米Vela:端侧AI推理框架

小米Vela是小米公司基于开源实时操作系统NuttX打造的物联网嵌入式软件平台。该平台旨在为各种物联网硬件提供统一的软件服务&#xff0c;支持丰富的组件和易用的框架&#xff0c;以打通碎片化的物联网应用场景。2024年8月在“开源中国开源世界”大会&#xff0c;小米对外公开超…

python 解析数据后保存到excel

openpyxl 特点&#xff1a; 支持读写Excel 2010 xlsx/xlsm/xltx/xltm文件格式。可以操作Excel的几乎所有功能&#xff0c;如样式、图表、图片等。适用于复杂的Excel操作&#xff0c;例如公式、数据验证和条件格式。社区支持较好&#xff0c;文档比较完善。 优点&#xff1a; 功…

MyBatis入门 – 动态SQL

MyBatis入门 – 动态SQL 1.动态SQL介绍 1.1 什么是动态SQL 在原先的JDBC中&#xff0c;开发者需要根据业务的不同要求手动拼接SQL语句&#xff0c;不仅增加开发的复杂度&#xff0c;同时也降低开发效率。而动态SQL则能够根据不同业务场景动态构建查询。动态SQL一般是根据用户…

Java网络编程入门

在现代软件开发中&#xff0c;网络编程是一项不可或缺的技能。Java提供了强大的网络编程支持&#xff0c;使得开发者能够轻松地创建网络应用程序。今天将介绍Java中的网络编程基础&#xff0c;重点讲解Socket和ServerSocket类的使用。 什么是Socket&#xff1f; Socket是网络通…

android系统源码12 修改默认桌面壁纸--SRO方式

1、aosp12修改默认桌面壁纸 代码路径 &#xff1a;frameworks\base\core\res\res\drawable-nodpi 替换成自己的图片即可&#xff0c;不过需要覆盖所有目录下的图片。 由于是静态修改&#xff0c;则需要make一下&#xff0c;重新编译。 2、方法二Overlay方式 由于上述方法有…

[动态规划] 删除并获得点数

给你一个整数数组 nums &#xff0c;你可以对它进行一些操作。 每次操作中&#xff0c;选择任意一个 nums[i] &#xff0c;删除它并获得 nums[i] 的点数。之后&#xff0c;你必须删除 所有 等于 nums[i] - 1 和 nums[i] 1 的元素。 开始你拥有 0 个点数。返回你能通过这些操…

vue3缺陷

Vue 3 的一些缺陷包括&#xff1a; 1. 兼容性问题&#xff1a;由于 Vue 3 使用了新的响应式系统&#xff0c;与 Vue 2 的代码不兼容。这意味着在迁移现有项目时需要进行一些改动。 2. 学习曲线&#xff1a;Vue 3 引入了一些新的概念和 API&#xff0c;相对于 Vue 2 有一定的学习…

如何利用AI优化知识中台的用户体验

引言 在数字化时代&#xff0c;知识中台作为企业知识管理与服务的重要载体&#xff0c;其用户体验的优劣直接关乎到信息的有效传递、员工的学习效率及企业的整体创新能力。随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;将AI融入知识中台的设计与优化中&a…

Linux系统高效进程控制的实战技巧

Linux系统高效进程控制的实战技巧 Linux是一种开源的Unix-like操作系统内核&#xff0c;由林纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;于1991年首次发布。Linux以其稳定性、安全性和灵活性而著称&#xff0c;广泛应用于服务器、桌面、嵌入式系统等多个领域。在Linux系…

使用Docker快速安装和运行Elasticsearch

Elasticsearch 是一个基于 Lucene 构建的开源搜索引擎&#xff0c;它提供了分布式、多租户能力的全文搜索引擎&#xff0c;具有 HTTP web 接口和无模式的 JSON 文档。在本文中&#xff0c;我们将介绍如何使用 Docker 快速安装和运行 Elasticsearch。 为什么使用 Docker 安装 E…

redis中使用lua脚本

1、现实问题 1.redis采用单线程架构&#xff0c;可以保证单个命令的原子性&#xff0c;但是无法保证一组命令在高并发场景下的原子性。例如&#xff1a; 在串行场景下&#xff1a;A和B的值肯定都是3在并发场景下&#xff1a;A和B的值可能在0-6之间。 2.极限情况下1&#xff1…

Qt Widget核心属性

文章目录 前言enabledgeometrywindowTitlewindowIconwindowOpacitycursorfonttoolTipfocusPolicystyleSheet 前言 Qt中的各种控件&#xff0c;都是继承自QWidget类&#xff0c;了解这个类的属性方法之后&#xff0c;后续的控件也通用 enabled enabled描述了一个控件是否处于…

文件包含PHP伪协议利用方法

1.file://协议 使⽤&#xff1a; file:// ⽂件的绝对路径和⽂件名 2.php?cmdfile://D:\phpstudy_pro\WWW\123.txt 2.php://filter协议 ⽤途&#xff1a;常⽤于读取⽂件 / 源码 2.php?cmdphp://filter/readconvert.base64-encode/resource1.php 3.php://input协议 步骤一&…

【C++拓展(一)】后端开发常用的技术栈

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; C拓展 1. 前言2. 语言层面3. 设计模式层面4. 开…

「Qt Widget中文示例指南」如何实现一个系统托盘图标?(一)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 System Tray Icon&a…

基础学习之——Apache Spark

Apache Spark是一种开源的大数据处理框架&#xff0c;它提供了快速、通用和可扩展的大数据分析和处理功能。Spark可以在大规模数据集上进行高速计算&#xff0c;并且可以与多种数据源和工具进行集成。 Spark的基本概念包括&#xff1a; 弹性分布式数据集&#xff08;Resilient…