【项目实战】redis实现websocket分布式消息推送服务

ops/2024/12/25 3:30:55/

由于redis并非专业的MQ中间件,消息的防丢失策略并不完整,存在丢失消息的可能。该方案为在再pc web管理平台的右下角弹出,显示新接收到的消息数,哪怕没有收到这个通知,也可以自己在消息中心看看。所以对可靠性要求不高。如果业务场景要求可靠性高,还是请使用专业的MQ中间件。该方案已在多个实际项目中运行。

流程架构

websocket实现同一账户多点登录、websocket服务多节点部署推送方案。

简单架构图

假设用户A在两个地方登录,连接到两个websocketServer服务节点1和2,用户B连接到2节点。

websocketServer将websocket session保存在各自的Map<String,Session>中,key为userid,value为websocket Session。节点1保存了用户A的websocket session,节点2保存了用户A、B的websocket session。

消息生产者发布消息的时候为json格式,如:[{"receive"="userid_a","msg"="您有1个未读消息"},{"receive"="userid_b","msg"="您有3个未读消息"}],将消息发到redis的一个Channel,如showNewestMsg。

websocketServer中订阅redis的channel=showNewestMsg,收到消息后根据消息中receive冲map中找到对应的websocket session,发消息给客户端。

核心代码

1.该项目为springboot项目,先引入jar包,由于是从实际项目中抽出来写的记录,可能还缺jar请自行导入。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!--websocket-->

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-websocket</artifactId>

</dependency>

<!-- redis -->

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

<!-- 工具类 -->

<dependency>

    <groupId>cn.hutool</groupId>

    <artifactId>hutool-all</artifactId>

    <version>5.3.6</version>

</dependency>

<dependency>

    <groupId>net.sf.json-lib</groupId>

    <artifactId>json-lib</artifactId>

    <version>2.4</version>

    <classifier>jdk15</classifier>

</dependency>

2.websocket配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**

 * spring websocket组件初始化

 * @author csf

 *

 */

//war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务

@Configuration

public class WebSocketConfig

{

    @Bean

    public ServerEndpointExporter serverEndpointExporter()

    {

        return new ServerEndpointExporter();

    }

}

注意:war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务。

3.websocket服务端实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import javax.annotation.Resource;

import javax.websocket.OnClose;

import javax.websocket.OnError;

import javax.websocket.OnMessage;

import javax.websocket.OnOpen;

import javax.websocket.Session;

import javax.websocket.server.PathParam;

import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import com.kingengine.plug.service.MessageService;

import cn.hutool.core.util.StrUtil;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

/**

 * WebSocket服务类

 * @author csf

 * @date 2020年8月10日

 */

@ServerEndpoint("/websocket/{custId}")

@Component

public class WebSocketServer

{

    @Resource

    private MessageService messageService;

     

    Logger log = LoggerFactory.getLogger(this.getClass());

     

    // 当前在线连接数

    private static int onlineCount = 0;

     

    // 存放每个用户对应的WebSocket连接对象,key为custId_HHmmss,确保一个登录用户只建立一个连接

    private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>();

     

    // 与某个客户端的连接会话,需要通过它来给客户端发送数据

    private Session session;

     

    // 接收用户id

    private String custId = "";

     

    private static WebSocketServer webSocketServer;

     

    // 通过@PostConstruct实现初始化bean之前进行的操作

    @PostConstruct

    public void init()

    {

        // 初使化时将已静态化的webSocketServer实例化

        webSocketServer = this;

        webSocketServer.messageService = this.messageService;

    }

     

    /**

     * 连接建立成功调用的方法

     * @param session 连接会话,由框架创建

     * @param custId 用户id, 为处理用户多点登录都能收到消息,需传该格式custId_HHmmss

     * @author csf

     * @date 2020年8月10日

     */

    @OnOpen

    public void onOpen(Session session, @PathParam("custId") String custId)

