Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

news/2024/11/13 3:37:25/

上篇博文中我们介绍了Azure Messaging的重复消息机制、At most once 和At least once.

 Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once

本文中我们主要研究并介绍Azure Messaging的消息回执机制:实际应用场景:

同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址。还

有,生成订单编号场景,发送一个生成订单编号的消息,消息消费者接收生成订单编号的消息,并通过消息回执返回。

Azure Messaging的消息回执机制主要通过:基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现

在代码实现中,我们需要:

1. 两个工作线程,一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

2. 一个会话标识:ReceiptSession  

3. 两个队列Queue:RequestQueue:发送消息、接收消息,ResponseQueue:发送回执消息,接收回执消息。

直接Show Code:

首先,我们在ServiceBusMQManager增加一个线程安全的创建带回话的QueueClient方法:

private static object syncObj = new object();/// <summary>/// 获取要求会话带Session的QueueClient/// </summary>/// <param name="queueName">队列名称</param>/// <returns>QueueClient</returns>public QueueClient GetSessionQueueClient(string queueName){var namespaceClient = NamespaceManager.Create();if (!namespaceClient.QueueExists(queueName)){lock (syncObj){if (!namespaceClient.QueueExists(queueName)){var queue = new QueueDescription(queueName) { RequiresSession = true };namespaceClient.CreateQueue(queue);}}}return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete);}

然后我们定义一些常量:

        private static readonly string ReplyToSessionId = "ReceiptSession";const double ResponseMessageTimeout = 20.0;private static readonly string requestQueueName = "RequestQueue";private static readonly string responseQueueName = "ResponseQueue";

实现发送并接收回执消息的方法:

        /// <summary>/// 发送并接收回执消息/// </summary>/// <param name="bills"></param>public static void SendMessage(){var manager = new ServiceBusUtils();var responseClient = manager.GetSessionQueueClient(responseQueueName);var requestClient = manager.GetSessionQueueClient(requestQueueName);var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);var order = CreateSalesOrder(1);//发送消息var message = new BrokeredMessage(order);message.Properties.Add("Type", order.GetType().ToString());message.SessionId = ReplyToSessionId;message.MessageId = "OrderMessage001";message.ReplyTo = responseQueueName;requestClient.Send(message);Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);//接收消息回执var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));var receivedOrder = receivedMessage.GetBody<SalesOrder>();Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);messsageReceiver.Close();}

实现接收消息并发送回执方法:

 1         /// <summary>
 2         /// 接收消息并回执
 3         /// </summary>
 4         public static void ReceiveMessage()
 5         {
 6             var manager = new ServiceBusUtils();
 7 
 8             var requestClient = manager.GetSessionQueueClient(requestQueueName);
 9             var session = requestClient.AcceptMessageSession();
10             var requestMessage = session.Receive();
11            
12             if (requestMessage != null)
13             {
14                 var receivedOrder = requestMessage.GetBody<SalesOrder>();
15                 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
16 
17                 var responseMessage = new BrokeredMessage(receivedOrder);
18                 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());
19                 responseMessage.ReplyToSessionId = ReplyToSessionId;
20                 responseMessage.MessageId = "ResponseOrderMessage001";
21                 responseMessage.SessionId = requestMessage.SessionId;
22                
23                 //发送回执消息
24                 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);
25                 responseClient.Send(responseMessage);
26                 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);               
27             }
28         }

Main方法中,启动两个工作线程:一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

因为涉及到Azure Messaging中队列的第一次创建,Azure Messaging是不支持多个请求同时创建同一个队列的,因此,我们两个线程间做一个简单的Task.Delay(3000).Wait();

 1         static void Main(string[] args)
 2         {
 3             var sendTask = Task.Factory.StartNew(() => { SendMessage(); });
 4             Task.Delay(3000).Wait();
 5             var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); });
 6 
 7             Task.WaitAll(sendTask, receiveTask);
 8 
 9             Console.ReadKey();           
10         }

我们看看程序输出:

Azure 服务总线中的队列:

可以看出:Azure Messaging-ServiceBus Messaging 基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现消息回执机制。

 

周国庆

2017/3/23

 

转载于:https://www.cnblogs.com/tianqing/p/6607682.html


http://www.ppmy.cn/news/698922.html

相关文章

如何在word中实现连续编号自动打印

如何在word中实现连续编号自动打印 因机关工作需要&#xff0c;经常在发文打印时&#xff0c;需要逐份输入连续文件编号。在百度上查找相关方法&#xff0c;包括VBA程序都不满意&#xff0c;大多有点错误。因此手写了这份代码&#xff0c;现提供同学们参考&#xff1a; Sub Pri…

Html + css 实现银行回执单 页面

最近来了一个需求&#xff0c;需要去弄一个银行回执单页面&#xff0c;之前的业务需求是需要客户去手动上传银行回执单的&#xff0c;因为之前的业务是客户去一个一个的去上传&#xff0c;这样当数据多的时候&#xff0c;客户就提出来说需要进行批量去操作&#xff0c;这样子就…

oracle应付账款凭证编号查找,记账凭证编号怎么填写 记账凭证编号的规则

记账凭证的编号方法有总字编号法、三类编号法、五类编号法和分数编号法。会计人员要根据不同的记账凭证采用相应的编号方法。下面小编就为大家解开记账凭证编号的规则&#xff0c;希望能帮到你。 记账凭证编号的规则 1.日期的填写。现金收付记账凭证的日期以办理收付现金的日期…

[前端]JS——join()与split()的使用

Array.join():数组转换为字符串,"()"里元素指定数组转为字符串用什么串联&#xff0c;默认为空。 Array.join()的使用&#xff1a; <script>let arr[1,2,3,4]console.log("arr未转换前:",arr,typeof(arr));console.log("arr使用join():"…

Linux和Windows比较

Linux和Windows Linux和Windows是两种十分常见的电脑操作系统&#xff0c;它们都有着广泛的应用和用户群体。在使用过程中&#xff0c;有些事情可能会因操作系统不同而产生差异&#xff0c;以下是一些可能会引起用户困惑的例子&#xff1a; 软件管理&#xff1a;在Linux上&am…

用华为手机将录音转文字竟如此简单,手机秒变会议神器,太好用了

华为手机在今年的第二季度占据38%的市场份额&#xff0c;可以说是远远甩开的友商&#xff0c;那华为手机为什么这么受消费者欢迎呢&#xff1f;其实很大一部分是因为它的实用性&#xff0c;可是说华为手机很多功能都可以帮助我们提高工作效率。 比如无线投屏、华为云电脑、备忘…

刚刚才知道,华为手机的通话录音还能这样玩?没人知道就太浪费了

刚刚才知道&#xff0c;华为手机的通话录音还能这样玩&#xff1f;没人知道就太浪费了 除了iPhone手机以外&#xff0c;我们日常见到的Android系统的手机几乎都能够进行通话录音。这个功能主要用于帮助我们记录通话中的重要事项&#xff0c;方便我们后期使用。 那么大家就来跟…

手机正在录音怎么隐藏

以小米10为例&#xff0c;手机正在录音&#xff0c;是无法隐藏录音图标的&#xff0c;只有停止录音时&#xff0c;才能隐藏录音图标。 小米10是小米十周年之际的旗舰作品&#xff0c;是首发骁龙865的智能手机&#xff0c;于2020年2月13日正式发布。小米10是小米十周年之际的旗舰…