vue3+ts+java使用WebSocket传输数据

news/2025/2/22 5:36:06/

一、环境

系统:win11

IDE:vscode

框架:electron22.0.0+vite2+vue3+typescript4.8.4+springboot2.2.5+jdk1.8

二、websocket介绍

2.1 由来

        WebSocket未出现之前,浏览器和服务器之间的通信是通过Web的poll技术进行通信,就是浏览器不停的对服务器主动发动请求,每发起一次新的HTTP请求,就是开启一次新的TCP链接,HTTP协议本身就是一种开销非常大的协议,所以这种方式效率低下。于是就出现了WebSocket协议。

        下面是采用poll方式的代码示例:

setInterval(() => {// 查询注册机列表getRegisterInfo().then(res => {isHost.value = store.state.onHost;   }).catch(err => {console.log('getComputerList err:', err);});
}, 1000);

为了页面及时更新,会像服务器产生大量的请求,造成资源浪费。

2.2 WebSocket通信过程

        WebSocket是一种完全不同于HTTP的协议,但是它需要通过HTTP协议的GET请求,将HTTP协议升级为WebSocket协议。升级的过程被称为握手(handshake)。当浏览器和服务器握手成功后,则可以开始根据WebSocket定义的通信帧格式开始通信了。WebSocket协议的通信帧也分为控制数据帧和普通数据帧,前者用于控制WebSocket链接状态,后者用于承载数据。

        握手过程就是将HTTP协议升级为WebSocket协议的过程。在HTTP的GET请求头部添加信息如下:

Upgrade: websocket      #规定必需的字段,其值必需为 websocket , 如果不是则握手失败;
Connection: Upgrade  #规定必需的字段,值必需为 Upgrade , 如果不是则握手失败;
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==    #必需字段,一个随机的字符串;
Sec-WebSocket-Protocol: chat, superchat   #可选字段,可以用于标识应用层的协议;
Sec-WebSocket-Version: 13  #必需字段,代表了 WebSocket 协议版本,值必需是 13 , 否则
握手失败;

当服务器端,成功验证了以上信息后,则会返回一个形如以下信息的响应:

HTTP/1.1 101 Switching Protocols   #101代表握手成功的状态码
Upgrade: websocket  #规定必需的字段,其值必需为 websocket , 如果不是则握手失败;
Connection: Upgrade #规定必需的字段,值必需为 Upgrade , 如果不是则握手失败;
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=  #规定必需的字段,该字段的值是通过固定字符串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 加上请求中 Sec-WebSocket-Key 字段的值,然后再对其结果通过 SHA1 哈希算法求出的结果。
Sec-WebSocket-Protocol: chat  #对应于请求中的 Sec-WebSocket-Protocol 字段;

2.3 WebSocket 优缺点

        优点:

        1、使用资源少。创建连接后,数据叫唤的包头较少;

        2、能实现及时通信。长连接,实时通信;

        3、更好的二进制支持。能更好的处理二进制内容;

        4、支持拓展。用户可以拓展协议,实现部分自定义的子协议。

        缺点:

        1、使用WebSocket,长连接,会占用一定资源;

        2、浏览器品类多,支持程度不同,可能问题多;

        3、与poll相比,代码复杂度将上升,完全依赖于websocket,要多写逻辑对websocket状态进行监控,对开发者要求也会高一些。

       没有完美的事物,我们讨论优缺点的目的是它适合什么场景,在要求实时性较高的应用时,那么WebSocket会更适合。如果基本都是操作的应用,实时性要求很低,那么WebSocket使用的资源成本就是不合适的。

2.4 浏览器支持

      WebSocket - Web API 接口参考 | MDNWebSocket 对象提供了用于创建和管理 WebSocket 连接,以及可以通过该连接发送和接收数据的 API。icon-default.png?t=N7T8https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket以上是供参考的API接口,本文不做赘述,自己进入使用。

三、前端使用示例

文件名称

具体代码

