目录
概述
后端代码
拦截器
HandshakeInterceptor拦截器
ChannelInterceptor拦截器
消息转换器
配置类
消息处理
广播模式
用户模式
自定义请求头
用户信息
ChannelInterceptor拦截器设置用户信息
DefaultHandshakeHandler的继承类中设置用户对象
前端代码
概述
WebSocket 协议是一种相当低级的协议。它定义了如何将字节流转换为帧。帧可以包含文本或二进制消息。由于消息本身不提供有关如何路由或处理它的任何其他信息,因此很难在不编写其他代码的情况下实现更复杂的应用程序。幸运的是,WebSocket 规范允许在更高的应用程序级别上使用子协议。
STOMP : Simple Text Oriented Message Protocol——面向消息的简单文本协议
STOMP 提供了能够协作的报文格式,以至于 STOMP 客户端可以与任何 STOMP 消息代理(Brokers)进行通信,从而为多语言,多平台和 Brokers 集群提供简单且普遍的消息协作。STOMP 协议可以建立在WebSocket 之上,也可以建立在其他应用层协议之上。通过 Websocket建立 STOMP 连接,也就是说在 Websocket 连接的基础上再建立 STOMP 连接。
WebSocket 是底层协议,而 STOMP 是基于 WebSocket的上层协议。
后端代码
spring基于stomp协议的websocket实现主要是配置WebSocketMessageBrokerConfigurer相关信息。配置类需要加上注解@EnableWebSocketMessageBroker,表明这是一个websocket的处理broker。
WebSocketMessageBrokerConfigurer为我们提供了配置websocket端点、消息broker地址、拦截器、消息转换器的方法。
拦截器
拦截器主要是拦截客户端的握手消息的HandshakeInterceptor,以及拦截连接、订阅、消息发送、取消订阅、取消连接的ChannelInterceptor拦截器。
HandshakeInterceptor拦截器
HandshakeInterceptor是拦截客户端握手消息的拦截器,我们可以在这里对接口进行拦截过滤,如用户认证信息等。
/*** * @description: websocket握手拦截器,可以在这里获取到请求头信息进行拦截。*/
public class MyHandshakeInterceptor implements HandshakeInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(MyHandshakeInterceptor.class);@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {LOGGER.info("------------------MyHandshakeInterceptor:beforeHandshake");return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {LOGGER.info("-----------------MyHandshakeInterceptor:afterHandshake");}
}
ChannelInterceptor拦截器
ChannelInterceptor是对客户端的连接、订阅、消息发送、取消订阅、取消连接等消息进行拦截的实现。
在拦截器中我们可以对数据进行自定义转换,处理、解密等操作。
/*** * @description: WebSocket拦截器* 方法调用顺序:preSend -> postSend -> afterSendCompletion*/
public class MyWebsocketChannelInterceptor implements ChannelInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(MyWebsocketChannelInterceptor.class);@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);if (accessor.getCommand() == StompCommand.CONNECT) {LOGGER.info("------------收到websocket的连接消息");}if (accessor.getCommand() == StompCommand.SEND) {LOGGER.info("------------收到websocket的数据发送消息");}if (accessor.getCommand() == StompCommand.SUBSCRIBE) {LOGGER.info("------------收到websocket的订阅消息");}if (accessor.getCommand() == StompCommand.UNSUBSCRIBE) {LOGGER.info("------------收到websocket的取消订阅消息");}return message;}@Overridepublic void postSend(Message<?> message, MessageChannel channel, boolean sent) {LOGGER.info("------------WebsocketChannelInterceptor-postSend");}@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {LOGGER.info("-----------WebsocketChannelInterceptor-afterSendCompletion");}@Overridepublic boolean preReceive(MessageChannel channel) {LOGGER.info("----------WebsocketChannelInterceptor-preReceive");return true;}@Overridepublic Message<?> postReceive(Message<?> message, MessageChannel channel) {LOGGER.info("----------WebsocketChannelInterceptor-postReceive");return message;}@Overridepublic void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {LOGGER.info("----------WebsocketChannelInterceptor-afterReceiveCompletion");}
}
客户端发送的消息主要类型有参考StompCommand,主要有:
- CONNECT:启动与服务器的流或TCP 连接
- SEND:客户端发送消息
- SUBSCRIBE:客户端订阅主题
- UNSUBSCRIBE:客户端取消订阅
- BEGIN:启动事物
- COMMIT:提交事物
- ABORT:回滚事物
- ACK:确认来自订阅的消息的消费
- NACK:告诉服务器客户端没有消费该消息
- DISCONNECT:断开连接
- MESSAGE:于SEND一样。
这里需要注意的是,很多时候,前端有一些消息是没有定义在这里面的,例如心跳消HEARTBEAT,所以还有一个MessageType对象可以辅助判断。
public enum SimpMessageType {CONNECT,CONNECT_ACK,MESSAGE,SUBSCRIBE,UNSUBSCRIBE,HEARTBEAT,DISCONNECT,DISCONNECT_ACK,OTHER;}
消息转换器
消息转换器是对客户端发送过来的数据进行转换的类,通过消息转换器,可以将客户端的数据直接转换成对应的对象,并且将我们返回的消息处理成指定的格式。也可以在这里对数据进行加解密。等操作
通过实现MessageConverter的两个方法。
/*** * @description: 消息转换实体。*/
public class CommonMessageConvert implements MessageConverter {/*** 将客户端发送过来的消息转换为指定的对象* @param message 客户端发送过来的消息* @param targetClass 目标数据类型* @return 转换后的对象*/@Overridepublic Object fromMessage(Message<?> message, Class<?> targetClass) {if (message.getPayload() instanceof byte[]) {try {String textPayload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);return JsonUtil.convertString2Object(textPayload,targetClass);} catch (Exception e) {throw new RuntimeException("Failed to convert websocket message", e);}}return null;}/*** 将服务器* @param payload the Object to convert* @param headers optional headers for the message (may be {@code null})* @return broker的消息实体*/@Overridepublic Message<?> toMessage(Object payload, MessageHeaders headers) {String str = JsonUtil.toJson(payload);byte[] bytes = str.getBytes(StandardCharsets.UTF_8);return new GenericMessage<>(bytes, headers);}
}
配置类
配置类WebSocketMessageBrokerConfigurer是对以上定义的各项进行配置的实现。
/*** @description: websocket配置类*/@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {/*** 注册 Stomp的端点 可以注册多个端点* addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址* withSockJS:指定端点使用SockJS协议**///定义 {context-path}/sjstmpwebsocket/* 接口为websocket接口registry.addEndpoint("/sjstmpwebsocket").addInterceptors(new MyHandshakeInterceptor())//.setHandshakeHandler(webSocketHandshakeHandler)//允许跨域访问.setAllowedOrigins("*").withSockJS();//定义 {context-path}/stmpwebsocket/* 接口为websocket接口registry.addEndpoint("/stmpwebsocket").addInterceptors(new MyHandshakeInterceptor())//.setHandshakeHandler(webSocketHandshakeHandler)//允许跨域访问.setAllowedOrigins("*");}@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {/***///config.enableStompBrokerRelay("/qeune")/** 配置消息代理* 客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送* 决定哪些目的地应该由简单代理(如内存中的队列或主题)处理。* 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker*/config.enableSimpleBroker("/topic", "/queue").setHeartbeatValue(new long[] {10000, 10000}).setTaskScheduler(new DefaultManagedTaskScheduler());/** 客户端名称前缀 将所有发往"/message"前缀的目的地的消息路由到应用层处理,* 以 /message 开头的STOMP消息被路由到 @Controller 类中的 @MessageMapping 和 @SubscribeMapping 方法。*/config.setApplicationDestinationPrefixes("/message");//服务端通知客户端的前缀,可以不设置,默认为userconfig.setUserDestinationPrefix("/user");}/*** 配置客户端入站通道拦截器* 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间** @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {/** 配置消息线程池* 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务* 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程* 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒*//*registration.taskExecutor().corePoolSize(10).maxPoolSize(20).keepAliveSeconds(60);*/// 拦截器配置registration.interceptors(new MyWebsocketChannelInterceptor());}/*** 消息转换器* @param messageConverters 转换器集合* @return 是否使用*/@Overridepublic boolean configureMessageConverters(List<MessageConverter> messageConverters) {messageConverters.add(new CommonMessageConvert());return true;}
}
消息处理
stomp的消息处理,主要是依赖注解实现,通过注解和配置类中的websocekt路径配置,决定了将客户端的websocket数据发送到哪个接口处理。
- @Controller:注解消息处理类需要
- @DestinationVariable:解析接口中的参数,类似于@PathParam
- @MessageMapping:类似于spring的@RequestMapping注解,表明了websocket的接口路径,接口也可以使用{}定义前端传过来的路径参数。可以用于类和方法
- @SubscribeMapping:订阅模式,只是在订阅的时候触发,可以理解为:访问—>返回数据。
- @SendTo:广播模式,将消息广播给所有监听这个主题的客户端。
- @SendToUser:用户模式,将消息发送给指定的用户。消息目的地有UserDestinationMessageHandler来处理,会将消息路由到发送者对应的目的地。默认该注解前缀为/user。如:用户订阅/user/hi ,在@SendToUser('/hi')查找目的地时,会将目的地的转化为/user/{name}/hi, 这个name就是principal的name值,该操作是认为用户登录并且授权认证,使用principal的name作为目的地标识。发给消息来源的那个用户。(就是谁请求给谁,不会发给所有用户,区分就是依照principal-name来区分的)。此外该注解还有个broadcast属性,表明是否广播。就是当有同一个用户登录多个session时,是否都能收到。取值true/false.
需要注意的是:@SendTo和@SendToUser中的路径配置必须是在配置类中broker配置的路径。参考MessageBrokerRegistry.enableSimpleBroker()
除此之外,spring还提供了另一种,基于 SimpMessagingTemplate的数据通知方式,和注解的方式相同,但是SimpMessagingTemplate可以实现点对点通信。
广播模式
广播模式,是将客户端发送的数据广播到指定的describetion地址,使得所有监听这个desctibetion的客户端都能接收到数据。实现方式主要有两种,一种是基于@SendTo注解,一种是基于SimpMessagingTemplate的convertAndSend()方法。
/*** @description: 处理websocket过来的数据*/
@Controller
public class MyStompEndpoint {private static final Logger LOGGER = LoggerFactory.getLogger(MyStompEndpoint.class);/*** 广播模式:* MessageMapping 指定要接收消息的地址,类似@RequestMapping。除了注解到方法上,也可以注解到类上* SendTo指定要发送到订阅地址, 如果没有写,则,默认消息将被发送到与传入消息相同的目的地* 消息的返回值是通过{@link org.springframework.messaging.converter.MessageConverter}进行转换* @param websocketMessage 请求参数* @return 自定义返回结果*/@MessageMapping("/stomp/sendMessage")@SendTo("/topic/targetSubscribe") public ResWebsocketMessage broadcast(WebsocketMessage websocketMessage){LOGGER.info("receive message = {}" , JsonUtil.toJson(websocketMessage));ResWebsocketMessage responseMessage = new ResWebsocketMessage();responseMessage.setType("BROADCAST");responseMessage.setMessage(websocketMessage.getMessage());return responseMessage;}
}
/*** @description: 处理websocket过来的数据*/
@Controller
public class MyStompEndpoint {private static final Logger LOGGER = LoggerFactory.getLogger(MyStompEndpoint.class);//spring提供的推送方式@Autowiredprivate SimpMessagingTemplate messagingTemplate;/*** 广播模式* @param requestMsg 请求消息*/@MessageMapping("/stomp/springBrocastMessage")public void springBrocastMessage(WebsocketMessage requestMsg) {//这里使用的是spring的security的认证体系,所以直接使用Principal获取用户信息即可。LOGGER.info("receive userMessage, message= {}" , JsonUtil.toJson(requestMsg));//发送到 /message/topic/targetSubscribe 的订阅客户端那里。messagingTemplate.convertAndSend("/topic/targetSubscribe", requestMsg.getMessage());}}
用户模式
用户模式是将客户端的消息发送给指定的一个或者多个用户。实现方式主要有两种,一种是基于@SendTouser注解,一种是基于SimpMessagingTemplate的convertAndSendToUser()方法。
这里需要注意的是@SendTouser发给的自己,但是convertAndSendToUser()可以选择需要发送的用户信息,实现真正意义上的点对点通信。两种的实现原理是相同的,都是基于请求中的用户信息Principal,着将在后面介绍。
/*** @author ZSC* @date 2024/5/21 - 9:07* @description: 处理websocket过来的数据*/
@Controller
public class MyStompEndpoint {private static final Logger LOGGER = LoggerFactory.getLogger(MyStompEndpoint.class);//spring提供的推送方式@Autowiredprivate SimpMessagingTemplate messagingTemplate;/*** 用户模式* @param requestMsg 请求消息* SendToUser 如果存在return,可以使用这种方式,路径必须是以broker指定的开始(MessageBrokerRegistry.enableSimpleBroker())* 只能发给数据发送的客户端 且 客户端需要监听 /user/queue/{userId} 才能收到;简单来说 自己 ---> broker --->自己*/@MessageMapping("/stomp/userMessage/{userId}")@SendToUser("/queue/{userId}")public ResWebsocketMessage userMessage(Principal principal, @DestinationVariable String userId, WebsocketMessage requestMsg) {//这里使用的是spring的security的认证体系,所以直接使用Principal获取用户信息即可。LOGGER.info("receive userMessage username: {} userId= {}, message= {}" , principal.getName(), userId, JsonUtil.toJson(requestMsg));ResWebsocketMessage responseMessage = new ResWebsocketMessage();responseMessage.setType("NOTI");responseMessage.setMessage(requestMsg.getMessage());return responseMessage;}
}
/*** @description: 处理websocket过来的数据*/
@Controller
public class MyStompEndpoint {private static final Logger LOGGER = LoggerFactory.getLogger(MyStompEndpoint.class);//spring提供的推送方式@Autowiredprivate SimpMessagingTemplate messagingTemplate;/*** 用户模式-发送给指定的订阅这着* @param requestMsg 请求消息*/@MessageMapping("/stomp/springUserMessage/{userId}")public void springUserMessage(@DestinationVariable String userId, WebsocketMessage requestMsg) {//这里使用的是spring的security的认证体系,所以直接使用Principal获取用户信息即可。LOGGER.info("receive springUserMessage, message:{}" , userId, JsonUtil.toJson(requestMsg));/** convertAndSendToUser 会默认在主题前添加 /user 前缀, 所以客户端需要在订阅路径前加入/user,* 这里会自动发送到 指定订阅的路径 : /user/queue/targetUser, 其中 user是在 setUserDestinationPrefix中配置的, * 所以客户端需要监听/user/queue/targetUser地址才能收到消息*/messagingTemplate.convertAndSendToUser(userId, "/queue/targetUser", requestMsg.getMessage());}
}
自定义请求头
既然是数据交互,不能避免的就是用户认证,只有通过认证的用户我们才能进行后续的数据交互。
在本章介绍的websocket实现中,支持读取请求头的方式有DefaultHandshakeHandler、HandshakeInterceptor、ChannelInterceptor。
理论上,DefaultHandshakeHandler、HandshakeInterceptor、ChannelInterceptor都能获取到客户端的传过来的请求头信息。但是这严重依赖于客户端的实现方式,如果客户端能可以设置自定义请求头,后端都能获取。
但是我尝试过几种客户端的(前端js)的websockt实现方式,包括原生的、@stomp/stompjs,都没办法在DefaultHandshakeHandler和HandshakeInterceptor拦截器中获取到自定义请求头信息。只能是从固定的请求头Sec-WebSocket-Protocol中获取。
基于stomp-client+webscoket的方式可以设置自定义的请求头,并在DefaultHandshakeHandler 和 HandshakeInterceptor拦截器中获取到。
注意:这个用法只适合用在客户端使用原生的websocket时,如果是基于stomp的,那么这个Sec-WebSocket-Protocol请求头已经有了stomp设置的值。
但是我们可以在这里获取websocket定义好请求头,如登录的用户名login和密码passcode。如果是基于这两个做验证则可以。
public class MyHandshakeInterceptor implements HandshakeInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(MyHandshakeInterceptor.class);@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {HttpHeaders headers = request.getHeaders();/** 前端很多websocket实现方式不支持的自定义请求头信息只能放到 Sec-WebSocket-Protocol 这里面,* websocket不支持自定义请求头信息*//*List<String> list = headers.get("Sec-WebSocket-Protocol".toLowerCase());if(CollectionUtils.isNotEmpty(list)) {//做校验等等}*/LOGGER.info("------------------MyHandshakeInterceptor:beforeHandshake");return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {LOGGER.info("-----------------MyHandshakeInterceptor:afterHandshake");}
}
ChannelInterceptor能获取到@stomp/stompjs、基于stomp-client+webscoket等方式设置的请求头。只是需要通过Message对象获取。
/*** @description: stomp/stompjs + websocket时的拦截器 WebSocket拦截器* 方法调用顺序:preSend -> postSend -> afterSendCompletion*/
public class StompJsWebsocketChannelInterceptor implements ChannelInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(StompJsWebsocketChannelInterceptor.class);@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);LOGGER.info("收到websocket的消息:command:{}, ack:{}", accessor.getCommand(), accessor.getAck());if (accessor.getCommand() == StompCommand.CONNECT) {/** 这里一般都需要保证 messageAccessor.getMessageType() == SimpMessageType.CONNECT* 但是测试时发现,这两者总是一致的,所以就不判断了。**/StompHeaderAccessor messageAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);// 从Header中可以读取login和passcodeObject raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object userInfo = ((Map) raw).get("userId");if (userInfo instanceof LinkedList) {// 设置当前访问器的认证用户String name = ((LinkedList<?>)userInfo).get(0).toString();......}}}return message;}
}
用户信息
在本章的实现方法中,用户信息就是secrity的Principal对象或者实现了Principal方法的对象。
后续所有获取用户的方法所使用的也是这个对象。
实现设置Principal的方式有两种,一种是在ChannelInterceptor拦截器的实现类中进行设置,还有一种是在DefaultHandshakeHandler的继承类中实现。DefaultHandshakeHandler是握手处理器,其方法determineUser()放回的就是这个用户对象。
ChannelInterceptor拦截器设置用户信息
官网地址
ChannelInterceptor拦截器设置用户信息,实现如下:
/*** @description: stomp/stompjs + websocket时的拦截器 WebSocket拦截器* 方法调用顺序:preSend -> postSend -> afterSendCompletion*/
public class StompJsWebsocketChannelInterceptor implements ChannelInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(StompJsWebsocketChannelInterceptor.class);@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);LOGGER.info("收到websocket的消息:command:{}, ack:{}", accessor.getCommand(), accessor.getAck());if (accessor.getCommand() == StompCommand.CONNECT) {/** 这里一般都需要保证 messageAccessor.getMessageType() == SimpMessageType.CONNECT* 但是测试时发现,这两者总是一致的,所以就不判断了。**/StompHeaderAccessor messageAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);// 从Header中可以读取login和passcodeObject raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object userInfo = ((Map) raw).get("userId");if (userInfo instanceof LinkedList) {// 设置当前访问器的认证用户String name = ((LinkedList<?>)userInfo).get(0).toString();/** 这里必须要设置在 MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); 得到的结果里,否则子获取用户的时候会报错*/messageAccessor.setUser(new StompPrincipal(name));}}}return message;}
}
需要注意的是,在获取StompHeaderAccessor对象的方式有两种:
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompHeaderAccessor messageAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
但是用户的信息的设置必须是要设置到MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class)的对象中。否则在获取该对象的时候就会出现错误。
org.springframework.messaging.simp.annotation.support.MissingSessionUserException: No "user" header in message
DefaultHandshakeHandler的继承类中设置用户对象
DefaultHandshakeHandler的继承类中生成用户对象的实现如下:
/*** @description: 握手处理器,为每一个用户生成一个*/
public class CustomHandshakeHandler extends DefaultHandshakeHandler {private static final Logger LOGGER = LoggerFactory.getLogger(CustomHandshakeHandler.class);@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {LOGGER.info("------------------CustomHandshakeHandler:determineUser");HttpHeaders headers = request.getHeaders();List<String> userIds = headers.get("userId");if(CollectionUtils.isNotEmpty(userIds)) {return new StompPrincipal(userIds.get(0));}return request.getPrincipal();}
}
public class StompPrincipal implements Principal {String name;public StompPrincipal(String name) {this.name = name;}@Overridepublic String getName() {return name;}
}
该自定义握手处理器需要在配置项中加入。
@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {/*** 注册 Stomp的端点 可以注册多个端点**///定义 {context-path}/stmpwebsocket/* 接口为websocket接口registry.addEndpoint("/stmpwebsocket").addInterceptors(new MyHandshakeInterceptor())//自定义握手处理器.setHandshakeHandler(new CustomHandshakeHandler())//允许跨域访问.setAllowedOrigins("*");}
}
但是正如前面所说的 DefaultHandshakeHandler是否能获取到前端自定义的请求头信息,完全依赖于前端使用何种方式来实现。
使用Principal用户对象
经过以上设置用户对象之后,在代码中我们就可以直接获取Principal对象了。
@MessageMapping("/stomp/userMessage/{userId}")@SendToUser("/queue/{userId}")public ResWebsocketMessage userMessage(Principal principal, @DestinationVariable String userId, WebsocketMessage requestMsg) {//这里使用的是spring的security的认证体系,所以直接使用Principal获取用户信息即可。LOGGER.info("receive userMessage username: {} userId= {}, message= {}" , principal.getName(), userId, JsonUtil.toJson(requestMsg));ResWebsocketMessage responseMessage = new ResWebsocketMessage();responseMessage.setType("NOTI");responseMessage.setMessage(requestMsg.getMessage());return responseMessage;}/*** 用户模式-发送给指定的订阅这着* @param requestMsg 请求消息*/@MessageMapping("/stomp/springUserMessage/{userId}")public void springUserMessage(Principal principal, @DestinationVariable String userId, WebsocketMessage requestMsg) {//这里使用的是spring的security的认证体系,所以直接使用Principal获取用户信息即可。LOGGER.info("receive springUserMessage, userName:{} userId:{}, message:{}" , principal.getName(), userId, JsonUtil.toJson(requestMsg));/** convertAndSendToUser 会默认在主题前添加 /user 前缀, 所以客户端需要在订阅路径前加入/user,* 这里会自动发送到 指定订阅的路径 : /user/queue/targetUser, 其中 user是在 setUserDestinationPrefix中配置的* 所以客户端需要监听/user/queue/targetUser地址才能收到消息*/messagingTemplate.convertAndSendToUser(userId, "/queue/targetUser", requestMsg.getMessage());}
前端代码
本文讨论的实现都是基于vue开发的项目中测试,其中websocket的实现是利用工具@stomp/stompjs。其他的实现方式,暂时没有试过。具体前端的代码请参考另外介绍文章。