Spring boot 项目作为客户端调用 服务端websocket

devtools/2024/9/19 0:49:42/ 标签: spring boot, websocket, 后端

文章目录

    • java客户端请求websocket
      • Spring boot 导入包
      • 客户端调用方法
        • 测试执行方法
        • connectWebSocket
        • HandshakeMessage
        • sendHandshake
        • WebSocketConfig.queue.take
        • 方法对应实体类
        • 配置 yaml 资源
        • WebSocketConfig 配置类
            • 注入配置websocketUrl:
            • LinkedBlockingQueue
            • LinkedBlockingQueue的特点
            • connectWebSocket 连接
        • URI
        • WebSocketClientHandler
        • connectBlocking方法
            • connectBlocking方法有两个参数:
            • sendMessage
            • close
        • WebSocketClientHandler 配置类
            • onOpen 方法
            • onMessage 方法
            • onClose 方法
            • onError 方法
    • java服务端websocket
            • WebSocketConfig 配置类
            • WebSocketHandler 监听类

websocket_1">java客户端请求websocket

Spring boot 导入包

pom.xml 导入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

客户端调用方法

测试执行方法
  • connectWebSocket
    • 连接websocket 客户端,并携带过期时间等参数
  • HandshakeMessage
    • 发送消息对象
  • sendHandshake
    • 发送消息
  • WebSocketConfig.queue.take
    • 队列信息,等待数据返回,并消费,获取websocket 返回的消息数据
public String test() {try {WebSocketConfig.connectWebSocket();HandshakeMessage handshakeMessage = new HandshakeMessage();handshakeMessage.setMessage("test");handshakeMessage.setClientId(UUID.randomUUID().toString());handshakeMessage.setType("handshake");WebSocketConfig.sendHandshake(handshakeMessage);String take = WebSocketConfig.queue.take();System.out.println("test:" + take);WebSocketConfig.close();}catch (InterruptedException ex){System.out.println("连接异常"+ ex.getMessage());}return null;
}
方法对应实体类
public class HandshakeMessage {private String type;private String clientId;private String message;public String getType() {return type;}public void setType(String type) {this.type = type;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}}
配置 yaml 资源
websocket:url: ws://localhost:8080/websocket
WebSocketConfig 配置类
websocketUrl_81">注入配置websocketUrl:

使用@Value注解将websocket.url注入到类的私有成员变量websocketUrl中。确保在类初始化时就设置好静态变量。

LinkedBlockingQueue

LinkedBlockingQueue是Java并发集合框架中的一种线程安全的队列实现,它继承自BlockingQueue接口。LinkedBlockingQueue使用链表结构来存储元素,并且提供了阻塞操作,可以在队列为空或满时自动阻塞生产者或消费者线程,直到队列变为非空或非满。

  • LinkedBlockingQueue的特点
    • 线程安全性:LinkedBlockingQueue是线程安全的,可以在多线程环境中安全地使用。
    • 阻塞操作:提供了put和take等阻塞方法,当队列满时调用put会阻塞,当队列为空时调用take会阻塞。
    • 容量可配置:LinkedBlockingQueue可以被初始化为一个固定容量的队列,也可以是一个无界队列(默认情况下,如果未指定容量,则容量为Integer.MAX_VALUE)。
connectWebSocket 连接
  • URI
    • URI类可以帮助你处理和解析Web地址,并确保这些地址格式正确。
  • WebSocketClientHandler
    • 继承 WebSocketClient 类 ,实现一些 websocket 方法重写
  • connectBlocking方法
    • connectBlocking方法用于建立WebSocket连接,并且在连接建立之前会阻塞当前线程。这通常用于确保连接完全建立后再继续执行后续操作。connectBlocking方法是org.java_websocket.client.WebSocketClient的一个扩展方法,它允许开发者在连接建立之前等待一段时间。
    • connectBlocking方法有两个参数:
      • timeout:指定连接建立的最大等待时间。
      • unit:指定时间单位(如毫秒、秒、分钟等)。
sendMessage

