消息推送只会用websocket、轮询?试试SSE,轻松高效。

news/2024/9/22 13:05:31/

SSE_0">SSE介绍

HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点:

单向通信:
SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。

持久连接:
SSE 使用 HTTP 持久连接(long-lived connection)来保持客户端与服务器之间的连接,避免频繁地重新建立连接。

事件驱动:
SSE 采用事件驱动的方式,服务器将数据封装成事件推送给客户端,客户端可以根据事件类型进行相应的处理。

简单易用:
SSE 的协议简单,基于标准的 HTTP 协议,可以在任何支持 HTTP 的环境中使用。
客户端和服务器端的实现也相对简单,开发成本较低。

可靠性:
SSE 基于 HTTP 协议,可以利用 HTTP 的重试机制来提高数据传输的可靠性。
如果连接断开,客户端可以自动重新连接并恢复数据传输。

浏览器支持:
主流浏览器(Chrome、Firefox、Safari 等)都原生支持 SSE
对于不支持 SSE 的浏览器,可以使用 polyfill 库来实现兼容性。

应用场景:
SSE 适用于实时性要求较高的场景,如聊天应用、体育赛事直播、股票行情更新等。
与 WebSocket 相比,SSE 更加轻量级,适用于一些对实时性要求不太高但需要持续更新的场景。

总的来说,HTTP SSE 提供了一种简单、可靠、高效的服务器推送机制,可以在各种 Web 应用中得到广泛应用。它是 Web 实时通信技术的一种重要补充。

websocket_33">与websocket对比

HTTP Server-Sent Events (SSE) 和 WebSocket 都是实现服务器与客户端之间实时双向通信的技术,但它们在某些方面存在一些差异。以下是它们的对比:

  1. 通信模式:
    • SSE 是单向通信,服务器只能主动推送数据给客户端,客户端只能被动接收。
    • WebSocket 是双向通信,服务器和客户端可以互相发送和接收数据。
  2. 连接方式:
    • SSE 使用标准的 HTTP 连接,利用 HTTP 持久连接来保持连接。
    • WebSocket 使用独立的 WebSocket 协议,建立全双工的 TCP 连接。
  3. 传输协议:
    • SSE 使用标准的 HTTP 协议,数据以文本的形式传输。
    • WebSocket 使用自己的二进制协议,可以传输二进制数据。
  4. 浏览器支持:
    • SSE 被大多数现代浏览器原生支持。
    • WebSocket 也被大多数现代浏览器原生支持。
  5. 可靠性:
    • SSE 可以利用 HTTP 的重试机制来提高数据传输的可靠性。
    • WebSocket 建立在 TCP 协议之上,也具有较高的可靠性。
  6. 实时性:
    • SSE 的实时性略低于 WebSocket,因为它需要依赖 HTTP 的连接机制。
    • WebSocket 建立在独立的 TCP 连接之上,实时性更高。
  7. 应用场景:
    • SSE 更适合于一些实时性要求不太高但需要持续更新的场景,如聊天应用、体育赛事直播等。
    • WebSocket 更适合于需要实时双向通信的场景,如在线游戏、视频会议等。

总的来说,SSE 和 WebSocket 都是实现服务器与客户端实时通信的有效方式,它们各有优缺点,适用于不同的应用场景。在选择时需要根据具体的需求来权衡取舍。

上代码

主体工具类 SseUtil

