rabbitmq+netcore6 【3】Publish/Subscribe:发布/订阅

news/2024/10/30 11:32:32/

文章目录

    • 1)前言
    • 2)临时队列
    • 3)绑定
    • 4)综合以上代码
      • 准备工作
      • 1、生产者
      • 2、消费者1
      • 3、消费者2
    • 5)验证

官网教程原文链接: https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
翻译版参考链接: https://www.cnblogs.com/grayguo/p/5356070.html

上一章节的文章介绍的是将多个消息发给两个消费者,现在我们来试试将一个消息发给多个消费者,这种模式称为发布/订阅。

为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 :第一个将发出日志消息,第二个将接收并打印它们。在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上。(本质上,已发布的日志消息将广播到所有接收方)

1)前言

  • 生产者(发布者)是发送消息的用户应用程序
  • 队列是存储消息的缓冲区
  • 消费者(接收者)是接收消息的用户应用程序

实际上生产者不知道自己发送的消息会被存入队列中,生产者是直接将消息发送给交换机的,然后根据交换类型由交换机决定将消息发送给哪个队列或哪些队列或直接丢弃该消息。
在这里插入图片描述
交换机类型分为 directtopicheadersfanout ,为了达到日志系统的功能,我们将创建一个fanout类型的交换机,名字叫做logs。fanout类型的交换机会将收到的所有消息发给已知的所有队列

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

可以细心的看一下,当只写两个参数时,duiable和autoDelete默认设置为false:
在这里插入图片描述
这里你会有个疑惑,上一篇work queues的文章中没有提到过交换机的事,是怎么将消息发送给队列的呢?
实际上我们使用的是一个默认的交互机,名字为空(“”),如下
在这里插入图片描述
当指定routingkey后会将消息发给指定的队列,若不指定则将发给所有的队列

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",routingKey: "",basicProperties: null,body: body);

注:使用如下命令可以列出rabbitmq中的已添加的交换机

rabbitmqctl list_exchanges

在这里插入图片描述

2)临时队列

上一篇文章中,我们给每一个队列起了一个名字,这样在消费者代码中指定同样的队列就能对该队列中发布的消息进行消费(生产者、消费者共享一个队列),但是我们这章想做的有所区别:

  • 监听到所有的日志消息,而非其子集
  • 只得到当前正在流转的消息,而非旧的消息

为了解决以上两点我们需要做两件事:

  • 1、无论何时我们连接到rabbitmq都需要新建一个崭新的空队列,换就话说我们可以每次创建一个随机名称的队列,更好的方式是让服务器随机选取一个名字来给我们的队列。使用var queueName = channel.QueueDeclare().QueueName;可以查看名字
  • 2、当消费者断开连接时队列应当同时被删除
  • 在.Net Client 我们使用无参的channel.QueueDeclare()方法来创建一个随机命名的、非持久的、自动删除的、的队列.

3)绑定

交换机与队列之间的关系叫做绑定,添加了绑定交换机才知道给那些队列转发消息。

channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");

使用以下代码可以查看已有的绑定信息

rabbitmqctl list_bindings

4)综合以上代码

准备工作

新建一个netcore6的控制台项目,添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建三个netcore6的控制台项目,分别代表生产者,消费者1,消费者2

1、生产者

输入空格再回车即停止输入

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ProgjectSentLog
{public class MainClass{static void Main() {var factory = new ConnectionFactory(){HostName = "localhost",UserName = "lyh",Password = "1211"};using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var message = Console.ReadLine();while(!String.IsNullOrWhiteSpace(message)){var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange:"logs",routingKey:"",basicProperties:null,body:body);Console.WriteLine("已发送:{0}", message);message= Console.ReadLine();}}}Console.ReadLine();}}
}

