组件概述
面对C端产品,往往会携带有客户端和服务端的双端通信以实现实时交互的效果,但是目前HTTP1.1并不支持双端通信,因此,对于聊天室、多人实时游戏等场景,就需要用到一个新的通信协议:WebSocket。
更多WebSocket相关的信息请参考本文章https://blog.csdn.net/weixin_73077810/article/details/136840600?spm=1001.2014.3001.5501 而对于一个系统而言,往往WebSocket的建立使用不仅仅只有一个,那如果对于每一个业务都搭建一套适配的WebSocket组件,项目中便会加持上许多冗余代码,无法达到一个复用的效果,也不利于后续的相关业务拓展。
本文的目标是实现一个可以复用的WebSocket组件,通过设计模式解耦、业务标识分组实现项目中对于WebSocket通讯的复用实现,并基于业务标识扩展消息的自定义消费逻辑。
组件实现流程图
伪代码讲解
连接建立阶段
在WebSocket连接建立阶段,PlusWebSocketInterceptor拦截器首先在握手前将用户信息和业务标识存储到Session中,然后在握手后将session基于业务标识和用户ID存储到目标容器中。
/*** WebSocket握手请求的拦截器*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {/*** 握手前:将用户信息 + 业务标识在后续存储到Session中*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {LoginUser loginUser = LoginHelper.getLoginUser();attributes.put(LOGIN_USER_KEY, loginUser);String businessType = request.getHeaders().get(BUSINESS_TYPE_KEY).get(0);if (StrUtil.isBlank(businessType)){throw new ServiceException("WebSocket握手中Header需要携带业务标识businessType");}attributes.put(BUSINESS_TYPE_KEY, businessType);return true;}/*** 握手后*/@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {}
}
/*** 连接成功后:将session基于业务标识和用户ID存储到目标容器*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);String businessType = (String) session.getAttributes().get(BUSINESS_TYPE_KEY);WebSocketSessionHolder.addSession(loginUser.getUserId(), businessType, session);log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());}
// WebSocketSession存储容器
Map<String, Map<Long, WebSocketSession>> BUSINESS_TYPE_SESSION_MAP = new ConcurrentHashMap<>();
消息处理阶段
在消息处理阶段,首先从WebSocket会话中获取登录用户信息,然后创建WebSocket消息DTO对象并解析文本数据为两部分:具体信息和业务类型。接着检查数据是否有效,如果无效则记录错误并抛出异常。最后,将消息转发消息发布器对象。
/*** 处理发送来的文本消息*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 从WebSocket会话中获取登录用户信息LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);// 创建WebSocket消息DTO对象// 解析文本数据为两部分:具体信息 + 业务类型String payload = message.getPayload();
// payload = {"data":{"businessType":"xxx","data":{"type":xx,"contentUrl":"xxx","createTime":"xxx","friendId":"xxx","chatSessionId":"xxx","ossId":"xxx"}}}WebSocketMessageDto data = new JSONObject(payload).get("data", WebSocketMessageDto.class);if (data == null || data.getData() == null){log.error("WebSocket接收消息失败");throw new ServiceException("WebSocket接收消息失败");}// 记录要转发的用户列表 这里是转发给自己 + 通讯对象data.setSessionKeys(List.of(loginUser.getUserId(), data.getData().getFriendId()));WebSocketUtils.publishMessage(data);}
@Data
public class WebSocketMessageDto implements Serializable {/*** 需要推送到的session key 列表*/private List<Long> sessionKeys;/*** 传输的真实信息*/private WebSocketMutualDto data;/*** 业务类型*/private String businessType;
}
消息解析消费阶段
在消息解析消费阶段,首先根据业务类型获取对应的自定义消费器,然后执行该消费器的自定义消费逻辑。接着遍历需要发送消息的sessionKey列表,如果session存在则直接发送消息给目标对象。
public static void publishMessage(WebSocketMessageDto webSocketMessage) {// 执行业务的自定义消费逻辑String businessType = webSocketMessage.getBusinessType();// 获取该业务的指定消费器来执行自定义的消费逻辑MessageConsumer messageHandler = WebSocketConstants.CONSUMER_MAP.get(businessType);messageHandler.consumerMessage(webSocketMessage.getData());// 当前服务内session,直接发送消息for (Long sessionKey : webSocketMessage.getSessionKeys()) {if (WebSocketSessionHolder.existSession(sessionKey, businessType)) {WebSocketUtils.sendMessage(businessType, sessionKey, new JSONObject(webSocketMessage.getData()).toString());continue;}}}
public interface WebSocketConstants {/*** 基于业务标识调用业务消费器进行自定义逻辑*/Map<String, MessageConsumer> CONSUMER_MAP = new HashMap<>();
}
/*** 业务自定义消费器*/
public interface MessageConsumer {// 默认不进行自定义消费default void consumerMessage(WebSocketMutualDto message){};
}
/*** A项目WebSocket订阅消费者*/
@Component
@RequiredArgsConstructor
@Slf4j
public class MemDriftWebSocketConsumer implements MessageConsumer {private final String Drift_WebSocket_Type = "driftBottle";@PostConstructvoid init(){// 注册到WebSocket消费者组件池WebSocketConstants.CONSUMER_MAP.put(Drift_WebSocket_Type, this);}@Override@Transactional(rollbackFor = Exception.class)public void consumerMessage(WebSocketMutualDto message) {// 自定义消费逻辑}
}
后期业务扩展实现
如果有新的业务需要复用该组件,只需要继承MessageConsumer接口实现自己的业务消费逻辑,再者在修改消息中的业务标识便可无侵入复用WebSocket组件消费信息。
总结
文章首先指出了在多个业务中使用独立WebSocket组件会导致代码冗余和不利于维护拓展的问题。为此,提出了一个通用的WebSocket组件设计方案,允许不同业务共享相同的WebSocket基础架构,同时能够处理各自独特的业务逻辑。
组件的设计分为三个主要阶段:连接建立、消息处理、和消息解析消费。在连接建立阶段,PlusWebSocketInterceptor
拦截器会在握手前将用户信息和业务标识存储到会话(Session)中,并在握手后将会话按业务类型和用户ID组织存储。这一阶段确保了会话管理的有序和业务之间的隔离。
消息处理阶段涉及到接收和解析客户端发来的消息。服务器首先从会话中获取用户信息,然后解析文本消息内容,将其转换为WebSocketMessageDto
对象,这个对象包含了消息内容、接收者信息和业务类型等。之后,有效的消息将被转发到相应的处理器进行处理。