import com.alibaba.fastjson.JSON;
import com.enums.EnumDeviceType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** SSE 通信工具类** @author Supreme_Sir* @version V1.0.0*/
@Component
@Slf4j
public class SseUtil {/*** SSE 超时时间 24小时*/private static final Long TIMEOUT_24_HOUR = 86400000L;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate UnreadMessageCountCacheUtil unreadMessageCountCacheUtil;/*** 订阅SSE*/public SseEmitter subscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter == null) {//生成连接并存储sseEmitter = new SseEmitter(TIMEOUT_24_HOUR);SingletonConcurrentHashMap.INSTANCE.put(deviceType, userId, sseEmitter);}//设置回调函数sseEmitter.onCompletion(completionCallBack(deviceType, userId));sseEmitter.onTimeout(timeoutCallBack(deviceType, userId));sseEmitter.onError(errorCallBack(deviceType, userId));// 立即发送未读消息数量,消除前端等待Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));log.info("用户-{}-{} SSE连接成功", userId, deviceType.getName());return sseEmitter;}/*** 退订消息** @param userId 用户ID*/public String unsubscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {log.info("用户-{}-{} 主动断开连接", userId, deviceType.getName());//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。sseEmitter.complete();SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);}return "退订成功";}/*** 发送SSE消息** @param userId  用户ID* @param content 消息内容*/public void sendMessage(Long userId, SseMessageVo content) {for (EnumDeviceType deviceType : EnumDeviceType.values()) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {try {log.info("向用户-{} SSE发送消息-{}", userId, JSON.toJSONString(content));sseEmitter.send(content);} catch (IOException e) {log.error("用户-{}-{} SSE发送消息异常-{}", userId, deviceType.getName(), e.getMessage());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);log.error("用户-{}-{} SSE发送消息异常被移除", userId, deviceType.getName());}}}}/*** SSE 单向通信心跳检测(需配合定时任务)*/public void heartbeat() {SingletonConcurrentHashMap.INSTANCE.getMap().forEach((key, value) -> {Long userId = extractNumbers(key.toString());Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));});}/*** SSE 连接成功回调** @param userId 用户ID*/private Runnable completionCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> log.info("用户-{}-{} SSE连接断开", userId, deviceType.getName()));}/*** 出现超时,将当前用户缓存删除** @param userId 用户ID*/private Runnable timeoutCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.error("用户-{}-{} SSE连接超时", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接超时被移除", userId, deviceType.getName());});}/*** 出现异常,将当前用户缓存删除** @param userId 用户ID*/private Consumer<Throwable> errorCallBack(EnumDeviceType deviceType, Long userId) {return throwable -> {log.error("用户-{}-{} SSE连接异常", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接异常被移除", userId, deviceType.getName());};}/*** 截取字符串中的数字** @param input 待截取的字符串*/private Long extractNumbers(String input) {Pattern pattern = Pattern.compile("[a-zA-Z](\\d+)");Matcher matcher = pattern.matcher(input);if (matcher.find()) {// 返回第一个匹配的数字序列return Long.valueOf(matcher.group(1));} else {// 如果没有找到匹配项,可以返回null或抛出异常return null;}}
}

要点:

  1. 新建好的 SSE 对象需要用容器存储起来,以服务于后续消息通信
  2. 回调使用 ThreadPool 进行管理避免线程过多。
  3. 一个 SSE 对象只能与一端保持通信,如果存在多端的话,需要创建多个对象。

SSE对象单例存储容器 SingletonConcurrentHashMap