/** @Descripttion: 封装socket方法* @version:* @Date: 2021-08-06 11:14:39* @LastEditTime: 2021-10-26 14:06:34*/
import { store } from "../store";
import { ElMessage } from "element-plus";
import { Base64 } from "js-base64";
import { updateComputerIsValid } from "/@/service/AppService";
import { localIp, playDutySound } from "../CommonUtil";
import { deal3004Procotol, deal3005Procotol, deal3006Procotol } from "/@/service/WebsocketService"; //业务代码interface socket {websocket: any;connectURL: string;socket_open: boolean;hearbeat_timer: any;hearbeat_interval: number;is_reonnect: boolean;reconnect_count: number;reconnect_current: number;ronnect_number: number;reconnect_timer: any;reconnect_interval: number;// init: (receiveMessage: Function | null) => any;init: () => any;receive: (message: any) => void;heartbeat: () => void;send: (data: any, callback?: any) => void;close: () => void;reconnect: () => void;webSocketBack?: (message: any) => void;
}const socket: socket = {websocket: null,connectURL: import.meta.env.VITE_WEBSOCKET_MONITOR_URL + localIp().replaceAll(".", ""),// 开启标识socket_open: false,// 心跳timerhearbeat_timer: null,// 心跳发送频率hearbeat_interval: 3000,// 是否自动重连is_reonnect: true,// 重连次数reconnect_count: 5000,// 已发起重连次数reconnect_current: 1,// 网络错误提示此时ronnect_number: 0,// 重连timerreconnect_timer: null,// 重连频率 不能设置的太小,否则会出现一次重连未返回的时候,下一次又开始重连reconnect_interval: 6000,// init: (receiveMessage: Function | null) => {init: () => {if (!("WebSocket" in window)) {// if (!("WebSocket" in window)) {ElMessage.warning("浏览器不支持WebSocket");return null;}// 已经创建过连接不再重复创建if (socket.websocket) {return socket.websocket;}socket.websocket = new WebSocket(socket.connectURL);socket.websocket.onmessage = (e: any) => {// if (receiveMessage) {//     receiveMessage(e);// }if (socket.webSocketBack) {socket.webSocketBack(e);}};socket.websocket.onclose = (e: any) => {console.log("websocket--关闭", socket.reconnect_current,e);if (socket.hearbeat_timer) {clearInterval(socket.hearbeat_timer);}//业务代码- 置位为1updateComputerIsValid(localIp(), 1);socket.socket_open = false;// 需要重新连接if (socket.is_reonnect) {console.log("websocket--需要重新连接", socket.is_reonnect,socket.reconnect_interval);socket.reconnect_timer = setTimeout(() => {console.log("websocket--重连", socket.reconnect_current);// 超过重连次数if (socket.reconnect_current > socket.reconnect_count &&socket.reconnect_count > -1) {console.log("websocket--超过重连次数");clearTimeout(socket.reconnect_timer);socket.is_reonnect = false;return;}// 记录重连次数socket.reconnect_current++;//清除 socket.websocketsocket.websocket = null;socket.reconnect();}, socket.reconnect_interval);}};// 连接成功socket.websocket.onopen = function () {console.log("websocket--连接成功");//业务代码updateComputerIsValid(localIp(), 0);socket.socket_open = true;socket.is_reonnect = true;// 开启心跳socket.heartbeat();};// 连接发生错误socket.websocket.onerror = function () {console.log("websocket--发生错误!关闭执行重连");socket.websocket.onclose();};},send: (data, callback = null) => {// 开启状态直接发送if (socket.websocket.readyState === socket.websocket.OPEN) {socket.websocket.send(JSON.stringify(data));if (callback) {callback();}// 正在开启状态,则等待1s后重新调用} else {clearInterval(socket.hearbeat_timer);if (socket.ronnect_number < 1) {// ElMessage({// 	type: 'error',// 	message: i18n.global.t('chat.unopen'),// 	duration: 0,// })console.log("服务关闭了!");}socket.ronnect_number++;}},receive: (message: any) => {let params = Base64.decode(JSON.parse(message.data).data);params = JSON.parse(params);return params;},heartbeat: () => {if (socket.hearbeat_timer) {clearInterval(socket.hearbeat_timer);}socket.hearbeat_timer = setInterval(() => {let diffMs = Number(new Date()) - Number(store.state.webSocketLastTime);console.log("websocket--上次间隔时间:", diffMs, "3秒以上才发送心跳包");if (diffMs > 0) {let data = {// languageId: store.state.users.language,content: "ping",};var sendDara = {encryption_type: "base64",data: Base64.encode(JSON.stringify(data)),};socket.send(sendDara);store.commit("setWebSocketLastTime", new Date());console.log("websocket--心跳发送",sendDara,"更新时间:",store.state.webSocketLastTime);}}, socket.hearbeat_interval);},close: () => {clearInterval(socket.hearbeat_timer);socket.is_reonnect = false;socket.websocket.onclose();},/*** 重新连接*/reconnect: () => {//websocket存在且不想重连的时候if (!socket.is_reonnect) {// if (socket.websocket && !socket.is_reonnect) {console.log("websocket--存在但是不需要重连的时候,关闭", socket.websocket, socket.is_reonnect);socket.close();}socket.init();},/*** 业务代码--数据处理 * @param backMessage*/webSocketBack(backMessage: any) {store.commit("setWebSocketLastTime", new Date());console.log("websocket-接受到的信息" + JSON.stringify(backMessage),"更新的时间:",store.state.webSocketLastTime);const wsData = backMessage.data.split("|");const wsDataCode = backMessage.data.split("|")[0];// 零位是协议号switch (wsDataCode) {// 值班机获取 提醒间隔时间 后的处理case "3002": {console.log("收到ws:3002: " + JSON.stringify(wsData));const setHost = wsData[1];store.commit("setDutyConfirmTime", Number(wsData[2]));if (setHost === "0") {store.commit("setLocalComputerDutyState", 0);store.commit("setOnDutyState", 0);} else {store.commit("setLocalComputerDutyState", 1);store.commit("setOnDutyState", 1);}break;}case "3003": {console.log("收到ws:3003", wsDataCode);if (wsData[1] === "0") {playDutySound();if (store.state.onDutyState === 0) {} else if (store.state.onDutyState === 1) {、playDutySound();store.commit("setOnDutyState", true);}} else if (wsData[1] === "1") {store.commit("setOnDutyState", false);}break;}case "3004": {//更新store中的数据deal3004Procotol(wsData);break;}case "3005": {//更新store中的数据deal3005Procotol(wsData);break;}case "3006": {//更新store中的数据deal3006Procotol(wsData);break;}}},
};export default socket;

