RabbitMQ消息队列实战(5)—— 发后即忘和远程RPC数据传输模型

news/2025/3/13 3:58:29/

        本文我们学习下使用RabbitMQ实现的几种数据发送的模型——发后即忘模型和远程RPC调用。二者实际上是从业务的角度定义的一个RabbitMQ的使用模型。发后即忘模型,强调发送时不太关心消息接收者的执行结果,仅仅是为了发送信息。而远程RPC调用模型强调,另外开辟通道获取消息接收者的执行结果,而且执行的结果直接影响业务。

        从业务上来划分,通常我们通过MQ发出的信息可以分为三种:消息、命令和事件。对于消息来说,我们发送之后不期望会得到回复,或者说不期望马上得到回复,类似于我们接收到手机短信,只是知道这件事情。然后我们怎么去处理或者去不去处理,实际上给我们发送短信的人并不关心,所以这种情况下比较适合使用发后即忘模型。当发送的是命令时,信息的发送者明确知道接受者是谁,通过命令的方式让接收者去进行某项业务,并期望得到反馈,这种情况下比较适合采用远程RPC调用的模型。而最后一种事件,更像是在EDA(Event Driven Architecture)的系统中定义的一种命令,不过命令的格式紧紧和业务模型绑定,所以这里单独提出来叫做事件。很显然,也是使用远程RPC调用的数据发送模型比较合适。

        接下来,我们将以实例的方式分别介绍发后即忘模型和远程RPC调用模型的使用。

一、发后即忘模型

        我们用代码模拟这样一种业务——业务日志的记录。业务日志其实最符合发后即忘模型的要求,因为日志的记录和我们完成一个业务无关(日志记录成功与否都不会影响业务的成败)。有过编程经验的童鞋都知道,日志按照级别来划分从低到高,可以分为三种:debug、info和error。在这个模型中,我们创建一个topic exchange,然后分别以debug、info和error为主题分别绑定到三个队列。不同级别的日志消费者订阅不同的队列,然后记录到不同的日志文件(或者同一个文件使用不同的标识区分)中。

        整个消息的流通图如下:

 

        消息由生产者产生之后,通过一个topic交换机,根据不同的topic发送到响应的队列中,然后定义了3个消费者,每个消费者订阅了存放不同级别日志的通道,获取消息后进行相应的处理。

        我们决定采用spring boot集成RabbitMQ的方式实现,首先配置相关的exchange、binding和queue,如下代码:

@Configuration
public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}@BeanTopicExchange logTopicExchange() {return new TopicExchange("logTopicExchange", true, false);}@Beanpublic Queue debugQueue() {return new Queue("debugQueue", true, false, false);}@Beanpublic Queue infoQueue() {return new Queue("infoQueue", true, false, false);}@Beanpublic Queue errorQueue() {return new Queue("errorQueue", true, false, false);}@BeanBinding bindingDebugQueue() {return BindingBuilder.bind(debugQueue()).to(logTopicExchange()).with("debug");}@BeanBinding bindingInfoQueue() {return BindingBuilder.bind(infoQueue()).to(logTopicExchange()).with("info");}@BeanBinding bindingErrorQueue() {return BindingBuilder.bind(errorQueue()).to(logTopicExchange()).with("error");}
}

第1行:通过@Configuration注解开启配置支持

第3~10行:引入配置文件中的RabbitMQ的配置信息

第11~19行:创建链接工厂ConnectionFactory 实例。

第20~25行:创建RabbitTemplate实例,后面将用它来发送消息。

第26~29行:创建了名称为logTopicExchange的主题交换机

第30~41行:创建三个队列。

第42~53行:分别将队列绑定到交换机上。

        完成了上述生产者端的配置,接下来我们看下发送消息的代码:

public class LogServiceImpl implements LogService {private ExecutorService executorService = Executors.newFixedThreadPool(10);@AutowiredRabbitTemplate rabbitTemplate;@Overridepublic void sendMsg(String routeKey, String msg) {MessageProperties messageProperties = new MessageProperties();// 设置过期时间,单位:毫秒,30分钟messageProperties.setExpiration("1800000");messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("UTF-8");byte[] msgBytes = msg.getBytes();Message message = new Message(msgBytes, messageProperties);CompletableFuture.runAsync(() -> rabbitTemplate.convertAndSend("logTopicExchange",routeKey,message), executorService);}}

第10~16行:这是在通过传过来的消息来设置Message对象,可以看到,为了防止消息不能被及时读取而大量堆积,这里设置了消息的超时为半个小时。

第17~20行:我们选择了异步发送消息的方法,主要是考虑到业务日志的写入不应该影响业务的实现,而又不会关心日志写入的结果,所以这里采用了异步的方式。

