spring使用rabbitmq当rabbitmq集群节点挂掉 spring rabbitmq怎么保证高可用,rabbitmq网络怎么重新连接

ops/2024/12/19 18:26:42/

##spring rabbitmq代码示例

Controller代码

import com.alibaba.fastjson.JSONObject;
import com.newland.mi.config.RabbitDMMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;import java.io.UnsupportedEncodingException;
import java.util.UUID;@Controller
@RequestMapping("/rabbitmq")
public class RabbitmqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/getMessage")@ResponseBodypublic String getMessage() throws UnsupportedEncodingException {JSONObject obj = new JSONObject();obj.put("yym", "yym");MessageProperties messageProperties = new MessageProperties();String msgId = UUID.randomUUID().toString();rabbitTemplate.send(RabbitDMMQConfig.YYM_EXCHANGE, RabbitDMMQConfig.YYM_ROUTINGKEY, new Message(obj.toString().getBytes("UTF-8"), messageProperties), new CorrelationData(msgId));return "success";}}

RabbitmqConfig代码

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitDMMQConfig {protected final static Logger log = LoggerFactory.getLogger(RabbitDMMQConfig.class);/*** Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。* Queue:消息的载体,每个消息都会被投到一个或多个队列。* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。* Producer:消息生产者,就是投递消息的程序.* Consumer:消息消费者,就是接受消息的程序.* Channel:消息通道,在客户端的每个连接里,可建立多个channel.*/public static final String YYM_EXCHANGE = "yym-exchange";public static final String YYM_QUEUE = "yym-queue";public static final String YYM_ROUTINGKEY = "yym-routingKey";/*** 死信队列:*/public final static String deadQueueName = "ad_dead_queue";public final static String deadRoutingKey = "ad_dead_routing_key";public final static String deadExchangeName = "ad_dead_exchange";@Beanpublic Queue deadQueue() {Queue queue = new Queue(deadQueueName, true);return queue;}@Beanpublic DirectExchange deadExchange() {return new DirectExchange(deadExchangeName);}@Beanpublic Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);}/*** 死信队列 交换机标识符*/public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";/*** 死信队列交换机绑定键标识符*/public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";@Beanpublic ConnectionFactory yymConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.3.162:5672,192.168.3.162:5673,192.168.3.162:5674");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setConnectionTimeout(15000);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);log.info("svc dm ConnectionFactory success");return connectionFactory;}/*** 必须是prototype类型* @return*/@Beanpublic RabbitTemplate yymRabbitTemplate() {RabbitTemplate yymRabbitTemplate = new RabbitTemplate(dmConnectionFactory());yymRabbitTemplate.setMandatory(true);yymRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息发送成功:correlationData({}),ack({}),cause({})", JSON.toJSONString(correlationData), ack, cause);}});yymRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,replyCode, replyText, new String(message.getBody()));}});log.info("RabbitTemplate success");return dmRabbitTemplate;}/*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念HeadersExchange :通过添加属性key-value匹配DirectExchange:按照routingkey分发到指定队列TopicExchange:多关键字匹配*/@Beanpublic DirectExchange yymDefaultExchange() {return new DirectExchange(YYM_EXCHANGE);}/*** 获取队列A* @return*/@Beanpublic Queue queueA() {// 队列持久return new Queue(YYM_QUEUE, true);}@Beanpublic Binding binding() {return BindingBuilder.bind(queueA()).to(dmDefaultExchange()).with(YYM_ROUTINGKEY);}}

##docker rabbitmq 集群

yym@yym:~$ docker ps -a
CONTAINER ID   IMAGE                                          COMMAND                  CREATED        STATUS                     PORTSNAMES
d95cd024f3c9   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 6 hours                 4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp, [::]:5674->5672/tcp, 0.0.0.0:15674->15672/tcp, [::]:15674->15672/tcp   rabbitmq-node3
fd35f01e8b2d   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 26 seconds              4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, [::]:5673->5672/tcp, 0.0.0.0:15673->15672/tcp, [::]:15673->15672/tcp   rabbitmq-node2
83aa5e48fb3b   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 21 minutes              4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp       rabbitmq-node1

##关闭其中一个节点

docker stop 83aa5e48fb3b

##CachingConnectionFactory监听关闭事件ShutdownListener.shutdownCompleted

@Overridepublic void shutdownCompleted(ShutdownSignalException cause) {this.closeExceptionLogger.log(logger, "Channel shutdown", cause);int protocolClassId = cause.getReason().protocolClassId();if (protocolClassId == RabbitUtils.CHANNEL_PROTOCOL_CLASS_ID_20) {getChannelListener().onShutDown(cause);}else if (protocolClassId == RabbitUtils.CONNECTION_PROTOCOL_CLASS_ID_10) {getConnectionListener().onShutDown(cause);}}

##请求controller rabbitmq/getMessage

RabbitTemplate发起请求

##connection等空,新建一个connection

 ##使用CachingConnectionFactory缓存里面的this.connection.target

  ##使用CachingConnectionFactory上次连接缓存里面的this.connection

##findOpenChannel从channelList缓存数组中清理掉channel.isOpen()是关闭的

 ##判断连接是否打开的connection.isOpen()

 ##关闭事件监听ShutdownListener.shutdownCompleted中 this.shutdownCause已经有值,所以不等空,是否打开连接为假。isOpen函数返回假。

 

 ##connection.isOpen()连接未打开,channel为空,新建一个channel

 ##createBareChannel新建channel

 ##新建createConnection连接

##连接 

##根据配置地址进行连接

 ##创建新的连接

##新连接

##使用socket创建连接

##192.168.3.162:5672节点是关闭的,创建失败

 ##循环下一个节点192.168.3.162:5673连接,直到节点可以连接

##SocketFrameHandler socket 读写流

##初始化心跳

 ##this.connection.target创建完成

## 执行mq消息发送


http://www.ppmy.cn/ops/143251.html

相关文章

Mac/Linux 快速部署TiDB

1.下载TiUP TiDB 是一个分布式系统。最基础的 TiDB 测试集群通常由 2 个 TiDB 实例、3 个 TiKV 实例、3 个 PD 实例和可选的 TiFlash 实例构成。通过 TiUP Playground,可以快速搭建出上述的一套基础测试集群,步骤如下: curl --proto https -…

ASP.NET |日常开发中连接Mysql数据库增删改查详解

ASP.NET |日常开发中连接Mysql数据库增删改查详解 前言一、连接 MySQL 数据库1.1 安装和引用相关库1.2 建立数据库连接 二、数据库增删改查操作2.1 插入数据(Insert)2.2 查询数据(Select)2.3 更新数据(Upda…

如何利用Python爬虫获得1688商品详情

在这个信息爆炸的时代,数据就像是一块块美味的奶酪,而爬虫就是我们手中的瑞士军刀。今天,我要带你一起潜入1688这个巨大的奶酪洞穴,用Python爬虫捞起那些香气四溢的商品详情。别担心,我们的工具箱里有各种各样的工具&a…

1 JVM JDK JRE之间的区别以及使用字节码的好处

JDK jdk是编译java源文件成class文件的,我们使用javac命令把java源文件编译成class文件。 我们在java安装的目录下找到bin文件夹,如下图所示: 遵循着编译原理,把java源文件编译成JVM可识别的机器码。 其中还包括jar打包工具等。主要是针对…

关于Postgresql旧版本安装

抛出问题 局点项目现场,要求对如下三类资产做安全加固,需要在公司侧搭建测试验证环境,故有此篇。 bclinux 8.2 tomcat-8.5.59 postgrel -11 随着PG迭代,老旧版本仅提供有限维护。如果想安装老版本可能就要费劲儿一些。现在&…

leetcode:202. 快乐数(python3解法)

难度:简单 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1,也可能是 无限循环 但始终变不到 1。如果这个过程 …

rebase ‘A‘ onto ‘master‘ 和 merge ‘master‘ into ‘A‘有什么区别

在Git版本控制系统中,rebase 和 merge 是两种不同的操作,用于合并分支。rebase A onto master 和 merge master into A 虽然最终目的都是将两个分支的更改合并在一起,但它们在处理方式和结果上有所不同。 rebase ‘A’ onto ‘master’ 含义…

国标GB28181-2022平台EasyGBS:双网口的网络硬盘录像机怎么设置IP地址以及录像机怎么添加不同网段的摄像机?

在现代安防监控系统中,双网口的网络硬盘录像机(NVR)因其灵活性和高效性而备受青睐。这种设备不仅能够提供网络容错,确保网络的稳定性,还能通过多址设定模式连接不同网段的设备,极大地增强了监控系统的扩展性…