2、消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;namespace ProjectReceiveLog1
{public class MainClass{static void Main(){var factory = new ConnectionFactory(){HostName = "localhost",UserName = "lyh",Password = "1211"};using (var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");Console.WriteLine("[*1] Waiting for logs");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine("[x1] {0}", message);};channel.BasicConsume(queue:queueName,autoAck:true,consumer:consumer);Console.WriteLine("Press [enter] to exit");Console.ReadLine();}}}}
}

3、消费者2

代码同消费者1,只是输出略有差别

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;namespace ProjectReceiveLog2
{public class MainClass{static void Main(){var factory = new ConnectionFactory(){HostName = "localhost",UserName= "lyh",Password="1211"};using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: "logs", "");Console.WriteLine("[*2] Waiting for logs");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine("[x2] {0}", message);};channel.BasicConsume(queue: queueName, autoAck: true, consumer);Console.WriteLine("Press [Enter] to exit");Console.ReadLine();}}}}
}

先运行两个消费者,再运行生产者,在生产者处输入的消息可以被两个消费者接收到。
在这里插入图片描述

5)验证

上面说到:当消费者断开连接时队列应当同时被删除,这里可以看出当程序结束时,队列、绑定都已经被删除
在这里插入图片描述
如果想了解如何监听子集要看下一篇文章奥


http://www.ppmy.cn/news/9814.html

相关文章

SpringBoot解决全局和局部跨域问题的两种方式

前言 在如今前后端分离的开发模式下,跨域是一个非常经典的问题,解决的方式也有很多,比如代理服务器,使用JSONP 我之前也写过一篇解决跨域问题的文章,感兴趣的可以参考:解决Vue前后端跨域问题的多种方式 …

YOLOv5 CPU实时的实例分割教程-它来了!

一个不知名大学生,江湖人称菜狗original author: jacky LiEmail : 3435673055qq.comTime of completion:2023.1.6Last edited: 2023.1.6YOLOv5 CPU实时的实例分割教程-它来了!简介前不久,ultralytics发布了一个yolov5 7.0版本&…

小程序 超长页面截图保存web-view+html2canvas

web-view文档建议参考----支付宝提供的文档,html2canvas官方文档(官网可以下载html2canvas.js 和 html2canvas.min.js)。由于篇幅受限,这里就贴了一下用法,对于web-view的配置情况,需要自己去查看文档&…

acwing111-畜栏预定

算法分类: 区间分组的应用 贪心 问题描述 有 N 头牛在畜栏中吃草。 每个畜栏在同一时间段只能提供给一头牛吃草,所以可能会需要多个畜栏。 给定 N 头牛和每头牛开始吃草的时间 A 以及结束吃草的时间 B,每头牛在 [A,B]这一时间段内都会一直…

【java篇】反射机制简单理解

学到JDBC后,使用到反射机制,所以回顾反射机制相关知识点; 文章目录 文章目录 什么是反射机制? 如何理解反射呢? 总结 一、Java反射机制是什么? 二、Java反射机制中获取Class的三种方式及区别? 三…

Acwing---795.前缀和

前缀和1.题目2.基本思想3.代码实现4.总结1.题目 输入一个长度为n的整数序列。 接下来再输入m个询问,每个询问输入一对l,r。 对于每个询问,输出原序列中从第l个数到第 r 个数的和。 输入格式 第一行包含两个整数n和m。 第二行包含n个整数&am…

python简单介绍及基础知识(二)

♥️作者:小刘在这里 ♥️每天分享云计算网络运维课堂笔记,疫情之下,你我素未谋面,但你一定要平平安安,一 起努力,共赴美好人生! ♥️夕阳下,是最美的,绽放,…

Linux学习笔记——Nginx安装部署

5.3、Nginx安装部署 5.3.1、简介 Nginx(engine x)是一个高性能的HTTP和反向代理Web服务器,同时也提供了IMAP/POP3/SMTP服务。 同Tomcat一样,Nginx可以托管用户编写的WEB应用程序成为可访问的网页服务,同时也可以作为…