RabbitMQ入门案例之Direct模式

news/2024/12/1 20:28:44/

前言

RabbitMQ的Direct模式是一种可以根据指定路由key,Exchang将消息发送到具有该路由key下的Queue下进行存储。也就类似于将数据写进指定数据库表中。这个路由Key可以类比为SQL语句中的:where routeKey = …

官方文档地址:https://www.rabbitmq.com/getstarted.html

什么是Direct模式

RabbitMQ中的Direct模式是一种消息传输模式,通常使用Direct Exchange(直连交换机)实现。

在Direct模式中,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。

Direct模式常用于一对一的场景,例如订单管理系统中将订单分配给特定的处理队列。

通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载。
在这里插入图片描述

实操

实操准备工作

在开始使用代码进行操作前,我们先到管理界面构造一个Direct交换机,如下图:
在这里插入图片描述
为其绑定Queue,同时设置这个Queue的route key,如下图:
在这里插入图片描述
最终绑定结果:
在这里插入图片描述
既然交换机和队列已经准备好,接下来就是准备依赖与代码了

<!--RabbitMQ依赖-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>

生产者代码

public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("ip地址");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "宇宙无敌爱学习";String  exchangeName = "direct_exchange";String routingKey1 = "class";String routingKey2 = "student";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

消费者代码

public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("ip地址");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();new Thread(runnable, "queue3").start();}
}

在生产者代码中,我们定义了两个route key,如下图:
在这里插入图片描述
在这两个路由key的驱使下,生产者的消息便只会被放到我们刚刚在direct_exchange交换机中具有这两个路由key的Queue中,我们来执行代码验证一下。

生产者执行结果
在这里插入图片描述
管理界面效果
在这里插入图片描述
可以看出,消息就只放进了queue2和queue3中,这是符合我们预期的。
消费者执行结果,如下:在这里插入图片描述
管理界面效果:
在这里插入图片描述
可以看出,消息也被成功取出去。

以上便是Direct模式的全部内容,仅个人笔记使用
感谢阅读

http://www.ppmy.cn/news/314624.html

相关文章

《关于解决发布地图全幅显示方案》

《关于解决发布地图全幅显示方案》 我刚开始做gis项目时候&#xff0c;遇到这种问题&#xff0c;什么问题呢&#xff0c;就是有些发布完地图后&#xff0c;打开服务器时发现自己的刚发布的地图是不是变小或者这样说不在中心点显示地图&#xff0c;这时候&#xff0c;你就要打开…

unity图片`fillAmount`填充方法

前言 在Unity中&#xff0c;Image是一种用于显示2D图像的组件&#xff0c;而fillAmount属性则是Image组件中一个非常常用的属性之一&#xff0c;用于控制图片填充的比例。在这篇文章中&#xff0c;我们将会介绍fillAmount属性的详细用法。 介绍 fillAmount属性是Image组件中…

Unet项目解析(6): 图像分块、整合 / 数据对齐、网络输出转成图像

项目GitHub主页&#xff1a;https://github.com/orobix/retina-unet 参考论文&#xff1a;Retina blood vessel segmentation with a convolution neural network (U-net) 1. 训练数据 1.1 训练图像、训练金标准随机分块 主代码&#xff1a; # 训练集太少&#xff0c;采用分…

openmv学习之旅②之色块追踪算法的改善

大家好&#xff0c;我是杰杰。 实在不好意思&#xff0c;最近比较忙&#xff0c;之前说的连载现在才更新出来。 从上一篇openmv的学习中openmv学习之旅①我们可以很简单运用micropython在openmv上做我们想做的事情。 Python这个东西用起来是很简单的&#xff0c;&#xff0c;…

诺基亚智能手机与NFC功能推出

诺基亚智能手机与NFC功能推出 我收到了我长达十年的朋友威乐Makinnen谁出席推出三款全新的诺基亚智能手机分别是700,701和600 8月24日其在Symbian上运行的所有代号为百丽的电话。诺基亚700仅重96gm&#xff0c;并在110 X 50.7点x9.7毫米它不仅是诺基亚在塞班范围最小巧的智能手…

诺基亚X3兴奋

诺基亚X3兴奋 多年来&#xff0c;诺基亚一直被认为是产生对高端品质和用户友好的移动电话的领先的手机制​​造商。对于每一个诺基亚洁具你会购买&#xff0c;你会放心的无与伦比的质量和功能。 的宗旨&#xff0c;以填补客户的需求越多&#xff0c;诺基亚已经推出了另一种手机…

应用层下的人脸识别(一):图像获取

本文为大家总结了人脸识别技术在安防领域应用的完整流程&#xff0c;以及产品设计的细节。其中包括&#xff1a;如何获取最佳图像&#xff0c;如何进行设备对接等经验。 图像获取是人脸识别的第一步&#xff0c;人脸识别项目中图像来源主要依靠各类监控相机&#xff0c;图像质量…

使用手机作单反相机的遥控器

2019独角兽企业重金招聘Python工程师标准>>> 你的相机用什么方式取景&#xff1f;液晶显示器&#xff1f;光学取景器&#xff1f;还是电子取景器&#xff1f;我们今天要介绍的就是颠覆大部分人使用习惯的一种取景方式&#xff0c;用手机的液晶屏取 景。这里我们要用…