sendMessage方法用于向WebSocket服务器发送文本消息
send 方法

  • 检查WebSocket连接状态:
    • 如果WebSocket连接尚未打开(!this.isOpen()),则抛出WebsocketNotConnectedException异常。这是因为只有在连接建立后才能发送数据。
  • 检查参数有效性:
    • 如果传入的frames参数为null,则抛出IllegalArgumentException异常。这是为了确保传入的数据是有效的。
  • 准备发送的数据帧:
    • 创建一个新的ArrayList来存储即将发送的二进制帧。
    • 遍历frames集合中的每一个Framedata对象。
    • 对于每一个Framedata对象,调用draft.createBinaryFrame(f)方法将其转换为ByteBuffer,然后添加到outgoingFrames列表中。
    • 在遍历过程中,通过日志记录每一步操作的信息(如果启用了日志的trace级别)。
  • 发送数据帧:
    • 最后,调用write方法,将准备好的outgoingFrames列表作为参数传递进去,完成实际的数据发送操作。
    public void send(String text) {if (text == null) {throw new IllegalArgumentException("Cannot send 'null' data to a WebSocketImpl.");} else {this.send((Collection)this.draft.createFrames(text, this.role == Role.CLIENT));}
    }
    private void send(Collection<Framedata> frames) {if (!this.isOpen()) {throw new WebsocketNotConnectedException();} else if (frames == null) {throw new IllegalArgumentException();} else {ArrayList<ByteBuffer> outgoingFrames = new ArrayList();Iterator var3 = frames.iterator();while(var3.hasNext()) {Framedata f = (Framedata)var3.next();this.log.trace("send frame: {}", f);outgoingFrames.add(this.draft.createBinaryFrame(f));}this.write((List)outgoingFrames);}
    }
    
close

关闭连接

package com.dog.websocket;import com.alibaba.fastjson2.JSONObject;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;@Component
public class WebSocketConfig {private static String websocketUrl;public static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);@Value("${websocket.url}")public void setWebsocketUrl(String websocketUrl){WebSocketConfig.websocketUrl = websocketUrl;}private static WebSocketClient client;public static void connectWebSocket(){try{URI uri = new URI(websocketUrl);client = new WebSocketClientHandler(uri);client.connectBlocking(4000, TimeUnit.MINUTES);}catch (URISyntaxException|InterruptedException ex){ex.printStackTrace();throw new RuntimeException("websocket 连接异常");}}/*** 直接发送信息* @param sendMessage*/public static void sendMessage(String sendMessage){client.send(sendMessage);}public static void sendHandshake(HandshakeMessage handshakeMessage){String sendMessage = JSONObject.toJSONString(handshakeMessage);System.out.println(sendMessage);client.send(sendMessage);}public void sendByteMessage(byte[] bytes){client.send(bytes);}/*** 连接关闭*/public static void close(){if (client != null && client.isOpen()) {client.close();}}
}
WebSocketClientHandler 配置类

继承WebSocketClient 并重写了几个关键的方法来处理WebSocket连接的不同生命周期事件

onOpen 方法
 @Override
public void onOpen(ServerHandshake serverHandshake) {System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());
}

当WebSocket连接成功建立时,这个方法会被调用。它打印出连接的状态码。

onMessage 方法
 @Override
