004 死信(限制队列最大长度)

embedded/2024/9/25 0:29:46/

文章目录

    • 消息ttl过期成为死信
    • 队列达到最大长度成为死信
      • MyOrder.java
      • RabbitMQDirectConfig.java
      • OrderProducer.java
      • PayConsumer.java
      • DeadOrderConsumer.java
    • application.yaml

死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

消息ttl过期成为死信

map.put("x-message-ttl",2000); // 消息存活时间1s

队列达到最大长度成为死信

MyOrder.java


package com.example.direct;import java.io.Serializable;public class MyOrder implements Serializable {private String orderId;private String orderNumber;private String customerName;private Integer productId;private String productName;private Float productPrice;private Integer productCount;private Float orderPrice;public MyOrder(){}public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {this.orderId = orderId;this.orderNumber = orderNumber;this.customerName = customerName;this.productId = productId;this.productName = productName;this.productPrice = productPrice;this.productCount = productCount;this.orderPrice = orderPrice;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getOrderNumber() {return orderNumber;}public Integer getProductId() {return productId;}public void setProductId(Integer productId) {this.productId = productId;}public void setOrderNumber(String orderNumber) {this.orderNumber = orderNumber;}public String getCustomerName() {return customerName;}public void setCustomerName(String customerName) {this.customerName = customerName;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public Float getProductPrice() {return productPrice;}public void setProductPrice(Float productPrice) {this.productPrice = productPrice;}public Integer getProductCount() {return productCount;}public void setProductCount(Integer productCount) {this.productCount = productCount;}public Float getOrderPrice() {return orderPrice;}public void setOrderPrice(Float orderPrice) {this.orderPrice = orderPrice;}@Overridepublic String toString() {return "MyOrder{" +"orderId=" + orderId +", orderNumber='" + orderNumber + '\'' +", customerName='" + customerName + '\'' +", productName='" + productName + '\'' +", productPrice=" + productPrice +", productCount=" + productCount +", orderPrice=" + orderPrice +'}';}
}

RabbitMQDirectConfig.java


package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQDirectConfig  {//    1. 创建交换机
//    @Bean
//    public DirectExchange newDirectExchange(){
//        return new DirectExchange("myDirectExchangeAAA",true,false);
//    }//2. 创建队列
//    @Bean
//    public Queue newQueueA(){
//        return new Queue("queueAAA",true);
//    }//3. 绑定队列到交换机中
//    @Bean
//    public Binding bindingA(){
//        return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
//    }//==================死信//1. 创建交换机@Beanpublic DirectExchange newExchange(){return new DirectExchange("normalExchange",true,false);}//2. 创建队列@Beanpublic Queue newQueue(){Map<String ,Object> map = new HashMap<>();//map.put("x-message-ttl",2000); // 消息存活时间1smap.put("x-max-length",6); // 队列达到最大长度 为6map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字return new Queue("normalQueueA",true,false,false,map);}//3. 绑定@Beanpublic Binding binding(){return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");}//4. 创建死信交换机@Beanpublic DirectExchange newDeadExchange(){return new DirectExchange("deadExchange",true,false);}//5. 创建死信队列@Beanpublic Queue newDeadQueue(){return new Queue("deadQueueA",true,false,false);}//6. 绑定@Beanpublic Binding bindingDead(){return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");}}

OrderProducer.java


package com.example.direct;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;@RestController
@RequestMapping("a")
public class OrderProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/submitOrder")public String submitOrder(){Map<String,Object> map = new HashMap<>();map.put("orderNumber","2222");//Stringmap.put("productId",1111);//Integerfor(int i=0;i<=130;i++){String orderId = UUID.randomUUID().toString().replace("-","");map.put("orderId",orderId);rabbitTemplate.convertAndSend("normalExchange", "key1", map);}return "生产者下单成功";}}

PayConsumer.java

package com.example.direct;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
public class PayConsumer {@RabbitHandler@RabbitListener(queues = "normalQueueA")public void process(Map map, Channel channel, Message message) throws IOException {try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("支付服务接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("支付服务接收到的orderId:" + orderId);System.out.println("支付服务接收到的productId:" + productId);System.out.println("支付服务接收到的orderNum:" + orderNum);//告诉broker,消息已经被确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

DeadOrderConsumer.java

package com.example.direct;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class DeadOrderConsumer {// 获得死信队列中的消息@RabbitHandler@RabbitListener(queues = "deadQueueA")public void process(Map map){System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);String orderId = (String)map.get("orderId");//StringInteger productId = (Integer)map.get("productId");//IntegerString orderNum = (String)map.get("orderNumber");//StringSystem.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);}
}

application.yaml


server:servlet:context-path: /app
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)listener:simple:acknowledge-mode: manual # 手动消息确认concurrency: 1 #消费者数量max-concurrency: 1  #消费者最大数量prefetch: 1  #消费者每次从队列中取几个消息

http://www.ppmy.cn/embedded/16529.html

相关文章

java项目:微信小程序基于SSM框架实现的购物系统小程序【源码+数据库+毕业论文+PPT】

一、项目简介 本项目是一套基于SSM框架实现的购物系统小程序 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能齐…

【React】memo在props不变的情况下不再重复渲染组件

前言 组件中状态&#xff08;State&#xff09;发生改变会导致该组件重新渲染&#xff0c;其中的子组件也会被重新渲染。如果子组件中并未使用该状态&#xff08;State&#xff09;&#xff0c;重复渲染会导致无效的性能损耗。 在阻止重新渲染这个需求的基础上&#xff0c;诞…

Three.js——基础材质、深度材质、法向材质、面材质、朗伯材质、Phong材质、着色器材质、直线和虚线、联合材质

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

[C++][算法基础]完全背包问题(动态规划)

有 &#x1d441; 种物品和一个容量是 &#x1d449; 的背包&#xff0c;每种物品都有无限件可用。 第 &#x1d456; 种物品的体积是 &#x1d463;&#x1d456;&#xff0c;价值是 &#x1d464;&#x1d456;。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不…

python_django农产品物流信息服务系统6m344

Python 中存在众多的 Web 开发框架&#xff1a;Flask、Django、Tornado、Webpy、Web2py、Bottle、Pyramid、Zope2 等。近几年较为流行的&#xff0c;大概也就是 Flask 和 Django 了 Flask 是一个轻量级的 Web 框架&#xff0c;使用 Python 语言编写&#xff0c;较其他同类型框…

Spring 事务 @Transactional 注解

上期我们讲解了Spring事务的两种实现&#xff0c;其中声明式注解使用了 Transactional 注解&#xff0c; 接下来我们学习 该注解的使用细节。 我们主要学习 Transactional注解当中的三个常见属性&#xff1a; rollbackFor&#xff1a;异常回滚属性&#xff0c;指定能够出发事…

STCAD转换 晶联讯1353(5VLCD)显示

/***晶联讯1353(5VLCD)显示调节电位器参数变化***/ /******2018 6 30 08:50*L252 CODE 1339 ******/ /***变频器 PWM2017 5 6板测试AD晶联讯1353*****/ #include <reg52.h> // #define uint unsigned int …

前端项目中使用插件prettier/jscodeshift/json-stringify-pretty-compact格式化代码或json数据

同学们可以私信我加入学习群&#xff01; 正文开始 前言一、json代码格式化-选型二、json-stringify-pretty-compact简单试用三、prettier在前端使用四、查看prettier支持的语言和插件五、使用prettier格式化vue代码最终效果如图&#xff1a; ![在这里插入图片描述](https://im…