RabbitMQ 消费者

news/2024/11/18 3:02:07/

  RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。
  消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制

消费者

  • 拉模式
    • 拉模式的实现
  • 推模式
  • 消费确认与拒绝
    • 消息确认的实现
    • 消息拒绝的实现
    • basicRecover
  • basicQos 限制消费
  • 总结

拉模式

  顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。
在这里插入图片描述
  RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者,消费者接收处理消息后向RabbitMQ发送消费应答,然后该消息将从队列中移除。
  需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景,如果以循环的方式获取批量数据将影响RabbitMQ的性能。

拉模式的实现

  拉模式通过以下方法实现:

/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)

  如上述代码所示channel.basicGet方法返回的是一个GetResponse,在GetResponse对象中包含了一条消息内容,消费者可以获取该消息并进行处理。

推模式

  推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费,只要队列中存在消息则向消费者推送,当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。

    /*** Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}* method.* Provide access to <code>basic.deliver(Broker推送消息)</code>, <code>basic.cancel</code>* and shutdown signal callbacks (which is sufficient* for most cases). See methods with a {@link Consumer} argument* to have access to all the application callbacks.* @param queue 队列名称* @param autoAck 是否自动确认* @param consumerTag 消费者标签,消费者的唯一标识符* @param noLocal 是否可以接收同Connection中生产者的消息(true不能接收)* @param exclusive 是否设置排他* @param arguments 其他参数* @param deliverCallback 消息接收回调* @param cancelCallback 消费取消回调* @param shutdownSignalCallback 连接或者信道关闭回调* @return the consumerTag associated with the new consumer*/String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  可以通过上述两种方法(设置参数最多的)实现声明消费者。其中Consumer的定义如下:


public interface Consumer {/*** 消费者通过basicConsume被注册后调用*/void handleConsumeOk(String consumerTag);/*** 消费者通过basicCancel取消时调用*/void handleCancelOk(String consumerTag);/*** 消费者不通过basicCancel取消时调用*/void handleCancel(String consumerTag) throws IOException;/*** 通道或者连接关闭时调用*/void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);/*** 接收重新发送的未被确认的消息时调用*/void handleRecoverOk(String consumerTag);/*** 接收消息时调用* @param consumerTag 消费者标签* @param envelope 打包消息的数据* @param properties 消息的内容标头数据* @param body 消息内容*/void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException;
}

消费确认与拒绝

  为了保障消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认,然后从队列中删除;当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息(先标记再删除)。
  autoAck参数意为自动应答,但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认,无需消费者进行应答。

  当autoAck参数为false时,对于RabbitMQ服务器而言,队列中的消息分成两部分:一部分时等待投递给消费者的消息;一部分时已经投递给消费者,但是还未收到消费者确认消息的消息
  RabbitMQ不会为未确认的消息设置过期时间,如果一个消息一直未被消费者确认,那么这个消息再RabbitMQ中将一直保存为投递未确认状态,指导消费者确认或者消费者断开连接,如果消费者断开连接,则该消费者接收但未确认的消息将重新入队。

消息确认的实现

  消息的显式确认需要消费者再声明的过程中设置autoAck=false。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下:

	 /*** @param 消息的标签,可通过Delivery.getEnvelope().getDeliveryTag()获取* @param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答,如果为false则仅应答一条消息*/void basicAck(long deliveryTag, boolean multiple) throws IOException;

  当进行消息的批量确认时,将所有发送给该消费者未确认的消息进行确认,而针对监听同一队列的其他消费者的未确认消息并不进行处理。

消息拒绝的实现

  RabbitMQ提供了两种消息拒绝的方法:Basic.Reject和Basic.Nack命令;其两者的区别时Nack可以进行批量拒绝。

    /*** @param deliveryTag 消息标签* @param requeue 为true时被拒绝的消息重新入队,否则将成为死信* @throws java.io.IOException if an error is encountered*/void basicReject(long deliveryTag, boolean requeue) throws IOException;/*** @param deliveryTag 消息标签* @param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息* @param requeue 为true时被拒绝的消息重新入队,否则将成为死信* @throws java.io.IOException if an error is encountered*/void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;

basicRecover

该方法可以将某个消费者未应答(确认或者拒绝)的消息重新入队,该方法会导致:

  • 投递而未被应答的消息可以重新发送给消费者进行处理
  • 消费者的消息队列被清空,可以重新接收到其他消息
    /*** <p>*  Ask the broker to resend unacknowledged messages.  In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* </p>* Equivalent to calling <code>basicRecover(true)</code>, messages* will be requeued and possibly delivered to a different consumer.* @see #basicRecover(boolean)*/Basic.RecoverOk basicRecover() throws IOException;/*** Ask the broker to resend unacknowledged messages.  In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* @param requeue If true, messages will be requeued and possibly* delivered to a different consumer. If false, messages will be* redelivered to the same consumer.*/Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

