引入Maven依赖包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>跟随 Spring Boot 版本</version>
</dependency>
后端代码
/**
 * Enables WebSocket support.
 *
 * <p>Exposes a {@link ServerEndpointExporter} bean; per the Spring documentation this bean
 * detects {@code @ServerEndpoint}-annotated beans and registers them with the standard
 * Java WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@code ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

@Component
@Slf4j
@ServerEndpoint("/demand/task/webSocket/{taskId}") // 前端请求URL
public class TaskWebSocketServer {/*** 保存每个需求任务对应的服务对象*/private static CopyOnWriteArraySet<TaskWebSocketServer> TASK_CACHE = new CopyOnWriteArraySet<>();private Session session;private Long taskId;private static DemandTestTaskService demandTestTaskService;/*** 注入依赖业务处理服务*/@Autowiredpublic void setSunPurchasePayService(DemandTestTaskService demandTestTaskService) {this.demandTestTaskService = demandTestTaskService;}public List<TaskWebSocketServer> getTaskSocketServerList(){List<TaskWebSocketServer> serverList = new ArrayList<>(TASK_CACHE.size());TASK_CACHE.forEach(server -> serverList.add(server));return serverList;}public Long getTaskId(){return taskId;}public boolean userExist(String userId, String deviceId){if (CollectionUtils.isEmpty(pulsarList)){log.info("任务列表为空,请先创建任务 userId:{} deviceId:{}", userId, deviceId);return false;}for (DemandTaskPulsarBO pulsarBO: pulsarList){if (Long.valueOf(userId).equals(pulsarBO.getUserId())){return true;}if (deviceId.equals(pulsarBO.getDeviceId())){return true;}}return false;}/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "taskId") Long taskId) {if (TASK_CACHE.size() > 10){throw new BusinessException(CodeEnum.FAIL.getCode(), "测试任务已达到最大上限10个,请稍后重试");}if (this.taskId != null && this.taskId.equals(taskId)){log.info("web socket reconnection taskId:{}", taskId);}this.taskId = taskId;// TODO 补偿你的业务逻辑// 设置会话超时时间 30 * 60 * 1000session.setMaxIdleTimeout(1800000L);this.session = session;TASK_CACHE.add(this);try {session.getBasicRemote().sendText("connect success. 
taskId=" + taskId);log.info("web socket connect success taskId:{} pulsarList:{}", taskId, JacksonUtil.toJSONString(pulsarBOS));} catch (IOException e) {log.error("websocket IO Exception");}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {try{TASK_CACHE.remove(this);// TODO 补充关闭连接的逻辑}catch (Exception e){log.error("web socket closed error taskId:{}", taskId, e);}}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {// 高并发情况下,使用websocket出现报错的问题synchronized (this.session){this.session.getBasicRemote().sendText(message);}}
}
前端代码
<script setup>
import { reactive, toRefs, onBeforeUnmount, onMounted, getCurrentInstance } from 'vue'
import { ElMessage, ElMessageBox } from 'element-plus'
// Value of the VITE_API_ENV variable exposed through import.meta.env
const Env = import.meta.env.VITE_API_ENV

// Component proxy and the helpers read from it
const proxy = getCurrentInstance().proxy
const $axios = proxy.$axios
const $store = proxy.$store

// Current socket; a plain object until initWebSocket assigns a real WebSocket
let ws = {}
// Handle of the heartbeat interval timer
let heartTime = null
// Number of heartbeats sent since the last reset
let socketHeart = 0
// Delay in milliseconds handed to setInterval by sendSocketHeart
let HeartTimeOut = 5000
// Error counter; reconnect() increments it and resetHeart() clears it
let socketError = 0
// Reactive page state
const _data = reactive({
  tableHeight: '488px',
  demandId: '',
  pointData: {},
  deviceId: '',
  userId: '',
  isContent: false, // set to true in ws.onopen, false on close / manual stop
  list: [], // parsed messages received over the socket
  taskId: '',
  testData: {},
  isHandStop: false, // true once stopTest() was called; suppresses reconnecting
  testList: [],
  testAppKey: ''
})

onMounted(() => {})

