SSE多服务器部署导致消息推送异常问题的处理

server/2024/9/22 6:29:05/

之前讲了SSE的基本使用,后来,在项目实际部署的时候出现了新的问题。今天通过这篇文章来基于RabbitMQ解决一下当SSE服务部署到多台服务器后,存在消息推送异常的问题。

问题描述

SSE作为单向消息推送的一种方式,其背后是一种基于HTTP请求的长连接。而当这个连接建立之后,客户端是与服务器端的某一台服务器是存在关系绑定的。如果我们将同一套代码、同一份配置文件部署到多台服务器上的时候,就可能会出现连接建立在客户端与服务端A上,而当新的需要推送的消息由服务端B或其他服务端处理并发起推送的时候,其发现自己没有建立与客户端的SSE连接,就导致了消息推送失败的问题。针对这个问题,本文给出了一个解决方案。

问题分析

  1. 由于SSE连接是客户端与某一台服务器之间是强绑定的关系,所以我们需要让持有SSE连接的服务器100%能够接收到推送消息。
  2. 由于服务端存在多台部署的情况,所以我们需要通过RabbitMQ的发布订阅(fanout)模式将一条消息同时推送给所有服务端。
  3. 由于我们将同一套代码、同一份配置文件部署到多台服务器上,且RabbitMQ的发布订阅模式需要不同名的队列(Queue)绑定到同一个交换器(Exchange)上才能实现,因此多个服务端的队列名需要动态生成。
  4. @RabbitListener 注解的 queues 参数值要求必须为硬编码的字符串或 static final 修饰的变量,所以我们为其赋值的时候不可使用字符串拼接的形式,只能通过 SpEL 表达式赋值。

代码实现

RabbitMQ 广播模式配置类

import com.xxx.core.redis.RedisTemplateUtils;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ 广播模式配置类*/
@Configuration
public class RabbitFanoutConfig {private static Long innerCounter;public static final String FANOUT_QUEUE_COUNTER_KEY = "notice.fanout.queue.counter";// 系统消息交换器public static final String FANOUT_EXCHANGE_NAME = "notice_center_fanout_exchange";// 消息消息 消息队列public static final String NOTICE_CENTER_FANOUT_QUEUE = "notice_center_fanout_";@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE_NAME);}@BeanQueue noticeCenterfanoutQueue() {// 创建非持久化且自动删除的队列,解决因服务重启导致的RabbitMQ中无用队列过多问题return QueueBuilder.nonDurable(NOTICE_CENTER_FANOUT_QUEUE + getQueueCounter()).autoDelete().build();}@BeanBinding noticeCenter4CrowdBinding(FanoutExchange fanoutExchange, Queue noticeCenterfanoutQueue) {return BindingBuilder.bind(noticeCenterfanoutQueue).to(fanoutExchange);}public static long getQueueCounter() {if (null == innerCounter) {innerCounter = RedisTemplateUtils.incrementCounter(FANOUT_QUEUE_COUNTER_KEY);}return innerCounter;}
}

消息队列编号获取函数