    {

        if (!webSocketSessionMap.containsKey(custId))

        {

            this.session = session;

            webSocketSessionMap.put(custId, session);

            addOnlineCount(); // 在线数加1

            log.info("有新连接[{}]接入,当前websocket连接数为:{}", custId, getOnlineCount());

        }

         

        this.custId = custId;

        try

        {

            // 第一次建立连接,推送消息给客户端,只会执行一次。后续的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis订阅消息推送

            // 获取未读消息数

            // 由于前端传进来的custId是有时间后缀的,查询时需要去掉后缀。

            String qryCustId = custId.split("_")[0];

            JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId);

             

            // 获取最新消息

            /*  JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId);

            // 发送消息

            JSONArray msgArr = new JSONArray();

            if (newMsg!=null)

            {

                msgArr.add(newMsg);

            }*/

            JSONArray msgArr = new JSONArray();

            msgArr.add(unreadMsg);

            sendMessage(custId, msgArr.toString());

        }

        catch (Exception e)

        {

            log.error("客户端连接websocket服务异常");

            e.printStackTrace();

        }

    }

     

    /**

     * 连接关闭调用的方法

     */

    @OnClose

    public void onClose(@PathParam("custId") String sessionKey)

    {

        if (webSocketSessionMap.containsKey(sessionKey))

        {

            try

            {

                webSocketSessionMap.get(sessionKey).close();

                webSocketSessionMap.remove(sessionKey);

            }

            catch (IOException e)

            {

                log.error("连接[{}]关闭失败。", sessionKey);

                e.printStackTrace();

            }

            subOnlineCount();

            log.info("连接[{}]关闭,当前websocket连接数:{}", sessionKey, onlineCount);

        }

    }

     

    /**

     * 接收客户端发送的消息

     * @param message 客户端发送过来的消息

     * @param session websocket会话

     */

    @OnMessage

    public void onMessage(String message, Session session)

    {

        log.info("收到来自客户端" + custId + "的信息:" + message);

    }

     

    /**

     * 连接错误时触发

     * @param session

     * @param error

     */

    @OnError

    public void onError(Session session, Throwable error)

    {

        try

        {

            session.close();

        }

        catch (IOException e)

        {

            log.error("发生错误,连接[{}]关闭失败。");

            e.printStackTrace();

        }

        // log.error("websocket发生错误");

        // error.printStackTrace();

    }

     

    /**

     * 给指定的客户端推送消息,可单发和群发

     * @param sessionKeys 发送消息给目标客户端sessionKey,多个逗号“,”隔开1234,2345...

     * @param message

     * @throws IOException

     * @author csf

     * @date 2020年8月11日

     */

    public void sendMessage(String sessionKeys, String message)

    {

        if (StrUtil.isNotBlank(sessionKeys))

        {

            String[] sessionKeyArr = sessionKeys.split(",");

            for (String key : sessionKeyArr)

            {

                try

                {

                    // 可能存在一个账号多点登录

                    List<Session> sessionList = getLikeByMap(webSocketSessionMap, key);

                    for (Session session : sessionList)

                    {

                        session.getBasicRemote().sendText(message);

                    }

                }

                catch (IOException e)

                {

                    e.printStackTrace();

                    continue;// 某个客户端发送异常,不影响其他客户端发送

                }

            }

        }

        else

        {

            log.info("sessionKeys为空,没有目标客户端");

        }

    }

     

    /**

     * 给当前客户端推送消息,首次建立连接时调用

     */

    public void sendMessage(String message)

        throws IOException

    {

        this.session.getBasicRemote().sendText(message);

    }

     

    /**

     * 检查webSocket连接是否在线

     * @param sesstionKey webSocketMap中维护的key

     * @return 是否在线

     */

    public static boolean checkOnline(String sesstionKey)

    {

        if (webSocketSessionMap.containsKey(sesstionKey))

        {

            return true;

        }

        else

        {

            return false;

        }

    }

     

    /**

     * 获取包含key的所有map值

     * @param map

     * @param keyLike

     * @return

     * @author csf

     * @date 2020年8月13日

     */

    private List<Session> getLikeByMap(Map<String, Session> map, String keyLike)

    {

        List<Session> list = new ArrayList<>();

        for (String key : map.keySet())

        {

            if (key.contains(keyLike))

            {

                list.add(map.get(key));

            }

        }

        return list;

    }

     

    public static synchronized int getOnlineCount()

    {

        return onlineCount;

    }

     

    public static synchronized void addOnlineCount()

    {

        WebSocketServer.onlineCount++;

    }

     

    public static synchronized void subOnlineCount()

    {

        WebSocketServer.onlineCount--;

    }

}

