4 路由模式

news/2025/1/15 17:23:52/

路由模式

逻辑图

image-20210811143722455

如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解

一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的

一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的

如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】

如果有一条 info 的日志,它应当只发送给【所有】

如果有一条 warning 的日志,它应当只发送给【所有】

如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class RoutingProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* ----------------* 创建交换机* 创建队列* 交换机绑定到队列* <p>* 发送消息*///定义交换机名称private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";//定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";//定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机,使用路由模式的交换机channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);//创建队列channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);//绑定交换机/*** String queue                 :队列名称* String exchange              :交换机名称* String routingKey            :路由键,fanout 广播模式不需要路由键* Map<String, Object> arguments:参数*/channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");//发送短信String[] keys = {"error", "info", "warning"};int errorCount = 0;int infoCount = 0;int warningCount = 0;for (int i = 0; i < 30; i++) {int random = (int) (Math.random() * (3 - 1 + 1)) + 0;   //生成0,1,2随机数String logLevel = keys[random];String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("发送消息:\t" + str);channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());if (random == 0) {errorCount++;} else if (random == 1) {infoCount++;} else if (random == 2) {warningCount++;}}System.out.println("error\t共计: " + errorCount + "条");System.out.println("info\t共计: " + infoCount + "条");System.out.println("warning\t共计: " + warningCount + "条");// 关闭资源channel.close();connection.close();}
}

消费者

error

  • 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ErrorRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【error消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

error info warning

  • 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AllRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【all 消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

测试

  • 启动生产者,查看 RabbitMQ 网页控制条

  • 启动 error 消费者

  • 启动 all 消费者

  • 再次启动生产者

image-20210811155433733

image-20210811155448009image-20210811155457070

SpringBoot 整合

小结


http://www.ppmy.cn/news/1522509.html

相关文章

简说目前市面上最流行的“AI Agentic”

背景 当吴恩达在布道完著名的Agent设计模式后 他于不久后又引领了AI界的开发们开始关注另一种高级开发模式&#xff0c;即"Agentic"&#xff0c;吴恩达多次反复强调&#xff1a;“Agentic是比Agent更具未来”。 那么什么是Agentic呢&#xff1f; 什么是AI Agentic…

新换了电脑,电脑里常用的6款软件,下载回来继续用

新换了电脑&#xff0c;准备把之前电脑里常用的几款软件都下载回来继续用&#xff0c;独乐乐不如众乐乐&#xff0c;分享一下~ 1、Listen 1 一款开源、免费的音乐播放器&#xff0c;它能够整合多个主流音乐平台的资源&#xff0c;包括网易云音乐、QQ音乐、酷狗音乐、酷我音乐、…

[SWPUCTF 2021 新生赛]web方向(一到六题) 解题思路,实操解析,解题软件使用,解题方法教程

题目来源 NSSCTF | 在线CTF平台因为热爱&#xff0c;所以长远&#xff01;NSSCTF平台秉承着开放、自由、共享的精神&#xff0c;欢迎每一个CTFer使用。https://www.nssctf.cn/problem [SWPUCTF 2021 新生赛]gift_F12 这个题目简单打开后是一个网页 我们一般按F12或者是右键查…

WorkPlus安全即时通讯:端到端加密开启信息保密新时代

在数字化时代&#xff0c;信息的保密性和安全性变得越发重要。企业和个人需要确保他们的敏感信息和机密通讯不会落入黑客或第三方的手中。为了满足这一需求&#xff0c;WorkPlus安全即时通讯平台应运而生。作为一款拥有端到端加密功能的通讯平台&#xff0c;WorkPlus着重于保护…

小米Vela:端侧AI推理框架

小米Vela是小米公司基于开源实时操作系统NuttX打造的物联网嵌入式软件平台。该平台旨在为各种物联网硬件提供统一的软件服务&#xff0c;支持丰富的组件和易用的框架&#xff0c;以打通碎片化的物联网应用场景。2024年8月在“开源中国开源世界”大会&#xff0c;小米对外公开超…

python 解析数据后保存到excel

openpyxl 特点&#xff1a; 支持读写Excel 2010 xlsx/xlsm/xltx/xltm文件格式。可以操作Excel的几乎所有功能&#xff0c;如样式、图表、图片等。适用于复杂的Excel操作&#xff0c;例如公式、数据验证和条件格式。社区支持较好&#xff0c;文档比较完善。 优点&#xff1a; 功…

MyBatis入门 – 动态SQL

MyBatis入门 – 动态SQL 1.动态SQL介绍 1.1 什么是动态SQL 在原先的JDBC中&#xff0c;开发者需要根据业务的不同要求手动拼接SQL语句&#xff0c;不仅增加开发的复杂度&#xff0c;同时也降低开发效率。而动态SQL则能够根据不同业务场景动态构建查询。动态SQL一般是根据用户…

Java网络编程入门

在现代软件开发中&#xff0c;网络编程是一项不可或缺的技能。Java提供了强大的网络编程支持&#xff0c;使得开发者能够轻松地创建网络应用程序。今天将介绍Java中的网络编程基础&#xff0c;重点讲解Socket和ServerSocket类的使用。 什么是Socket&#xff1f; Socket是网络通…