import com.enums.EnumDeviceType;
import lombok.Getter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;/*** 基于ConcurrentHashMap的单例版SSE存储容器*/
@Getter
public enum SingletonConcurrentHashMap {/*** 单例版存储容器*/INSTANCE;private final ConcurrentHashMap<Object, SseEmitter> map = new ConcurrentHashMap<>();/*** 存入对象*/public void put(EnumDeviceType deviceType, Object key, SseEmitter value) {map.put(deviceType.getCode() + key, value);}/*** 获取对象*/public SseEmitter get(EnumDeviceType deviceType, Object key) {return map.get(deviceType.getCode() + key);}/*** 判断缓存中是否存在当前用户的SSE实例** @param key 用户ID*/public boolean haveInstance(Object key) {// 分别查询PC、小程序的SSE实例for (EnumDeviceType deviceType : EnumDeviceType.values()) {if (map.get(deviceType.getCode() + key) != null) {return true;}}return false;}/*** 移除对象*/public void remove(EnumDeviceType deviceType, Object key) {map.remove(deviceType.getCode() + key);}/*** 判断是否存在*/public boolean containsKey(EnumDeviceType deviceType, Object key) {return map.containsKey(deviceType.getCode() + key);}/*** 获取对象数量*/public int size() {return map.size();}/*** 清空*/public void clear() {map.clear();}}

心跳数据缓存工具 UnreadMessageCountCacheUtil

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.redis.RedisTemplateUtils;
import com.enums.EnumYesOrNo;
import com.util.RedisKeyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** @author Supreme_Sir* @description 未读消息条数缓存工具**/
@Component
@Slf4j
public class UnreadMessageCountCacheUtil {@Resourceprivate IDao dao;// 过期时间30分钟private static final Long TIMEOUT = 30L;/*** 添加缓存*/private void put(Long key, Object value) {if (Objects.isNull(key) || Objects.isNull(value)) {return;}RedisTemplateUtils.setCacheObject(RedisKeyUtils.getUnreadMessageCount() + key, value, TIMEOUT, TimeUnit.MINUTES);}/*** 获取缓存(缓存中如果没有则回数据库查询)*/public Long getWithCallBack(Long key) {if (Objects.isNull(key)) {return null;}Object cnt = RedisTemplateUtils.getCacheObject(RedisKeyUtils.getUnreadMessageCount() + key);if (Objects.isNull(cnt)) {cnt = queryCount(key);put(key, cnt);}return Long.valueOf(cnt.toString());}/*** 获取最新缓存** @return {@link Long} 最新未读数据条数*/public Long getWithRefresh(Long key) {if (Objects.isNull(key)) {return null;}Long cnt = queryCount(key);put(key, cnt);return cnt;}/*** 手动刷新缓存*/public void refresh(Long key) {if (Objects.isNull(key)) {return;}put(key, queryCount(key));}/*** 回库查询未读消息条数** @param userId 用户ID* @return {@link Long} 未读消息数量*/private Long queryCount(Long userId) {QueryWrapper<> wrapper = new QueryWrapper<>();// 连接数据库查询数据return dao.selectCount(wrapper);}
}

注意:该缓存工具对象由 Spring 容器管理,以确保单例。

前端关键代码

import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
fetchEventSource(`${env.VITE_API_URL_PREFIX}/xxx/sse/xxx`, {signal: ctrl.signal,method: 'POST',headers: {'Auth-Token': localStorage.getItem(TOKEN_NAME),},body: JSON.stringify({UserID: localStorage.getItem('userID'),}),openWhenHidden: true,onopen: async (event: any) => {console.log('sse open:', event);},onmessage: async (event: any) => {const data = JSON.parse(event.data);this.setMsgCount(data.UnreadMsgCount || 0);console.log('SSE 消息:', data);if (data.Data) {const NotifyInstance = await NotifyPlugin.info({class: 'global-notify-card-wrap',icon: false,duration: 10000,closeBtn: false,offset: [0, 53],content: (h) =>h(MessageBox, {Data: data.Data,onHide: () => {NotifyInstance.close();},}),} as any);}},
});
this.see = {close: () => ctrl.abort(),
};

-------------------------------------------风雨里做个大人,阳光下做个孩子。-------------------------------------------


http://www.ppmy.cn/news/1501886.html

相关文章

词的向量化和文本向量化

词的向量化和文本向量化 向量化one-hot编码提前准备词表不提前准备词表one-hot缺点 词向量简介词向量的定义和目标word embedding和word vector的区别onehot编码与词向量关系构建 训练方式1&#xff08;基于语言模型&#xff09;训练方式2&#xff08;基于窗口&#xff09;CBOW…

XCode 编译 PAG 源码

最近工作中要使用PAG替换Lottie&#xff0c;为了方便阅读源码&#xff0c;使用XCode对其源码进行了编译。 1 下载源码 编译源码首先要下载源码&#xff0c;有关PAG源码可直接到github上下载。 2 添加相关依赖 下载源码之后&#xff0c;进入到PAG项目根目录&#xff0c;执行如下…

C#网络连接:TCP/IP模式下的网络连接与同步

1&#xff0c;目的 为了测试局域网的消息同步&#xff0c;简单写了下TCP/IP模式的同步&#xff0c;参考这个帖子。 2&#xff0c;核心库部分 using System; using System.Net; using System.Net.Sockets; using System.Text;namespace Coldairarrow.Util.Sockets {/// <s…

Spark RPC框架详解

文章目录 前言Spark RPC模型概述RpcEndpointRpcEndpointRefRpcEnv 基于Netty的RPC实现NettyRpcEndpointRefNettyRpcEnv消息的发送消息的接收RpcEndpointRef的构造方式直接通过RpcEndpoint构造RpcEndpointRef通过消息发送RpcEndpointRef Endpoint的注册Dispatcher消息的投递消息…

C语言经典习题24

文件操作习题 一 编程删除从C盘home文件夹下data.txt文本文件中所读取字符串中指定的字符&#xff0c;该指定字符由键盘输入&#xff0c;并将修改后的字符串以追加方式写入到文本文件C:\home\data.txt中。 #include<stdio.h> main() { char s[100],ch; int i;…

PHP苹果 V X iPhone微商i o s多分开V X语音转发密友朋友圈一键跟圈软件

苹果VX神器&#xff01;iPhone微商必备&#xff1a;ios多开、VX语音转发、密友朋友圈一键跟圈软件大揭秘&#xff01; 一、iOS多开新境界&#xff0c;工作生活两不误&#xff01; 你是不是也烦恼过&#xff0c;想要在工作号和生活号之间自由切换&#xff0c;却因为iPhone的限制…

Git 分布式版本控制系统、创建分支,跳转分支、git拉取、在码云上创建项目,进行pull 和 push、克隆码云上任意项目

目录 1.Git 分布式版本控制系统&#xff1a; 1.安装git 2.创建目录&#xff0c;进行初始化 3.写入Java文件&#xff0c;提交文件 4.文件放入仓库 2.创建分支&#xff0c;跳转分支&#xff08;所有的git操作都应该工作在&#xff0c;指定的init 目录下进行&#xff09; 1.…

javascript(二)

三、选择器 1.选择器的种类 document.getElementById() 通过id来查找元素 document.getElementsByTagName() 通过标签来查找元素 document.getElementsByClassName() 通过类名&#xff08;class&#xff09;来查找元素 document.getElementsByName() 通过name来查找元…