public class RedisTemplateUtils {protected static final RedisTemplate redisTemplate = SpringUtils.getBean("redisTemplate");/*** 原子操作增加** @param key Redis键** @return {@link long} 最新值*/public static Long incrementCounter(String key) {// 使用 RedisTemplate 的操作方法来实现原子性的递增return redisTemplate.opsForValue().increment(key);}
}

注意:我这里用了 Redis 的原子自增,其实可以用任意可以取到不重复值的方式。

消费者监听

/*** 系统消息群发-广播模式*/
@RabbitListener(queues = "#{T(String).format('notice_center_fanout_%d', T(com.xxx.config.RabbitFanoutConfig).getQueueCounter())}",ackMode = "MANUAL")
public void noticeCenterFanoutMessageListener(@Payload String dataMsg, Message receivedMessage, Channel channel)throws IOException {long deliveryTag = receivedMessage.getMessageProperties().getDeliveryTag();try {// 处理消息log.info("消费者消息,noticeCenterFanoutMessageListener:deliveryTag:{} dataMsg:{} ", deliveryTag, dataMsg);        System.out.println(dataMsg);// 确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("MQConsumer.noticeCenterFanoutMessageListener,deliveryTag={},dataMsg={},error={}", deliveryTag,dataMsg, e.getMessage());// deliveryTag:表示要拒绝的消息的交付标签。// requeue:布尔值,指示是否将消息重新排队。如果设置为 true,RabbitMQ会尝试将消息重新排队,以便稍后再次发送给其他消费者;// 如果设置为 false,则消息将被丢弃。channel.basicReject(deliveryTag, true);}
}

广播发送

/*** RabbitMQ*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RabbitMQUtil {private static final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class);/*** 广播模式发送消息* @param fanoutExchangeName 广播模式交换器名称* @param message 消息内容*/public static void sendFanoutMessage(String fanoutExchangeName,String message) {rabbitTemplate.convertAndSend(fanoutExchangeName, "", message);}
}

发送广播

RabbitMQUtil.sendFanoutMessage(RabbitFanoutConfig.FANOUT_EXCHANGE_NAME, JSONUtil.toJsonStr(noticeLogList));

----------------------------------像孩子一样,真诚。像夕阳一样,温暖。像天空一样,宁静。----------------------------------


http://www.ppmy.cn/server/101654.html

相关文章

C ++ 也可以搭建Web?高性能的 C++ Web 开发框架 CPPCMS + MySQL 实现快速入门案例

什么是CPPCMS? CppCMS 是一个高性能的 C Web 开发框架,专为构建快速、动态的网页应用而设计,特别适合高并发和低延迟的场景。其设计理念类似于 Python 的 Django 或 Ruby on Rails,但针对 C 提供了更细粒度的控制和更高效的性能。…

第11章 第3节 软件测试的基本概念(软件评测师)

1.软件测试的对象不包括(质量保证方法) 【解析】软件测试的对象是程序、数据和文档相关的 2.测试记录包括() 测试计划或包含测试用例的测试规格说明 与测试用例相关的所有结果,包括在测试期间出现的所有失败 测试…

BLIP 中q-former使用方法

BLIP (Bidirectional Encoder Representations from Images and Text Pre-training) 是一种用于多模态任务的预训练模型,它可以处理图像和文本的联合表示。BLIP 中的 Q-Former 是一个用于处理图像特征并与文本特征进行交互的 Transformer 模块。Q-Former 被设计用于…

【安卓】播放多媒体文件

文章目录 播放音频播放视频 播放音频 在Android中播放音频文件一般是使用MediaPlayer类实现的,它对多种格式的音频文件提供了非常全面的控制方法,从而使播放音乐的工作变得十分简单。 MediaPlayer类中常用的控制方法。 常用方法名描述setDataSource()设…

《ImageNet: A Large-Scale Hierarchical Image Database》李飞飞论文阅读笔记

OpenSNN开思通智网,官网地址:https://w3.opensnn.com/ 2024年8月份 “O站创作者招募计划” 快来O站写文章,千元大奖等你来拿! “一起来O站,玩转AGI!” 论文地址: 《ImageNet: A Large-Scale Hierarchical I…

Ruby模板引擎:构建动态视图的艺术

标题:Ruby模板引擎:构建动态视图的艺术 在Ruby on Rails的世界里,模板引擎是构建动态网页的基石。它们允许开发者将服务器端的逻辑嵌入到HTML中,实现数据的动态展示。本文将深入探讨Ruby中几种常用的模板引擎,包括ERB…

智能安全守护,寺庙安全用电解决方案

在四川省蓬溪县城北,高峰山以其千年的历史沉淀和独特的文化风貌,默默诉说着道教与佛教交融的传奇。然而,2017年5月31日凌晨的一声巨响,打破了这里的宁静,一场突如其来的大火,让这座承载着无数信徒信仰与梦想…

09-JavaScript(代码)

01-二级导航 <style>* {margin: 0;padding: 0;list-style: none;}ul {width: 500px;height: 30px;margin: 30px auto;position: relative;}li {width: 100px;height: 30px;border: 1px solid #000;text-align: center;line-height: 30px;box-sizing: border-box;float: …