public void onMessage(String s) {System.out.println("message: "+ s);try {// 尝试在一定时间内将消息放入队列if (!queue.offer(s, 10, TimeUnit.SECONDS)) {System.err.println("无法在规定时间内将消息放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}
}

当从WebSocket服务器接收到消息时,这个方法会被调用。它首先打印接收到的消息,然后尝试将消息放入WebSocketConfig.queue队列中。如果在向队列中添加消息时发生中断异常,则恢复中断状态并打印错误信息。

onClose 方法
 @Override
public void onClose(int i, String s, boolean b) {System.out.println("WebSocket连接已关闭: " + s);
}

当WebSocket连接关闭时,这个方法会被调用。它打印出关闭连接的原因。

onError 方法
@Override
public void onError(Exception ex) {ex.printStackTrace();System.err.println("WebSocket发生错误: " + ex.getMessage());
}

当WebSocket连接发生错误时,这个方法会被调用。它打印出错误信息及其堆栈跟踪。

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Component;import java.net.URI;
import java.util.concurrent.TimeUnit;import static com.dog.websocket.WebSocketConfig.queue;public class WebSocketClientHandler extends WebSocketClient {public WebSocketClientHandler(URI serverUri) {super(serverUri);}@Overridepublic void onOpen(ServerHandshake serverHandshake) {System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());}@Overridepublic void onMessage(String s) {System.out.println("message: "+ s);try {// 尝试在一定时间内将消息放入队列if (!queue.offer(s, 10, TimeUnit.SECONDS)) {System.err.println("无法在规定时间内将消息放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}}@Overridepublic void onClose(int i, String s, boolean b) {System.out.println("WebSocket连接已关闭: " + s);}@Overridepublic void onError(Exception ex) {ex.printStackTrace();System.err.println("WebSocket发生错误: " + ex.getMessage());}
}

上面只是根据所需要自行调整

websocket_308">java服务端websocket

在上一篇博客已做详细简绍,不做补充
spring boot 项目 跟 JavaScript 简单 websocket 使用

WebSocketConfig 配置类
package com.ruoyi.common.utils.socket;import com.ruoyi.common.utils.socket.handler.WebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {private final WebSocketHandler webSocketHandler;public WebSocketConfig(WebSocketHandler webSocketHandler) {this.webSocketHandler = webSocketHandler;}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(webSocketHandler, "/websocket").setAllowedOrigins("*");}
}
WebSocketHandler 监听类
package com.ruoyi.common.utils.socket.handler;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.utils.socket.HandshakeMessage;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {private static final Map<String, WebSocketSession> clientSessions = new ConcurrentHashMap<>();private static final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {super.afterConnectionEstablished(session);String sessionId = session.getId();System.out.println("WebSocket connection established with session ID: " + sessionId);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();HandshakeMessage handshakeMessage = objectMapper.readValue(payload, HandshakeMessage.class);if ("handshake".equals(handshakeMessage.getType())) {String clientId = handshakeMessage.getClientId();String sessionId = session.getId();// 存储clientId与sessionId的映射关系clientSessions.put(clientId, session);handshakeMessage.setMessage("success");// 可以选择回复客户端确认握手成功的消息session.sendMessage(new TextMessage(JSON.toJSONString(handshakeMessage)));}}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {super.afterConnectionClosed(session, status);String sessionId = session.getId();System.out.println("WebSocket connection closed with session ID: " + sessionId);// 移除会话clientSessions.values().removeIf(s -> s.getId().equals(sessionId));}public void sendMessageToClient(String clientId, String message) {WebSocketSession session = clientSessions.get(clientId);if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(message));} catch (Exception e) {e.printStackTrace();}}}
}

http://www.ppmy.cn/devtools/110476.html

相关文章

基于飞桨paddle2.6.1+cuda11.7+paddleRS开发版的目标提取-道路数据集训练和预测代码

基于飞桨paddle2.6.1cuda11.7paddleRS开发版的目标提取-道路数据集训练和预测代码 预测结果&#xff1a; 预测影像&#xff1a; &#xff08;一&#xff09;准备道路数据集 下载数据集地址&#xff1a; https://aistudio.baidu.com/datasetdetail/56961 mass_road.zip …

Cursor:程序员的AI助手,开启智能编程新时代

在当今快节奏的软件开发世界&#xff0c;效率和准确性是成功的关键。而 Cursor&#xff0c;作为一款创新的人工智能编程工具&#xff0c;正在极大地改变着编程的面貌&#xff0c;为开发者带来前所未有的便捷与惊喜。 智能代码生成 Cursor 利用强大的人工智能模型&#xff0c;…

[AHK]Listbox with incremental search

可以根据文本框中的输入内容&#xff0c;实时动态从列表中搜索并定位所搜索内容。 AHK V1代码 #Requires AutoHotkey v1.0 Gui Add, Edit, w300 h20 vsearchedString gIncrementalSearch Gui Add, ListBox, vchoice gListBoxClick w300 h250 hscroll vscroll Gui Add, Button, …

TCP通信实现

目录 前言 一、实现TCP通信 二、通信原理 &#xff08;网路传输的封包与拆包&#xff09; 三、通信过程中的头 1.MAC帧 2. IP头 3.TCP头 4.UDP头 总结 前言 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一种面向连接…

【Git】本地仓库操作

Part1 基础概念 git作用&#xff1a;管理代码版本&#xff0c;记录&#xff0c;切换&#xff0c;合并代码 git仓库&#xff1a;记录文件状态内容和历史记录的地方&#xff08;.git文件夹&#xff09; git的三个区域&#xff1a;1&#xff09;工作区&#xff1a;实际开发时的文…

ts复合流讲解

一、什么是复合流 复合流指的是一条音视频数据流中同时包含了音频ES和视频ES数据&#xff08;ES指的是从编码器出来的音视频裸流比如H264&#xff0c;AAC&#xff09;。在音视频开发中最常见的复合流一般是TS、MP4、flv等。TS和flv一般用于网络传输&#xff0c;MP4一般用于本地…

理解 RabbitMQ:生产者、连接、通道、交换机、队列与消费者的消息流

在分布式消息系统中&#xff0c;RabbitMQ 是一个非常流行的消息代理。它的核心理念是解耦应用程序的生产者和消费者&#xff0c;使得消息能够可靠地从一方传递到另一方。本文将带你深入了解 RabbitMQ 中 生产者、连接、通道、交换机、队列 和 消费者 之间的消息流&#xff0c;并…

MySQL中DML操作(三)

更新数据&#xff08;UPDATE&#xff09; UPDATE 表名 SET 列名值, 列名值 WHERE 条件; 注意&#xff1a; 更新语句中一定要给定更新条件&#xff0c;否则表中所有数据都会被更新。 示例&#xff1a; 更新emp3表中的id为1的数据&#xff0c;添加address为Beijing。 updat…

windows中多ping网络ICMP

之前没搞过ICMP,第一次弄&#xff0c;遇到好多坑&#xff0c;其中在接收ICMP消息时无法指定ip这个坑困扰了好久&#xff0c;最后在网上找到一种解决方法&#xff1b;直接看效果吧&#xff01;&#xff01; 其中我获取ip状态直接扔到线程池里面处理的 struct DevicePingMsg {D…

Http中get与post的区别,99%的人都理解错了吧

Get和Post是HTTP请求的两种基本方法&#xff0c;要说它们的区别&#xff0c;接触过WEB开发的人都能说出一二。 最直观的区别就是Get把参数包含在URL中&#xff0c;Post通过request body传递参数。 你可能自己写过无数个Get和Post请求&#xff0c;或者已经看过很多权威网站总结…

Python 中常见的数据结构(一)

Python 中常见的数据结构&#xff08;一&#xff09; Python 是一种功能强大且灵活的编程语言&#xff0c;它提供了多种内置的数据结构&#xff0c;可以帮助我们更好地组织和处理数据。在这个文章中&#xff0c;我们将探讨 Python 中最常见的一些数据结构&#xff0c;并结合实…

软件测试学习笔记丨Pytest的使用

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/22158 1. 简介 pytest是一个成熟的全功能python测试框架测试用例的skip和xfail&#xff0c;自动失败重试等处理能够支持简单的单元测试和复杂的功能测试&#xff0c;还可以用来做selenium/ap…

Apache DataFusion查询引擎简介

01 简介 DataFusion是一个查询引擎&#xff0c;其本身不具备存储数据的能力。正因为不依赖底层存储的格式&#xff0c;使其成为了一个灵活可扩展的查询引擎。它原生支持了查询CSV&#xff0c;Parquet&#xff0c;Avro&#xff0c;Json等存储格式&#xff0c;也支持了本地&#…

C++:构造函数、析构函数

目录 一、类的默认成员函数 二、构造函数 构造函数的特点 三、析构函数 析构函数的特点 一、类的默认成员函数 默认成员函数就是用户没有显式实现&#xff0c;编译器会自动生成的成员函数称为默认成员函数&#xff0c;一个类&#xff0c;我们不写的情况下编译器会默认生成…

基于多种智能优化算法优化BP神经网络的数据时序预测

基于多种智能优化算法优化BP神经网络进行数据时序预测的研究&#xff0c;旨在通过引入多种优化算法来提高传统BP神经网络&#xff08;Backpropagation Neural Network&#xff09;的预测精度与泛化能力。 代码原理及流程 1. BP神经网络简介 BP神经网络是一种常见的前馈神经网…

一维稳态与非稳态导热的详细分析

目录 引言 一维稳态导热 应用实例&#xff1a;单层平壁导热 数值求解&#xff1a; 一维非稳态导热 应用实例&#xff1a;单层平壁的非稳态导热 温度变化阶段 表格总结&#xff1a; 引言 热传导&#xff08;Heat Conduction&#xff09;是热量在物体内部通过微观粒子的相…

以一种访问权限不允许的方式做了一个访问套接字的尝试

System.Net.Sockets.SocketException: 以一种访问权限不允许的方式做了一个访问套接字的尝试. 近来做的一个net core的网页&#xff0c;突然有这样的一个提示。上网查询之后&#xff0c;有二种可能&#xff0c;1&#xff0c;管理员角色运行VS2022后重新编译一下项目。2&#x…

【编程入门】与7无关的数?

题目描述 一个整数&#xff0c;如果这个数能够被7整除&#xff0c;或者其中有一位是7&#xff0c;我们称为这个数是与7有关的数。比如&#xff1a;14能被7整除&#xff0c;17有一位为7&#xff0c;这两个数都是与7有关的数。 请你编程求出1~n&#xff08;n<999&#xff09;…

【Unity面经】性能优化篇

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 专栏交流&#x1f9e7;&…

浅谈NODE的NPM命令和合约测试开发工具HARDHAT

$ npm install yarn -g # 将模块yarn全局安装 $ npm install moduleName # 安装模块到项目目录下 默认跟加参数 --save 一样 会在package文件的dependencies节点写入依赖。 $ npm install -g moduleName # -g 的意思是将模块安装到全局&#xff0c;具体安装到磁盘哪个位置&…