交换机-Exchanges

news/2024/11/22 19:56:24/

交换机

Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

Exchanges 的类型

  • 直接:direct 路由模式
  • 主题:topic
  • 标题:headers(不常用)
  • 扇出:fanout 广播模式,发布订阅模式
无名exchange

第一个参数是交换机的名称,空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
绑定bindings

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。

在这里插入图片描述

Fanout模式介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。
在这里插入图片描述

代码示例
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.Scanner;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明扇出类型交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//向交换机发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.printf("消息:%s发送成功!",message);}}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}

Direct模式介绍

Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct(直接) 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
在这里插入图片描述

  • 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green
  • 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 black/green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
多重绑定

在这里插入图片描述

如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多

代码实战
package com.vmware.rabbit.demo6;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;
import java.util.UUID;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.232");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//向交换机发送消息Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String message = UUID.randomUUID().toString();String routingKey = scanner.next();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.printf("发送消息:%s成功!RoutingKey:%s\n",message,routingKey);}}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"error");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"info");channel.queueBind(queueName,EXCHANGE_NAME,"warn");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}

Topics主题模式介绍

存在的问题:尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节

  • *星号可以代替一个单词

  • #井号可以替代零个或多个单词
    在这里插入图片描述

  • 当一个队列绑定键是#那么这个队列将接收所有数据,就有点像fanout了

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

代码实现
package com.vmware.rabbit.demo7;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;
import java.util.Map;public class Producer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();System.out.println("连接RabbitMQ服务器成功!");Channel channel = connection.createChannel();//声明主题模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);System.out.println("交换机创建成功!");Thread.sleep(15*1000);//发布消息HashMap<String,String> msgMap = new HashMap<>();msgMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");msgMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");msgMap.put("quick.orange.fox","被队列 Q1 接收到");msgMap.put("lazy.brown.fox","被队列 Q2 接收到");msgMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");msgMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");msgMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");msgMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for (Map.Entry<String, String> entry : msgMap.entrySet()) {channel.basicPublish(EXCHANGE_NAME,entry.getKey(),null,entry.getValue().getBytes("UTF-8"));}}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q1";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"\tRouting Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback = (tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q2";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"Routing Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=(tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

http://www.ppmy.cn/news/57711.html

相关文章

长/短 链接/轮询 和websocket

短连接和长连接 短连接&#xff1a; http协议底层基于socket的tcp协议&#xff0c;每次通信都会新建一个TCP连接&#xff0c;即每次请求和响应过程都经历”三次握手-四次挥手“优点&#xff1a;方便管理缺点&#xff1a;频繁的建立和销毁连接占用资源 长连接&#xff1a; 客…

用ChatGPT问DotNet的相关问题,发现DotNet工程师的前景还不错

本人最近费了九牛二虎之力注册了一个ChatGPT账号&#xff0c;现在就给大家分享一下&#xff0c;问一下关于.NET的问题&#xff0c;看看ChatGPT的AI功能具体如何&#xff1f; 一、C#跟其它语言比较的优势 回答&#xff1a; C#是一门编程语言&#xff0c;它是为 Microsoft 的 …

永磁同步电机(PMSM)无传感器控制基于滑膜观测器Matlab/Simulink仿真分析

文章目录 前言一、状态观测器二、滑膜状态观测器2.1.滑膜观测器的原理2.2.传统的滑膜观测器2.3.改进的滑膜观测器 三、Matlab/Simulink仿真分析3.1.仿真电路分析3.1.1 电机控制模式切换3.1.2 速度环控制3.1.3 电流环控制3.1.4 电机主电路 3.2.仿真结果分析 总结 前言 本章节采…

【前端知识】内存泄漏与垃圾回收机制 (下)

【前端知识相关分享】内存泄漏与垃圾回收机制 &#xff08;下&#xff09; 6. 内存泄漏的解决方法6.1 解决方法概述6.2 什么是垃圾6.3 垃圾回收机制的定义及规则6.4 垃圾回收算法的基本流程 7. 垃圾回收的常见算法7.1 引用计数7.2 标记清除7.3 复制算法7.4 标记整理&#xff08…

基于微信小程序的垃圾分类系统的研究与实现(附源码和教程)

1. 简介 本文介绍的事基于微信小程序的垃圾分类系统&#xff0c;主要实现的功能有登录、注册、垃圾分类查询、垃圾预约回收、垃圾分类功能。 2.系统设计与实现 本章节是论文的重点&#xff0c;基于上一章介绍的总体设计框架的搭建&#xff0c;详细对小程序的页面布局、流程设…

LVS +Keepalived 高可用群集部署

一、LVSKeepalived 高可用群集 在这个高度信息化的 IT 时代&#xff0c;企业的生产系统、业务运营、销售和支持&#xff0c;以及日常管理等环节越来越依赖于计算机信息和服务&#xff0c;对高可用&#xff08;HA&#xff09;技术的应用需求不断提高&#xff0c;以便提供持续的…

(构造)(两个相邻特殊点之间的不定长度段维护) Dango

C - Dango (atcoder.jp) #include <iostream> #include <string> using namespace std;int main() {int N;cin >> N;string S;cin >> S;S S -; // 末尾‘-’int ans -1;int j -1;for (int i 0; i < N; i) {if (S[i] -) { // 结尾…

从苏宁电器到卡巴斯基(第二部)第31篇:我当高校教师的这几年 VII

目录 必须要开始做前端开发了 我感觉,三本学生并不比985硕士研究生差 必须要开始做前端开发了 我一开始与X高校签约,签的是《任务工作岗位劳动合同》,合同期限是一年,具体内容是“学院网络安全维护及信息化开发”工作。但由于学校人手不足,因此我也是需要承担授课以及带…