4.redis消息订阅配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

import org.springframework.cache.annotation.EnableCaching;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration

@EnableCaching

public class RedisCacheConfig

{

    @Bean

    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)

    {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(connectionFactory);

        // 可以添加多个 messageListener,配置不同的交换机

        container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 订阅最新消息频道

        return container;

    }

     

    @Bean

    MessageListenerAdapter listenerAdapter(RedisReceiver receiver)

    {

        // 消息监听适配器

        return new MessageListenerAdapter(receiver, "onMessage");

    }

     

    @Bean

    StringRedisTemplate template(RedisConnectionFactory connectionFactory)

    {

        return new StringRedisTemplate(connectionFactory);

    }

}

5.redis配置,直接放在springboot项目application.properties或application.yml中

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

# 数据库索引(默认为0)

spring.redis.database=0 

spring.redis.host=192.168.1.100

spring.redis.port=6379

spring.redis.password=123456

# 连接池最大连接数(使用负值表示没有限制)

spring.redis.pool.max-active=8 

# 连接池最大阻塞等待时间(使用负值表示没有限制)

spring.redis.pool.max-wait=-1 

# 连接池中的最大空闲连接

spring.redis.pool.max-idle=8 

# 连接池中的最小空闲连接

spring.redis.pool.min-idle=0 

# 连接超时时间(毫秒)

spring.redis.timeout=5000

6.接收消息生产者发布的消息,推送给对应的客户端

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

import java.io.UnsupportedEncodingException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.connection.Message;

import org.springframework.data.redis.connection.MessageListener;

import org.springframework.stereotype.Component;

import com.kingengine.plug.websocket.WebSocketServer;

import cn.hutool.core.codec.Base64;

import cn.hutool.core.util.StrUtil;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

/**

 * 消息监听对象,接收订阅消息

 * @author csf

 * @date 2020年8月13日

 */

@Component

public class RedisReceiver implements MessageListener

{

    Logger log = LoggerFactory.getLogger(this.getClass());

     

    @Autowired

    WebSocketServer webSocketServer;

     

    /**

     * 处理接收到的订阅消息

     */

    @Override

    public void onMessage(Message message, byte[] pattern)

    {

        String channel = new String(message.getChannel());// 订阅的频道名称

        String msg = "";

        try

        {

            msg = new String(message.getBody(), "GBK");//注意与发布消息编码一致,否则会乱码

            if (StrUtil.isNotBlank(msg)){

                if ("showNewestMsg".endsWith(channel))// 最新消息

                {

                    JSONObject json = JSONObject.fromObject(msg);

                    webSocketServer.sendMessage(json.get("receive"),json.get("msg"));

                }else{

                    //TODO 其他订阅的消息处理

                }

                

            }else{

                log.info("消息内容为空,不处理。");

            }

        }

        catch (Exception e)

        {

            log.error("处理消息异常:"+e.toString())

            e.printStackTrace();

        }

    }

}

7.消息发布测试

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

import java.io.UnsupportedEncodingException;

import java.util.HashMap;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import net.sf.json.JSONObject;

@RequestMapping("redis")

@RestController

public class RedisTestController

{

    @Autowired

    StringRedisTemplate template;

     

    /**

     * 发布消息测试

     *@param userid

     * @param msg

     * @return

     */

    @PostMapping("sendMessage")

    public String sendMessage(String userid,String msg)

    {

        try

        {

            String newMessge=new String(msg.getBytes("GBK"),"GBK");

            Map<String,String> map = new HashMap<String, String>();

            map.put("receive", userid);

            map.put("msg", newMessge);

            template.convertAndSend("showNewestMsg",        

          JSONObject.fromObject(map).toString());

        }

        catch (UnsupportedEncodingException e)

        {

            e.printStackTrace();

        }

        return "消息发布成功!";

    }

}

8.客户端代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

<!DOCTYPE html>

<html>

<head>

    <title>WebSocket测试</title>

    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>

</head>

<body>

<div>

    来自服务端消息:

    <p id="message"></p>

</div>

</body>