其中业务代码请不用关注,自己实现自己的业务逻辑即可。

1001错误

对于重连时间的设置,如果设置的时间太短,会出现反复1001错误(The WebSocket session [] timeout expired)关闭再重连的现象:

把服务端关闭后

每次错误返回中间有两次重连操作,所以调整了重连间隔时间,错误消失,推论:一次重连结果还未出来的时候,又发起了地址一样的连接请求,造成冲突,会关闭上次连接,这次关闭会引发上次连接的重连,这就造成了反复重连。目前我采用的是拉长重连时间,比较简单,可以尝试通过判断连接状态来阻止一次连接没完成之前再次连接。

流程图

启动连接

//APP.VUE
import socket from "/@/utils/websocket";onMounted(async () => {socket.init();
});

四、后端服务

引入依赖包

<dependency><!-- websocket --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

客户端ip获取:

在mainApplication上添加下面注解:

@ServletComponentScan("**.**.filter")  //防止 @WebListener 无效
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;@javax.servlet.annotation.WebFilter(filterName = "sessionFilter",urlPatterns = "/*")
@Order(1)
public class WebFilter implements Filter {@Overridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {HttpServletRequest req= (HttpServletRequest) servletRequest;req.getSession().setAttribute("ip",req.getRemoteHost());filterChain.doFilter(servletRequest,servletResponse);}
}