        建立单元测试,发送消息之后可以看到,交换机和队列都已经创建,而且消息已经正确路由到了队列中。

生成的交换机:

 生成的队列:

         生产者一方的准备工作做好之后,我们看下消费者的处理。相比生产者,消费者的实现要简单的多,有关RabbitMQ的配置这里不再重复列举,只看下消费者的监听部分代码:

@Service
public class LogServiceImpl implements LogService {@Override@RabbitListener(queues = {"debugQueue"})public void writeDebug(Message message) {String str=new String(message.getBody());System.out.println(str);}@Override@RabbitListener(queues = {"infoQueue"})public void writeInfo(Message message) {String str=new String(message.getBody());System.out.println(str);}@Override@RabbitListener(queues = {"errorQueue"})public void writeError(Message message) {String str=new String(message.getBody());System.out.println(str);}
}

第1行,@Service必不可少,需要将监听的服务类托管到IOC中。

第4、11、18行,使用3个 @RabbitListener注解来监听debugQueue、infoQueue和errorQueue三个队列

        以上就是我们简单实现的一个发后即忘模型的案例。虽然简单,但是足以作为一个经典案例。而且有些细节需要注意,比如:在发送消息时要考虑异步发送,才不会对业务代码进行干扰。接下来我们开始用实例解释下RabbitMQ远程RPC调用的方式。

二、远程RPC调用模型

        所谓远程RPC方式调用模型,在上文中我们已经介绍过,简单理解就是发送信息后,生产者一直等待消费者返回消费后的结果。那么问题来了,消费者是怎么把消费的结果返回给生产者呢?毋庸置疑,消费者返回的肯定也是一个消息,那么这个消息要通过哪个交换机?到达哪个通道?下面我们就来一一解决这些问题。

        首先,看下远程RPC方式调用模型的示意:

 

        笔者来解释下整个过程:

(1)生产者向业务交换机里面发送业务命令或者事件,同时需要创建一个只有自己能够监听的而且是保证队列名称唯一的私密队列,然后开始监听这个队列。

(2)发送消息的消息头中具有一个叫做reply_to的字段,这个字段设置为上一步骤创建的队列名称。

(3)消费者获取到业务命令或者事件之后,开始执行业务。执行完成业务之后,将回复消息通过默认的交换机传递到reply_to队列里面。

(4)生产者接收到消费者回复的消息之后,完成业务,结束等待。

        下面,我们来看下生产者端的代码。ConnectionFactory等基本配置我这里不再展示,需要特别注意的是引入了一个新的Bean——simpleMessageListenerContainer,主要用来手动添加监听的队列以及监听器。

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());return container;
}

        接下来是远程RPC调用的方法:

public void sendRPCMsg(String routeKey, String msg) {RabbitAdmin admin = new RabbitAdmin(connectionFactory);Queue replytoQueue = admin.declareQueue();MessageProperties messageProperties = new MessageProperties();messageProperties.setReplyTo(replytoQueue.getName());byte[] msgBytes = msg.getBytes();Message message = new Message(msgBytes, messageProperties);rabbitTemplate.convertAndSend("eventTopicExchange",routeKey,message);Thread currentThread = Thread.currentThread();simpleMessageListenerContainer.addQueues(replytoQueue);simpleMessageListenerContainer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {String str = new String(message.getBody());System.out.println(str);simpleMessageListenerContainer.removeQueues(replytoQueue);LockSupport.unpark(currentThread);}});LockSupport.park();
}

第1~2行:创建一个RabbitAdmin对象,这个对象可以手动创建交换机、队列等等。

第4~11行:将队列名称放到消息的reply_to头部,并且进行消息的发送。

第13~14行:使用simpleMessageListenerContainer监听新创建的队列,并且设置监听对象。

第24行:保持线程阻塞,然后在第21行解除阻塞状态。

        我们在第2行创建了一个队列,我们看下declareQueue方法的定义:

