即时通讯代码优化

server/2024/10/22 1:40:54/

在线用户逻辑修复

在进行测试时,发现当前代码有个问题,如果test1在服务器进行连接,本地的test2给test1发消息,虽然test1能收到服务器上的信息,但是本地服务日志中会报teset1不在线,需要对该种情况进行修复

修复方案:使用redis存储在线用户

  • WebSocketEndpoint

利用setBit记录登录用户,key为用户名的hashcode,即便有可能冲突,但是概率较小,可以接受。

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}
  • DefaultSendExecutor

适配改变类型后的WEB_SOCKET_ENDPOINT_MAP,并调整代码结构

package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** @author PC*/
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();/*** redis中用户SessionMap*/public static final String ONLINE_USER = "cus:ws:online-user";private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService = webSocketMessageService;}@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {WebSocketEndpoint.redisTemplate = redisTemplate;}/*** 打开ws连接** @param session 会话*/@OnOpenpublic void onOpen(Session session) {//连接成功String userName = getUserName(session);logger.info("The connection is successful:" + getUserName(session));this.session = session;//是有hash冲突的可能性的,不过触发概率很低,可以忽略int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** @param session 会话*/@OnClosepublic void onClose(Session session) {String userName = getUserName(session);int hashCode = userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info("Disconnect:" + userName);}/*** 接收到的消息** @param message 消息内容*/@OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName = getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>()).stream().findFirst().orElse("anonymous_users");}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}
}

重复代码提取

在增加了stream和redis渠道后,消息监听处的代码有大段重复,可以进行提取处理

  • MessageInfoUtil
package com.example.im.infra.executor.send.util;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @author PC* MessageInfo工具类*/
@Component
public class MessageInfoUtil {private final static Logger logger = LoggerFactory.getLogger(MessageInfoUtil.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {String messageJson = new String(messageInfoByte, StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}
}
  • RedisMessageListener
package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;/*** @author PC* redis监听*/
@Component
public class RedisMessageListener implements MessageListener {private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Overridepublic void onMessage(Message message, byte[] pattern) {logger.debug("send redis info");messageInfoUtil.sendMessageByMessageInfoByte(message.getBody());}
}
  • StreamMessageListener
package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private MessageInfoUtil messageInfoUtil;@Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil = messageInfoUtil;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {logger.debug("send stream info");return messageInfoFlux -> messageInfoFlux.map(message -> messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload())).then();}
}

未用到的bean不加载

当未用到某些渠道时,无需进行相关配置,如使用了redis渠道,yml中就无需配置kafka信息

Redis渠道

调整config文件,当渠道非redis时对redis渠道相关bean不进行加载

  • WebSocketConfig
package com.example.im.config;import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.annotation.Resource;/*** @author PC*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {@Resourceprivate WebSocketProperties webSocketProperties;@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}/**** 配置线程池* @return 线程池*/@Beanpublic TaskExecutor taskExecutor() {WebSocketProperties.ExecutorProperties executorProperties = webSocketProperties.getExecutorProperties();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(executorProperties.getCorePoolSize());// 设置最大线程数executor.setMaxPoolSize(executorProperties.getMaxPoolSize());// 设置队列容量executor.setQueueCapacity(executorProperties.getQueueCapacity());// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());// 设置默认线程名称executor.setThreadNamePrefix("im-");// 设置拒绝策略executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}
  • RedisConfiguration

缓存用到了redis,RedisTemplate需要加载

package com.example.im.config;import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author PC*/
@Configuration
public class RedisConfiguration {/*** redisTemplate配置** @return RedisTemplate*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setStringSerializer(stringRedisSerializer);redisTemplate.setDefaultSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅一个或多个频道container.addMessageListener(listenerAdapter, new PatternTopic("redis-websocket-*"));return container;}@Bean@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {return new MessageListenerAdapter(redisMessageListener);}
}
  • RedisMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisMessageListener implements MessageListener 
  • RedisSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisSendExecutor extends AbstractBaseSendExecutor 

Kafka渠道

  • StreamMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamMessageListener 
  • StreamSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamSendExecutor extends AbstractBaseSendExecutor

参考资料

[1].im项目


http://www.ppmy.cn/server/133767.html

相关文章

efinance库支持哪些类型的金融数据获取?

炒股自动化&#xff1a;申请官方API接口&#xff0c;散户也可以 python炒股自动化&#xff08;0&#xff09;&#xff0c;申请券商API接口 python炒股自动化&#xff08;1&#xff09;&#xff0c;量化交易接口区别 Python炒股自动化&#xff08;2&#xff09;&#xff1a;获取…

「从零开始的 Vue 3 系列」:第十三章——架构一个Vue项目(简单版)

前言 本系列将从零开始&#xff0c;系统性地介绍 Vue 3 的常用 API&#xff0c;逐步深入每个核心概念与功能模块。通过详尽的讲解与实战演示&#xff0c;帮助大家掌握 Vue 3 的基础与进阶知识&#xff0c;最终具备独立搭建完整 Vue 3 项目的能力。 从零开始使用 Vite 和 Vue 3…

Elasticsearch是做什么的?

初识elasticsearch 官方网站&#xff1a;Elasticsearch&#xff1a;官方分布式搜索和分析引擎 | Elastic Elasticsearch是做什么的&#xff1f; Elasticsearch 是一个分布式搜索和分析引擎&#xff0c;专门用于处理大规模数据的实时搜索、分析和存储。它基于 Apache Lucene …

【MySQL】索引的机制、使用

在学习索引知识之前&#xff0c;我们可以先了解一下什么是索引。实际上&#xff0c;索引就是数据库中一个或多个列存储的结构&#xff0c;能够支持数据库管理系统在不扫描整张表的情况下也能查询到数据行&#xff0c;能够大大提升查询效率。举个例子&#xff0c;我们想要找到一…

暴露在公网的linux服务器提高安全性的几种方式

目录 一、提高linux服务器安全性的原因 1、数据保护 2、业务连续性 3、防止恶意软件 4、防止内部威胁 5、应对新威胁 二、具体操作 1、禁止ssh的root远程访问 2、修改root访问密码 3、修改ssh访问端口 4、修改数据库默认端口 5、禁止数据库root远程访问 6、修改防火墙策略 三…

Android——发送彩信

跳转到相册选择图片 btn_jump.setOnClickListener(new View.OnClickListener() {Overridepublic void onClick(View view) {// 跳转到系统相册选择图片并返回Intent intent new Intent(Intent.ACTION_GET_CONTENT);// 设置图片类型为图片类型intent.setType("image/*&quo…

【c++ arx 选项板2】

static void xlArx_gmenu2(void) {//007 创建面板集if (!g_pMyPaletteSet){// 创建palette setg_pMyPaletteSet = new CMyPaletteSet

01 视频捕获

1.视频捕获 视频捕获&#xff0c;作为数字视频处理的首要步骤&#xff0c;涉及到将模拟视频信号转换为数字数据流&#xff0c;进而在计算机或其他数字设备上进行处理、存储和传输。本章节将深入探讨视频捕获的基本概念、流程&#xff0c;以及空间采样、时序采样和帧场转换等关…