basicQos 限制消费

  默认情况下,消费者对于接收的消息数量并未限制,也就是说,一旦RabbitMQ中接收到消息并且存在消费者,则RabbitMQ将把消息发送到相关的消费者中,并不关心消费者是否消息完信息。
  轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下:

    /*** @param prefetchSize 消息大小* @param prefetchCount 消息数量* @param global 是否全局* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

针对global参数需要注意一下内容:

  • 当global=true时信道上所有的消费者都需要遵从消息数量限定值(某个信道上所有消费者未确认消息数量<=prefetchCount)
  • 等global=false时新的消费者需要遵从消息数量的限定值。
  • 可以调用两次basicQos方法,并使用不同的global参数,这种情况下两次配置都可以生效。

总结

  消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式,推模式的是使用场景相对比较多。
  为确保消息被合法的消费,RabbitMQ提供了消费确认机制,投递的消息并不能被理解完成了消费,仅消费者确认消费该消息才会被移除队列。
  默认的消息投递机制时轮询,轮询的消息分发并会关系消费者的性能以及消息积压的问题,因此需要限制每个消费者所能保持的最大未确认的消息数量。


http://www.ppmy.cn/news/1055630.html

相关文章

ASE 基础知识

笔记&#xff1a; 1&#xff0c;颜色&#xff0c;贴图 2&#xff0c;加减乘除 3&#xff0c;菲涅尔、sin&#xff0c;float、vector、time 4&#xff0c;漫反射&#xff0c;法线&#xff0c;自发光&#xff0c;金属度&#xff0c;反射&#xff0c;AO明暗&#xff0c;折射&#…

高等数学:线性代数-第三章

文章目录 第3章 矩阵的初等变换与线性方程组3.1 矩阵的初等变换3.2 矩阵的秩3.3 方程组的解 第3章 矩阵的初等变换与线性方程组 3.1 矩阵的初等变换 矩阵的初等变换 下面三种变换称为矩阵的初等变换 对换两行&#xff08;列&#xff09;&#xff0c;记作 r i ↔ r j ( c i …

如何使用PyQt进行网络编程?

PyQt是一个用Python编写的图形用户界面库&#xff0c;它能够让你创建具有图形用户界面的应用程序&#xff0c;比如你可以用它来创建漂亮的窗口、按钮、文本框等等。而网络编程则是让你的应用程序能够与远程服务器进行通信&#xff0c;比如获取网页内容、发送和接收电子邮件、访…

C#的IndexOf

在 C# 中&#xff0c;IndexOf 是一个字符串、数组或列表的方法&#xff0c;用于查找指定元素的第一个匹配项的索引。它返回一个整数值&#xff0c;表示匹配项在集合中的位置&#xff0c;如果未找到匹配项&#xff0c;则返回 -1。 IndexOf 方法有多个重载形式&#xff0c;可以根…

Linux知识点 -- Linux多线程(三)

Linux知识点 – Linux多线程&#xff08;三&#xff09; 文章目录 Linux知识点 -- Linux多线程&#xff08;三&#xff09;一、线程同步1.概念理解2.条件变量3.使用条件变量进行线程同步 二、生产者消费者模型1.概念2.基于BlockingQueue的生产者消费者模型3.单生产者单消费者模…

蓝帽杯半决赛2022

手机取证_1 iPhone手机的iBoot固件版本号:&#xff08;答案参考格式&#xff1a;iBoot-1.1.1&#xff09; 直接通过盘古石取证 打开 取证大师和火眼不知道为什么都无法提取这个 手机取证_2 该手机制作完备份UTC8的时间&#xff08;非提取时间&#xff09;:&#xff08;答案…

优化生产流程:数字化工厂中的OPC UA分布式IO模块应用

背景 近年来&#xff0c;为了提升在全球范围内的竞争力&#xff0c;制造企业希望自己工厂的机器之间协同性更强&#xff0c;自动化设备采集到的数据能够发挥更大的价值&#xff0c;越来越多的传统型工业制造企业开始加入数字化工厂建设的行列&#xff0c;实现智能制造。 数字化…

用 Audacity 比较两段音频差异

工作中遇到相同的处理流程&#xff0c;处理同一段音频&#xff0c;看看处理结果是否一致&#xff0c;可以用audacity来处理。 假设待比较的音频分别为 1.wav 2.wav 1、用Audacity打开1.wav 2、用Audacity打开2.wav&#xff0c;选中音频&#xff0c;然后用 效果 -> 反向&am…