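WebSocketSession 无法序列化到 Redis,因此在集群环境中,我们无法把所有 WebSocketSession 缓存到 Redis 来实现 session 共享。不过,可以借助 Redis 的发布/订阅模式解决:每个节点只在本地内存中保存自己持有的 WebSocketSession;发送消息时,如果接收方的 session 不在当前节点,就把消息发布到 Redis 频道,所有节点订阅该频道,由持有该 session 的节点负责把消息推送给客户端。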
1、WebSocketHandler 消息处理器
import com.alibaba.fastjson.JSON;
import com.techhf.common.core.constant.SecurityConstants;
import com.techhf.common.core.exception.BusinessServiceException;
import com.techhf.im.api.model.vo.ChatPageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** websocket 消息处理器** @author ronshi* @date 2022/11/18 9:17*/
@Slf4j
@Component
public class WebSocketHandler extends TextWebSocketHandler {/*** 冒号分隔符*/public static final String SEMICOLON = ":";/*** redis 订阅通道名*/public static final String CHANNEL_NAME = "imRedisTopic";/*** 当前节点在线session*/public static final Map<String, WebSocketSession> POOL_SESSION = new ConcurrentHashMap<>();@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String key = getKey(session);POOL_SESSION.put(key, session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {String key = getKey(session);POOL_SESSION.remove(key);}private String getKey(WebSocketSession session) {String userType = (String) session.getAttributes().get(SecurityConstants.DETAILS_USERTYPE);String userId = (String) session.getAttributes().get(SecurityConstants.DETAILS_USER_ID);String key = userType + SEMICOLON + userId;if (userId == null) {throw new BusinessServiceException("未认证");}return key;}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 获得客户端传来的消息String payload = message.getPayload();//log.info("server 接收到消息 {}", payload);session.sendMessage(new TextMessage(payload));}/*** 发送消息*/public boolean sendMsg(ChatPageVO message) {String key = message.getAnotherUserType() + SEMICOLON + message.getAnotherUserId();if (POOL_SESSION.containsKey(key)) {try {POOL_SESSION.get(key).sendMessage(new TextMessage(JSON.toJSONString(message)));} catch (IOException e) {log.error("发送消息给{}失败", key, e);return false;}} else {// 发布消息redisTemplate.convertAndSend(CHANNEL_NAME, JSON.toJSONString(message));}return true;}}
2、RedisMessageListener 订阅发布监听类
import com.alibaba.fastjson.JSON;
import com.techhf.im.api.model.vo.ChatPageVO;
import com.techhf.im.biz.ws.WebSocketHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;/*** Redis订阅发布监听类** @author ronshi* @date 2023/6/7 16:00*/
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] bytes) {// 获取消息Object messageBody = redisTemplate.getValueSerializer().deserialize(message.getBody());if (messageBody == null) {return;}String content = messageBody.toString();ChatPageVO chatMessage = JSON.parseObject(content, ChatPageVO.class);String key = chatMessage.getAnotherUserType() + WebSocketHandler.SEMICOLON + chatMessage.getAnotherUserId();//当前节点在线sessionMap<String, WebSocketSession> onlineSessionMap = WebSocketHandler.POOL_SESSION;if (onlineSessionMap.containsKey(key)) {try {onlineSessionMap.get(key).sendMessage(new TextMessage(content));} catch (IOException e) {log.error("发送消息给{}失败", key, e);}}}
}
3、RedisSubscriberConfig 订阅发布模式的容器配置
import com.techhf.im.biz.ws.WebSocketHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** 订阅发布模式的容器配置** @author ronshi* @date 2023/6/7 17:01*/
/**
 * Configuration of the Redis pub/sub listener container.
 *
 * <p>No {@code @AutoConfigureAfter} is declared: that annotation only orders
 * auto-configuration classes and has no effect on a regular {@code @Configuration}. The
 * injected {@code listener} field already makes the listener bean available before the
 * container bean is built.
 */
@Configuration
public class RedisSubscriberConfig {

    @Autowired
    private RedisMessageListener listener;

    /**
     * Creates the message listener container and subscribes the listener to
     * {@link WebSocketHandler#CHANNEL_NAME}.
     *
     * @param redisConnectionFactory the Redis connection factory
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(listener, new ChannelTopic(WebSocketHandler.CHANNEL_NAME));
        return container;
    }
}