/**
 * Opens a WebSocket for the given task and wires up its event handlers.
 *
 * @param taskId id of the task; passed to MakeWss to build the socket URL
 */
const initWebSocket = taskId => {
  _data.isHandStop = false
  const url = MakeWss(taskId)
  ws = new WebSocket(url)

  ws.onopen = function (e) {
    _data.isContent = true
    console.log(e)
  }

  ws.onmessage = function (e) {
    console.log(e, e.data)
    // The server greets with a plain-text message starting with "connect".
    // Matching the prefix (instead of a substring anywhere) keeps JSON payloads
    // that merely contain the word "connect" from being dropped.
    if (typeof e.data === 'string' && e.data.startsWith('connect')) {
      resetHeart()
      return
    }
    let dataList
    try {
      dataList = JSON.parse(e.data)
    } catch (err) {
      // A malformed frame must not break the handler for later messages.
      console.log(err)
      return
    }
    _data.list.push(dataList)
    changePointStatus(dataList)
  }

  ws.onerror = function (e) {
    // Only log here: browsers fire "close" after "error", so reconnecting from
    // both handlers would open two sockets and use up two retries per failure.
    console.log(e)
  }

  ws.onclose = function (e) {
    console.log(e)
    _data.isContent = false
    if (_data.isHandStop == false) {
      reconnect()
    }
  }
}

// reset the socket heartbeat
/**
 * Stops the running heartbeat timer, zeroes the heartbeat and error counters
 * and starts a fresh heartbeat timer.
 */
const resetHeart = () => {
  clearInterval(heartTime)
  socketError = 0
  socketHeart = 0
  sendSocketHeart()
}

// send the socket heartbeat
/**
 * Starts an interval timer that sends one heartbeat message over the socket
 * every HeartTimeOut milliseconds and counts the heartbeats sent.
 * The timer handle is stored in heartTime.
 */
const sendSocketHeart = () => {
  const beat = () => {
    console.log('心跳发送:', socketHeart)
    const payload = {
      content: '',
      requestId: 'aa9872be-d5b9-478e-aba4-50527cd3ef32',
      type: 'heartbeat'
    }
    ws.send(JSON.stringify(payload))
    socketHeart += 1
  }
  heartTime = setInterval(beat, HeartTimeOut)
}

// socket reconnect
/**
 * Re-opens the socket for the current task while the error counter is at most 2;
 * once the retries are used up it only logs. The heartbeat timer is stopped in
 * both cases.
 */
const reconnect = () => {
  clearInterval(heartTime)
  if (socketError <= 2) {
    initWebSocket(_data.taskId)
    socketError = socketError + 1
    console.log('socket重连', socketError)
  } else {
    console.log('重试次数已用完的逻辑', socketError)
  }
}

/**
 * Stops the test manually: marks the stop as user-initiated so that the close
 * handler does not reconnect, stops the heartbeat, closes the socket and then
 * reloads the detail data.
 */
function stopTest() {
  _data.isHandStop = true
  clearInterval(heartTime)
  // ws is a plain {} until initWebSocket has run; guard so that stopping before
  // any connection was opened does not throw "ws.close is not a function".
  if (ws && typeof ws.close === 'function') {
    ws.close()
  }
  _data.isContent = false
  // NOTE(review): if $axios is plain axios, query parameters belong under
  // { params: ... } - confirm the signature of the project's wrapper.
  $axios.get(`/user/detail`, { demandId: _data.demandId, taskId: _data.taskId }).then(res => {
    if (res.success && res.data) {
      _data.pointData = res.data
      _data.testList = res.data.pointList
    }
  })
}
/**
 * Empties the list of received socket messages by assigning a fresh array.
 */
function clearTestList() {
  _data.list = []
}
/**
 * Recomputes the table height as the document's client height minus 230 px
 * and stores it as a CSS pixel string.
 */
function handleResize() {
  const viewportHeight = document.documentElement.clientHeight
  _data.tableHeight = `${viewportHeight - 230}px`
}
// Size the table once right away and again on every window resize.
handleResize()
window.addEventListener('resize', handleResize)

// Remove the resize listener before the component is unmounted.
onBeforeUnmount(() => {
  window.removeEventListener('resize', handleResize)
})
</script>