public Queue declareQueue() {try {DeclareOk declareOk = this.rabbitTemplate.execute(Channel::queueDeclare);return new Queue(declareOk.getQueue(), false, true, true); // NOSONAR never null}catch (AmqpException e) {logOrRethrowDeclarationException(null, "queue", e);return null;}
}

     注意上述代码的第4行,在这里实际上创建了一个随机名称的队列,RabbitMQ会保证队列名称的唯一,而创建的Queue对象的后面三个boolean类型的参数指明了队列是不可持久化的、排他的、以及自动删除,也就是说创建的队列只能当前的channel自己监听,而且一旦队列里面没有消息或者channel关闭队列就会消失。就是这些属性,保证了创建了一个临时性的队列,而且其他消费者无法进行监听。

        最后,我们再看下消费者的处理逻辑:

@RabbitListener(queues = {"eventQueue"})
public void getMsg(Message message) {String str = new String(message.getBody());System.out.println(str);String replayTo = message.getMessageProperties().getReplyTo();System.out.println("replayTo =" + replayTo);byte[] msgBytes = "我收到了".getBytes();MessageProperties messageProperties = new MessageProperties();Message replayMessage = new Message(msgBytes, messageProperties);try {rabbitTemplate.send(replayTo, replayMessage);} catch (AmqpException e) {e.printStackTrace();}
}

第1行:使用RabbitListener监听名称为eventQueue的队列。

第5行:从接受到的消息中获取replay_to的队列名称。

第11行:向生产者回复消息

        我们看到,相比生产者,消费者代码要简单的多,就是多了一个获取replay_to队列并发送消息的过程。下面看下replay_to队列的庐山真面目,如下图红色圈出部分:

 

三、总结

        本文主要介绍了RabbitMQ发后即忘和远程RPC调用两种数据发送模型,现总结如下:

(1)发后即忘数据发送模型针对发送的信息生产者不关心对方的处理结果这一业务前提实现,实现起来比较简单,但是需要注意发送消息时应该采用异步发送,避免消息的发送影响业务。

(2)如果需要等待消费者的返回结果,应该采用远程RPC调用数据发送模型。生产者自己创建接受回复消息的队列,而且应该保证队列名称唯一、队列私有和支持自动删除,通过消息的reply_to头部将队列名称发送给消费者,消费者再通过RabbitMQ的默认交换机向reply_to队列回复消息。


http://www.ppmy.cn/news/66614.html

相关文章

Kali-linux应用更新和配置额外安全工具

本节将介绍更新Kali的过程和配置一些额外的工具。这些工具在后面的章节中将是有用的。Kali软件包不断地更新和发布之间,用户很快发现一套新的工具比最初在DVD ROM上下载的软件包更有用。本节将通过更新安装的方法,获取Nessus的一个激活码。最后安装Squid…

KingbaseES 复制冲突之锁类型冲突

背景 昨天遇到客户现场的一个有关复制冲突的问题 备库报错:ERROR: canceling statement due to conflict with recovery,user was holding a relation lock for too long 现场情景是备库执行逻辑备份过程中出现的报错,逻辑备份相当于备库查询…

yolov5 用自己的数据集进行训练

在训练之前先要按照一定目录格式准备数据: VOC标签格式转yolo格式并划分训练集和测试集_爱钓鱼的歪猴的博客-CSDN博客 目录 1、修改数据配置文件 2、修改模型配置文件 3、训练 1、修改数据配置文件 coco.yaml 拷贝data/scripts/coco.yaml文件, pa…

k8s二进制搭建|ETCD + Flannel | 单节点部署 | 多节点的部署|dashbord的部署

k8s二进制搭建|ETCD Flannel | 单节点部署 | 多节点的部署|dashbord的部署 二进制搭建 Kubernetes v1.201 初始化环境2 部署 docker引擎3 在mster 192.168.10.10上操作4 在 node01 192.168.10.20节点上操作5 在 node02 192.168.10.30节点上操作6 检查etcd群集状态7 部署 Maste…

Java设计模式(二十二)策略模式

一、概述 策略模式是一种行为型设计模式,它允许在运行时选择算法的行为。策略模式通过将算法封装成独立的策略类,使得它们可以相互替换,而不影响使用算法的客户端。这样可以使客户端代码与具体算法的实现细节解耦,提高了代码的可…

03FPGA—led灯的显示(入门)

学习fpga也有段时间了,但后台有几个朋友问我能不能分享一点简单入门例子,于是我打算发经典的如何控制led的例子,本文主要分享设计流程以及简单的verilog语法。 设计流程主要包括五个步骤模块设计、波形设计、编写rtl代码、仿真验证、上板验证&#xff0c…

r语言tidyverse教程:4 dplyr

文章目录 简介和数据准备行列筛选mutate数据概述集合运算 R语言系列: 编程基础💎循环语句💎向量、矩阵和数组💎列表、数据帧排序函数💎apply系列函数tidyverse:readr💎tibble💎tidy…

MySQL的事务

1、事务的概念 事务是一种机制、一个操作序列,包含了一组数据库操作命令,并且把所有的命令作为一个整体一起向系统提交或撤销操作请求,即这一组数据库命令要么都执行,要么都不执行。 事务是一个不可分割的工作逻辑单元&#xff…