RabbitMQ-核心特性

embedded/2024/10/19 19:38:38/

已经不需要为RabbitMQ交换机的离去而感到伤心了,接下来登场的是RabbitMQ-核心特性!!!

文章目录

  • 核心特性
    • 消息过期机制
    • 消息确认机制
    • 死信队列

核心特性

消息过期机制

官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了
适用场景:清理过期数据

1)给队列中的所有消息指定过期时间

java">Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);

如果在过期时间内,还没有消费者取消息,消息才会过期
注意,如果消息已经接收到,但是没确认,是不会过期的
消费者中给队列中所有消息设置过期时间:

java">public class TtlConsumer {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建队列,指定消息过期参数Map<String, Object> args = new HashMap<String, Object>();args.put("x-message-ttl", 5000);// args 指定参数channel.queueDeclare(QUEUE_NAME, false, false, false, args);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};// 消费消息,会持续阻塞channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}
}

2)给某条消息指定过期时间

java">// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

生产者给某条消息指定过期时间

java">public class TtlProducer {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");
//        factory.setUsername();
//        factory.setPassword();
//        factory.setPort();// 建立连接、创建频道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 发送消息String message = "Hello World!";// 给消息指定过期时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消息确认机制

官方文档:https://www.rabbitmq.com/confirms.html
为保证消息成功被消费,rabbitmq提供了消息确认机制,当消费者收到消息后要给一个成功反馈:
●ack:消费成功
●nack:消费失败
●reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
image.png

java">        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {});

一般情况,建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:

image.png

java">channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

第二个参数 multiple 批量确认:是指是否要一次性确认所有的历史消息直到当前这条消息
指定拒绝某条消息:
image.png

java">channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

第 3 个参数表示是否重新入队,可用于重试

死信队列

官方文档:https://www.rabbitmq.com/dlx.html
为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?
死信:过期的消息,拒收的消息,消息队列满了,处理失败的消息的统称
死信队列:专门处理死信的队列
死信交换机:专门给死信队列转发消息的交换机
示例场景:
image.png
实现:
1)创建死信交换机和死信队列,并且绑定关系
2)给失败之后需要容错处理的队列绑定死信交换机
3)可以给要容错的队列指定死信之后的转发规则,死信应该再转发到哪个死信队列
4)可以通过程序来读取死信队列中的消息,从而进行处理
生产者代码:

java">package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.Scanner;public class DlxDirectProducer {//死信交换机private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";//工作交换机private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");// 创建laoban死信队列String queueName = "laoban_dlx_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");//创建waibao死信队列String queueName2 = "waibao_dlx_queue";channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [laoban] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [waibao] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {});Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");}}}
}

消费者代码:

java">在这里插入代码片package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class DlxDirectConsumer {private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");//小狗的死信要转发到waibao这个死信队列// 指定死信队列参数Map<String, Object> args = new HashMap<>();// 要绑定到哪个交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);// 指定死信要转发到哪个死信队列args.put("x-dead-letter-routing-key", "waibao");// 创建队列,随机分配一个队列名称String queueName = "xiaodog_queue";channel.queueDeclare(queueName, true, false, false, args);channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");//小猫的死信要转发到laoban这个死信队列Map<String, Object> args2 = new HashMap<>();args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);args2.put("x-dead-letter-routing-key", "laoban");// 创建队列,随机分配一个队列名称String queueName2 = "xiaocat_queue";channel.queueDeclare(queueName2, true, false, false, args2);channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaodog] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaocat] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {});}
}

http://www.ppmy.cn/embedded/6061.html

相关文章

java的总结

由于最近已经开始做项目了&#xff0c;所以对java的基础知识的学习都是一个离散化的状态没有一个很系统的学习&#xff0c;都是哪里不会就去学哪里。 先来讲一下前后端的区别吧 在我的理解前端就是&#xff1a;客户端在前端进行点击输入数据&#xff0c;前端将这些数据整合起来…

系统安全及应用

目录 1.账号安全控制 1系统账号清理 2密码安全控制 1 对已经存在的用户账号进行控制 2 对新建的用户密码默认设置 3 历史命令和终端自动注销的安全管理 1 历史命令的限制 2. 用户切换管理 1 su命令的使用 2 ssh 3.授权用户管理 1 sudo命令 2 sudo用户别名 3 查看su…

Hbase的shell命令(详细)

一、help 1.help 显示命名的分组情况 2.help 命令名称 查看命令的具体使用&#xff0c;包括命令的作用和用法。 举例&#xff1a;help list 二、general 组&#xff08;普通命令组&#xff09; 命令 描述 …

欢迎大家光临成都市

我现在就在家里&#xff0c;刚刚理个发&#xff0c;洗个澡 爸妈也在家里&#xff0c;一切正常&#xff0c;但是QQ上不了&#xff0c;哎呀,又长胖了&#xff0c;不好意思

51单片机入门_江协科技_31~32_OB记录的自学笔记_LCD1602液晶显示屏

31. LCD1602 31.1. LCD1602介绍 •LCD1602&#xff08;Liquid Crystal Display&#xff09;液晶显示屏是一种字符型液晶显示模块&#xff0c;可以显示ASCII码的标准字符和其它的一些内置特殊字符&#xff0c;还可以有8个自定义字符 •显示容量&#xff1a;162个字符&#xff0c…

图论学习总结

目录 图论学习总结前言一、基础知识图的存储图的遍历 二、最短路多源最短路 F l o y d Floyd Floyd​ 算法例题及变形 e g 1 &#xff1a; S o r t i n g I t A l l O u t eg1&#xff1a;Sorting\ It\ All\ Out eg1&#xff1a;Sorting It All Out ( 蓝书例题&#xff0c;传递…

zabbix自定义监控、自动发现和注册以及代理设置

前言 监控项的定制和新设备的注册往往需要大量手动操作&#xff0c;这会导致维护成本的增加和监控效率的降低。本文将介绍如何利用 Zabbix 的自定义功能&#xff0c;实现监控项的动态发布和新设备的自动注册以及代理设置、从而简化运维工作并实现更高效的监控管理。 Zabbix 监…

CSS3 新特性

文章目录 选择器圆角效果 - border-radius阴影效果 - box-shadow渐变效果 - linear-gradient()形变效果 - transform过渡效果 - transition动画效果 - animation媒体查询 - media弹性盒子布局 - flex网格布局 - grid背景效果1. 多背景图2. 背景裁剪3. 透明效果 CSS3中引入了许多…