WebSocket配置类

在这里也做了IP的获取

@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {/*** 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {Map<String, Object> attributes = sec.getUserProperties();HttpSession session = (HttpSession) request.getHttpSession();if (session != null) {attributes.put(GlobalContants.IP_ADDR, session.getAttribute("ip"));Enumeration<String> names = session.getAttributeNames();while (names.hasMoreElements()) {String name = names.nextElement();attributes.put(name, session.getAttribute(name));}}}
}

Websocket接收

import com.baomidou.mybatisplus.core.toolkit.ArrayUtils;
import com.deyou.cabin.monitor.common.GlobalContants;
import com.deyou.cabin.monitor.common.GlobalParams;
import com.deyou.cabin.monitor.common.utils.AssembleDownProtocolUtils;
import com.deyou.cabin.monitor.common.utils.CommonServeUtils;
import com.deyou.cabin.monitor.config.WebSocketConfig;
import com.deyou.cabin.monitor.model.WebSocketModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;@Slf4j
@Component
//@RequiredArgsConstructor
@ServerEndpoint(value = "/websocket/monitor/{code}",configurator = WebSocketConfig.class)
public class WebsocketController {/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session,@PathParam(value = "code") String code) {try{session.setMaxIdleTimeout(30000);}catch (Exception e){log.error(e.getMessage(),e);}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session) {try{log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), }catch (Exception e){log.error(e.getMessage(),e);}}/*** 收到客户端消息后调用的方法** @param message*            */@OnMessagepublic void onMessage(String message, Session session) {try{log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);}catch (Exception ex){log.error(ex.getMessage(),ex);}}@OnErrorpublic void onError(Session session, Throwable error) {log.error("websocket发生错误:" + session.getId() + "---" + error.getMessage(),error);}public void sendMessageToAll(String message, Session fromSession) {try{//GlobalParams.webSocketModelMap是全局变量 ConcurrentHashMap<String, WebSocketModel> webSocketModelMapfor (Map.Entry<String, WebSocketModel> sessionEntry : GlobalParams.webSocketModelMap.entrySet()) {Session toSession = sessionEntry.getValue().getSession();// 排除掉自己if (!fromSession.getId().equals(toSession.getId())) {log.info("服务端给客户端[{}][{}]发送消息{}", toSession.getId(),sessionEntry.getValue().getWebSocketCode(), message);sendMessToOne(message,toSession);}}}catch (Exception e){log.error(e.getMessage(),e);}}public void sendMessageToAll(String message) {try {//GlobalParams.webSocketModelMap是全局变量 ConcurrentHashMap<String, WebSocketModel> webSocketModelMapfor (Map.Entry<String, WebSocketModel> sessionEntry : GlobalParams.webSocketModelMap.entrySet()) {Session toSession = sessionEntry.getValue().getSession();log.info("服务端给客户端[{}][{}]发送消息{}", toSession.getId(), sessionEntry.getValue().getWebSocketCode(), message);sendMessToOne(message, toSession);}} catch (Exception e) {log.error(e.getMessage(), e);}}public void sendMessToOne(String message, Session toSession) {try {// 尝试过锁住方法,还是不行,这里锁住webSocketMap,让多线程,访问不同对象,也能同步synchronized(GlobalParams.webSocketModelMap){String toId = toSession.getId();if (StringUtils.isNotBlank(toId) && GlobalParams.webSocketModelMap.containsKey(toId)) {GlobalParams.webSocketModelMap.get(toId).getSession().getBasicRemote().sendText(message);}}} catch (Exception e) {log.error("服务端发送消息给客户端失败,"+e.getMessage(),e);}}}

其中,synchronized(GlobalParams.webSocketModelMap)中GlobalParams.webSocketModelMap是我记录当前在线的websocket的信息。上边代码的注释中已经写了,这个锁的目的是为了解决websocket服务端下发时出现的错误“The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method”的错误,问题的引发场景和分析个人记录如下: 

1.因为在 @OnMessage中,我有两个方法同时使用了session,导致session多线程不安全,发生的频次少都可能不出现这个问题!

2.JSON.toJSONString(GlobalParams.webSocketModelMap) 其中带有session,会引发这个问题 解决办法:加异步锁,但是需要锁定 ConcurrentHashMap<String, WebSocketModel>。

使用

@Resource
private WebsocketController websocketService;
try{websocketService.sendMessToOne(sendMes, toSession);
}catch (Exception e){log.error(e.getMessage(),e);
}

5、结束

连接地址:ws://IP:PORT/websocket/monitor/{code} ,其中code是你自己定义的值。



http://www.ppmy.cn/news/1122363.html

相关文章

相机有俯仰角时如何将像素坐标正确转换到其他坐标系

一般像素坐标系转相机坐标系都是默认相机是水平的&#xff0c;没有考虑相机有俯仰角的情况&#xff0c;大致的过程是&#xff1a;像素坐标系统-->图像坐标系-->相机坐标系 ->世界坐标系或雷达坐标系: 像素坐标系 像素坐标系&#xff08;u&#xff0c;v&#xff09;是…

RestTemplate发送HTTPS请求

RestTemplate发送HTTPS请求 基础知识&#xff1a; Https原理与工作流程及证书校验&#xff1a;https://www.cnblogs.com/zjdxr-up/p/14359904.html 忽略ssl证书的方式配置&#xff1a; import lombok.extern.slf4j.Slf4j;import org.springframework.http.client.SimpleClien…

Flutter性能监控与优化实践

Flutter是谷歌的移动UI框架,可以快速在iOS和Android上构建高质量的原生用户界面。 Flutter可以与现有的代码一起工作。在全世界,Flutter正在被越来越多的开发者和组织使用,并且Flutter是完全免费、开源的,可以用一套代码同时构建Android和iOS应用,性能可以达到原生应用一样…

【AI视野·今日Robot 机器人论文速览 第三十六期】Tue, 19 Sep 2023

AI视野今日CS.Robotics 机器人学论文速览 Tue, 19 Sep 2023 (showing first 100 of 112 entries) Totally 112 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;In-Hand Object Rotation, RotateIt 提出了一种基于视觉与触觉的物体旋转朝向的方法…

PRT(Precomputed Radiance Transfer【2002】)原理实现

声明 本文源自对Games202课程&#xff0c;作业2的总结。 参考 手把手教你写GAMES202作业&#xff1a;GAMES202-作业2&#xff1a; Precomputed Radiance Transfer&#xff08;球谐函数&#xff09;GAMES 202 作业2Games202课程个人Blog 课程总结&#xff1a;Games202(P6、P7…

Nginx之error_page模块解读

目录 error_page的概念 使用举例 跳转到指定页面 跳转到指定网址 使用location的符合完成错误信息展示 ​更改反馈状态码 error_page配置小提示 实战应用解读 限流应用 寻找错误码对应的文件 error_page的概念 error_page是nginx一个重要的指令&#xff0c;作用是…

【MySQL】 MySQL索引事务

文章目录 &#x1f6eb;索引&#x1f38d;索引的概念&#x1f333;索引的作用&#x1f384;索引的使用场景&#x1f340;索引的使用&#x1f4cc;查看索引&#x1f4cc;创建索引&#x1f332;删除索引 &#x1f334;索引保存的数据结构&#x1f388;B树&#x1f388;B树&#x…

【draw】draw.io怎么设置默认字体大小

默认情况下draw里面的字体大小是12pt&#xff0c;我们可以将字体改成我们想要的大小&#xff0c;例如18pt。 然后点击样式&#xff0c;设置为默认样式。 下一次当我们使用文字大小时就是18pt了。