Rabbitmq消息确认机制

news/2024/11/16 15:46:27/

1.生产者确认机制

确认消息发送到交换机--Confirm方式

1.1普通Confirm方式

private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        //开启确认机制
        channel.confirmSelect();
        //发送消息到exchange
        String msg = "hello confirm";
 
        channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
 
        if (channel.waitForConfirms()) {
            System.out.println("生产者发布消息至Exchange成功!");
        } else {
            System.out.println("生产者发布消息至Exchange失败!请重试");
        }
    }
 

1.2异步Confirm方式

 private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        //开启确认机制
        channel.confirmSelect();
        //发送消息到exchange
        String msg = "hello confirm";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "no-lost", MessageProperties.PERSISTENT_TEXT_PLAIN, (msg + i).getBytes());
        }
 
        //开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("生产者发布消息至Exchange成功,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("生产者发布消息至Exchange失败,标示为:" + deliveryTag + ",是否为批量操作:" + multiple);
            }
        });
        System.in.read();
    }

2.交换机到消息队列的return确认机制

 //3开启Return机制
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //当送达失败是才会回调
                System.out.println(new String(body, "utf-8") + ",消息没有送达到queue中");
            }
        });
 

3.消费者确认机制

接收消息成功时

channel.basicAck(envelope.getDeliveryTag(), false);

接收消息失败或进入异常时

 try {
                    //具体业务
                    int i = 1 / 0;
                    //确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    if (errorMap.get(new String(body, "UTF-8")) != null) {
                        System.out.println("消息已重复处理失败,拒绝再次接收...");
                        channel.basicReject(envelope.getDeliveryTag(), false);
                    } else {
                        System.out.println("消息即将再次返回队列处理...");
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                        errorMap.put(new String(body, "UTF-8"), 1);
                    }
                }
            }
 

一般在该消息处理完后执行,该消息才会在队列里面被删除,不然会处于UnAcked的状态存在队列中

finally {channel.basicAck(envelope.getDeliveryTag(), false);
}

http://www.ppmy.cn/news/44117.html

相关文章

Microsoft AADP部署方案

目录 前言 一、方案概述? 二、企业级Azure Subscription 三、Azure Active Directory Premium立即获取

用gdb调试ROS程序

文章目录 在debug模式编译添加GDB调试指令 (launch)添加GDB调试指令 (cmd)参考在debug模式编译 用命令行catkin_make,在输入catkin_make时加上一个参数: catkin_make -DCMAKE_BUILD_TYPE=Debug或者直接修改CMakelist.txt,添加以下代码: SET(CMAKE_BUILD_TYPE "D…

【Spring Cloud服务与服务之间的调用:Feign、RestTemplate】

在Spring Cloud中&#xff0c;服务之间的调用通常使用RESTful API进行&#xff0c;即通过HTTP请求进行通信。具体来说&#xff0c;服务提供方将API暴露在特定的URL上&#xff0c;服务消费方通过HTTP客户端调用该URL&#xff0c;从而实现服务之间的交互。 在Spring Cloud中&…

sql查询语句-01

1.单表查询 ◆限制显示结果 使用limit限制显示的行数&#xff0c;分页函数limit m,n,从m1行开始显示n条记录 例&#xff1a;查询选修课程成绩排在前5的学生的学号和成绩。 select sno,score from SCorder by score desc limit 5;limit 1,3 零是第一条 ◆汇总数据(聚集函数&…

运维——记一次接口超时的问题与解决方法(HttpException: Read timed out)

前言&#xff1a;近期,一个线上的项目,请求出现了大量接口超时的问题,找了几个小时原因,最终发现是因为数据库服务器的磁盘满了,在此记录一下寻找的过程以及发现的问题,以备后续参考。 环境&#xff1a; 项目服务器(CentOS 64-bit 7.9) OpenJDK 1.8.0_272 数据库服务器(CentO…

使用kubeadm方式搭建K8S集群

kubeadm是官方社区推出的一个用于快速部署kubernetes集群的工具。 这个工具能通过两条指令完成一个kubernetes集群的部署: # 创建一个 Master 节点 kubeadm init# 将一个 Node 节点加入到当前集群中 kubeadm join <Master节点的IP和端口 >Kubeadm方式搭建K8S集群 使用…

Android之AppWidget 开发浅析

什么是AppWidget AppWidget 即桌面小部件&#xff0c;也叫桌面控件&#xff0c;就是能直接显示在Android系统桌面上的小程序&#xff0c;先看图&#xff1a; 图中我用黄色箭头指示的即为AppWidget&#xff0c;一些用户使用比较频繁的程序&#xff0c;可以做成AppWidget&#x…

【C++】STL——用一颗红黑树封装出map和set

用一颗红黑树封装出map和set 文章目录用一颗红黑树封装出map和set一、前言二、红黑树模板参数的控制三、模板参数中仿函数的增加四、红黑树正向迭代器的实现五、红黑树的反向迭代器的实现六、红黑树的begin()和end()七、红黑树的rbegin()和rend()八、[ ]下标访问运算符重载九、…