<script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script>

<script>

    let webSocketClient;

    if (window.WebSocket)

    {

       let custid="132456_" + Math.random();//该参数会作为websocketServer中存储session的key,要保证唯一。

        webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid);

        //连通之后的回调事件

        webSocketClient.onopen = function () {

            webSocketClient.send("这里是地球,收到请回答。。。");

            //  webSocket.send('{"type":"1","data":"121"}');

        };

        //接收后台服务端的消息

        webSocketClient.onmessage = function (evt) {

            console.log("数据已接收:" + evt.data);

            showMessage("未读消息:" + evt.data);

        };

        //连接关闭的回调事件

        webSocketClient.onclose = function () {

            alert("连接已关闭...");

        };

    }else{

        alert("浏览器不支持websocket");

    }

    function showMessage(message) {

        $("#message").html(message);

    }

</script>

</html>


http://www.ppmy.cn/ops/144738.html

相关文章

大恒相机开发(1)—Python调用采集彩色图像并另存为本地

这段代码是一个Python程序&#xff0c;用于从大恒相机采集彩色图像&#xff0c;并将其保存到本地。 前面需要自己修改下频率和采集的次数 framerate_set&#xff1a;设置相机的帧率。num&#xff1a;设置采集图像的次数。 咱们直接上python的完整代码&#xff1a; import cv2 …

好家伙!仅需1行Python,腾讯云智能OCR让手写发票识别效率飙升!

​ 大家好&#xff0c;这里是程序员晚枫&#xff0c;今天给大家带来一个腾讯云的新功能&#xff1a;智能结构化识别&#xff08;Pro版&#xff09;。 智能结构化&#xff08;Smart Structure Optical Character Recognition &#xff09;融合了业界领先的深度学习技术、图像检…

重温设计模式--组合模式

文章目录 1 、组合模式&#xff08;Composite Pattern&#xff09;概述2. 组合模式的结构3. C 代码示例4. C示例代码25 .应用场景 1 、组合模式&#xff08;Composite Pattern&#xff09;概述 定义&#xff1a;组合模式是一种结构型设计模式&#xff0c;它允许你将对象组合成…

mysql 查询优化之字段建立全文索引

最近在接手一些老项目时发现表设计存在问题导致查询较慢 例如一张旧表的设计&#xff1a; 模糊匹配某个关键字时,需要十几秒左右,而且表的数据量不多 都知道mysql8.0版本InnoDB引擎都支持全文索引了,因此可以在content建立全文索引&#xff0c;但全文索引对中文支持并不完善…

设计模式--工厂方法模式【创建型模式】

设计模式的分类 我们都知道有 23 种设计模式&#xff0c;这 23 种设计模式可分为如下三类&#xff1a; 创建型模式&#xff08;5 种&#xff09;&#xff1a;单例模式、工厂方法模式、抽象工厂模式、建造者模式、原型模式。结构型模式&#xff08;7 种&#xff09;&#xff1…

Windows下ESP32-IDF开发环境搭建

Windows下ESP32-IDF开发环境搭建 文章目录 Windows下ESP32-IDF开发环境搭建一、软件安装二、搭建IDF开发环境2.1 安装VS Code插件&#xff1a;2.2 配置ESP-IDF插件&#xff1a;2.3 下载例程源码&#xff1a; 三、编译和烧录代码四、Windows下使用命令行编译和烧录程序4.1 配置环…

Docker安装

目录 1. 联网安装 Docker 2. 离线安装 Docker 3. 安装 Docker Compose 4. 卸载 Docker 和 Docker Compose 1. 联网安装 Docker 在 CentOS 上通过 yum 安装 Docker&#xff1a; # 安装 Docker yum -y install docker # 启动 Docker systemctl start docker # 查看 D…

2024-12-24 NO1. XR Interaction ToolKit 环境配置

文章目录 1 软件配置2 安装 XRToolKit3 配置 OpenXR4 安装示例场景5 运行测试 1 软件配置 Unity 版本&#xff1a;Unity6000.0.26 ​ 2 安装 XRToolKit 创建新项目&#xff08;URP 3D&#xff09;&#xff0c;点击进入 Asset Store。 进入“Unity Registry”页签&#xff0…