Springboot 集成websocket 并支持服务集群

embedded/2024/10/9 8:13:41/

1、新增配置类声明


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebsocketConfig {/*** 如果单元测试报错,请在类上加上以下注解内容* @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}}

2、新建websocket连接类


import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;/*** html页面与之关联的接口* var reqUrl = "http://localhost:8080/ws/device/{deviceType}/{deviceAddress}";* socket = new WebSocket(reqUrl.replace("http", "ws"));*/
@Slf4j
@Component
@ServerEndpoint("ws/device/{deviceType}/{deviceAddress}")
public class OwonWebSocketServerEndpoint {private String KEY;private String DEVICE_TYPE;@OnOpenpublic void onOpen(Session session, @PathParam("deviceType") String deviceType, @PathParam("deviceAddress") String deviceAddress) {log.info("发现设备连接,deviceType:" + deviceType + ",deviceAddress:" + deviceAddress);DeviceDO userDevice = SpringContextUtils.getBean(rDeviceMapper.class).findByDeviceTypeAndDeviceAddress(deviceType, deviceAddress);if (userDevice == null) {try {session.close();} catch (IOException e) {// ignore}return;}this.KEY = deviceType + WsSessionManager.SPLIT + deviceAddress;this.DEVICE_TYPE = deviceType;SpringContextUtils.getBean(WsSessionManager.class).add(KEY, session);log.info(String.format("成功建立连接, key:" + this.KEY));}@OnClosepublic void onClose() {SpringContextUtils.getBean(WsSessionManager.class).remove(this.KEY);log.info("成功关闭连接, key:" + KEY);}@OnMessagepublic void onMessage(Session session, String message) {log.info("收到消息, message:" + message);}@OnErrorpublic void onError(Session session, Throwable error) {log.info("发生错误:" + this.KEY);error.printStackTrace();}/*** 指定发消息** @param message*/public void sendMessage(String deviceType, String deviceAddress, String message) {SpringContextUtils.getBean(MessageSendManager.class).sendMessage(deviceType, deviceAddress, message);}

3、新建session管理类


import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.websocket.Session;
import java.io.IOException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class WsSessionManager {@Value("${eureka.instance.instance-id}")private String instanceId;@Autowiredprivate JedisUtil jedisUtil;public static final String SPLIT = "@#@";/*** 保存连接 session 的地方*/private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();/*** 添加 session** @param key* @param session*/public void add(String key, Session session) {// 添加 sessionSESSION_POOL.put(key, session);// 记录session所在的机器String url = getLocalServerUrl();jedisUtil.set(key, url, 24 * 3600);}/*** 设置链接机器地址** @param key*/public void setServerUrl(String key) {// 记录session所在的机器String url = getLocalServerUrl();jedisUtil.set(key, url, 24 * 3600);}/*** 删除 session,会返回删除的 session** @param key* @return*/public Session remove(String key) {// 删除 sessionSession session = SESSION_POOL.remove(key);// 删除记录的机器地址jedisUtil.del(key);return session;}/*** 删除并同步关闭连接** @param key*/public void removeAndClose(String key) {Session session = remove(key);if (session != null) {try {// 关闭连接session.close();} catch (IOException e) {// todo: 关闭出现异常处理e.printStackTrace();}}}/*** 获得 session** @param key* @return*/public Session get(String key) {// 获得 sessionreturn SESSION_POOL.get(key);}/*** 获取本机的地址** @return*/public String getLocalServerUrl() {return "http://" + instanceId;}/*** 组装session key** @param deviceType 设备类型* @param devId  设备id* @return*/public String getSessionKey(String deviceType, String devId) {return deviceType + SPLIT + devId;}/*** 获取redis 里面存储的链接地址** @param sessionKey* @return*/public String getServerUrl(String sessionKey) {return jedisUtil.get(sessionKey);}/*** 获取所有sessionKey** @return*/public List<String> getAllSessionKeys() {Enumeration<String> keys = SESSION_POOL.keys();if (keys.hasMoreElements()) {return Collections.list(keys);}return Lists.newArrayList();}}

4、新建消息发送类


import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.Session;@Slf4j
@Component
public class MessageSendManager {@Autowiredprivate WsSessionManager wsSessionManager;/*** 发送消息,先看本机,本机没有链接就转发* @param deviceType* @param deviceAddress* @param message* @return*/public Boolean sendMessage(String deviceType, String deviceAddress, String message) {//先尝试找本机Session session = wsSessionManager.get(wsSessionManager.getSessionKey(deviceType, deviceAddress));if (null != session) {synchronized (session) {try {session.getAsyncRemote().sendText(message);log.info("MessageSendManager sendMsg 消息发送成功 deviceType={},deviceAddress={},payload={}", deviceType, deviceAddress, JSON.toJSONString(message));return true;} catch (Exception e) {e.printStackTrace();}return false;}} else {// 转发到链接所在的机器String url = wsSessionManager.getServerUrl(wsSessionManager.getSessionKey(deviceType, deviceAddress));if (StringUtils.isBlank(url)) {log.info("MessageSendManager sendMsg 找不到链接地址 deviceType={},deviceAddress={}", deviceType, deviceAddress);return false;}// 本机地址String localUrl = wsSessionManager.getLocalServerUrl();if (StringUtils.equals(url, localUrl)) {log.info("MessageSendManager sendMsg 本机找不到 deviceType={},deviceAddress={}", deviceType, deviceAddress);return false;}// 转发到其他机器transferByMsg(url, deviceType, deviceAddress, message);return true;}}/*** 发送消息,本机* @param message* @return*/public Boolean sendMessageByKey(String key, String message) {//先尝试找本机Session session = wsSessionManager.get(key);if (null != session) {synchronized (session) {try {session.getAsyncRemote().sendText(message);log.info("MessageSendManager sendMsg 消息发送成功 key={},payload={}", key, JSON.toJSONString(message));return true;} catch (Exception e) {e.printStackTrace();}return false;}}return false;}/*** 发送消息,先看本机,本机没有链接就转发* @param deviceType* @param deviceAddress* @param message* @return*/public Boolean sendMsgToClient(String deviceType, String deviceAddress, String message) {//先尝试找本机Session session = wsSessionManager.get(wsSessionManager.getSessionKey(deviceType, deviceAddress));if (null != session) {synchronized (session) {try {session.getAsyncRemote().sendText(message);log.info("MessageSendManager sendMsg 消息发送成功 deviceType={},deviceAddress={},payload={}", deviceType, deviceAddress, JSON.toJSONString(message));return true;} catch (Exception e) {e.printStackTrace();}return false;}}return false;}private void transferByMsg(String url, String deviceType, String deviceAddress, String message) {String urlString = url + "/device/msg/dispatch";HttpUtil.post(urlString, JSON.toJSONString(new WsMsgDispatchDTO(deviceType, deviceAddress, message)));}}

5、新建转发消息接收类

@RestController
@RequestMapping("/device")
public class DeviceMsgDispatchController {@Autowiredprivate MessageSendManager manager;/*** 消息转发处理* @param dto* @return*/@RequestMapping(value = "/msg/dispatch", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)public BaseResult<Boolean> dispatch(@RequestBody WsMsgDispatchDTO dto) {return BaseResult.success(manager.sendMsgToClient(dto.getDeviceType(), dto.getDeviceAddress(), dto.getMessage()));}}


http://www.ppmy.cn/embedded/96321.html

相关文章

01_理解网络编程和套接字

1.服务端 1.创建套接字 #include <sys/socket.h> int socket(int domain, int type, int protocol); // 成功时返回文件描述符&#xff0c;失败时返回-1&#xff1b; 2.套接字分配地址&#xff08;IP和端口号) #include <sys/socket.h> int bind(int sockfd, s…

基于javaEE的校园二手书交易平台的设计与实现

TOC springboot287基于javaEE的校园二手书交易平台的设计与实现 第1章 绪论 1.1 研究背景 互联网概念的产生到如今的蓬勃发展&#xff0c;用了短短的几十年时间就风靡全球&#xff0c;使得全球各个行业都进行了互联网的改造升级&#xff0c;标志着互联网浪潮的来临。在这个…

希尔排序,详细解析(附图解)

1.希尔排序思路 希尔排序是一种基于插入排序的算法&#xff0c;通过将原始数据分成若干个子序列&#xff0c;然后对子序列进行插入排序&#xff0c;逐渐减小子序列的间隔&#xff0c;最后对整个序列进行一次插入排序。 1.分组直接插入排序&#xff0c;目标接近有序--------…

wiota窄带通讯技术对于vu传统lora

WIoTa是一种针对广域无线物联网通信优化设计的通信协议&#xff0c;而LoRa则是一种广泛应用的低功耗广域网技术。两者在物联网领域都有广泛的应用&#xff0c;但它们在多个关键性能指标上存在显著差异。以下是从多个角度对WIoTa和LoRa进行详细对比&#xff1a; 覆盖范围 WIoTa…

Redis合集 第二章 redis客户端 第一节 jedis

jedis 线程不安全 所以每个线程需要一个独立的链接 为了保证线程安全 所以需要连接池 创建jedis链接池 public class JedisConnectionFactory {public static final JedisPool jedispool;static{//配置连接池JedisPoolConfig jedisPoolConfig new JedisPoolConfig();jedisP…

新能源汽车行业前景广阔,黄山谷捷等产业链企业迎发展良机

目前&#xff0c;我国已成为全球新能源汽车竞争的主战场&#xff0c;产销量连续9年位居世界第一。2024年上半年&#xff0c;我国新能源汽车销量同步增长32%至494.4万辆&#xff0c;市占率为35.2%。中汽协预计&#xff0c;2024年全年中国新能源汽车销量有望达到1150万辆。 随着…

从零搭建xxl-job(五):查询待执行任务逻辑优化

当前的程序还存在很多问题&#xff0c;比如每次扫描数据库都查询了所有的定时任务信息&#xff0c;那么应该查询哪些定时任务信息呢&#xff1f;怎么保证查询的定时任务准时触发&#xff1f;如果数据库中没有定时任务信息了&#xff0c;或者定时任务信息比较少了&#xff0c;sc…

人格障碍诊断系统

TOC springboot186人格障碍诊断系统 第1章 绪论 1.1 研究背景 互联网时代不仅仅是通过各种各样的电脑进行网络连接的时代&#xff0c;也包含了移动终端连接互联网进行复杂处理的一些事情。传统的互联网时代一般泛指就是PC端&#xff0c;也就是电脑互联网时代&#xff0c;但…