直播弹幕系统(六)- SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理)

news/2025/1/11 11:37:47/

直播弹幕系统(六)- SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理)

  • 前言
  • 一. SpringBoot整合RabbitMQ代理Broker
    • 1.1 RabbitMQ安装STOMP插件(Docker)
    • 1.2 RabbitMQ相关准备
    • 1.3 其他代码
  • 二. 前端整合RabbitMQ
    • 2.1 最终效果
    • 2.2 和Spring代理方式有何不同

前言

上一篇文章整合Stomp替换原生WebSocket方案探究 中,完成了STOMP对原生WebSocket写法的一个平移架构替换。这篇文章则在其基础上做一些优化和操作。

在设计原生WebSocket架构的时候,我们用本地缓存来存储对应的WebSocket。在进行消息交互的时候,还需要自己去拿到对应的Session信息,然后发送。如果是群发消息,还得借助for循环进行遍历发送。

但是我们在整合STOMP的时候,这种群发、对于Session的管理,框架都帮我们做好了。我们只需要关注业务上的操作即可。上篇文章中,我们主要使用的是SimpMessagingTemplate来完成的。同时我们使用Spring本身作为STOMP的一个代理Broker

  • 例如当我们打开了两个会话窗口:就会发现SimpleBrokerMessageHandler下的session就有两份。(AbstractSubscribableChannel.handlers)。也就是说关于Session会话的存储,使用了本地缓存来存储。
    在这里插入图片描述
  • 关于Spring代理,比如路由信息的存储,也是使用了本地缓存来存储。
    在这里插入图片描述

也许,我们改变Session的存储机制比较困难,不容易上手。但是如果从Broker代理层面来考虑,我们是否有办法将代理的事情丢给第三方呢?这样不就可以节省一定的内存空间了吗?

因此本文就来探讨下,如何使用RabbitMQ来作为代理Broker

一. SpringBoot整合RabbitMQ代理Broker

代码都是在上篇文章代码基础上更改的哦。

1.1 RabbitMQ安装STOMP插件(Docker)

1.查看容器docker ps在这里插入图片描述

2.进入rabbitmq容器内部:

docker exec -it b5ec297fb969 /bin/bash

3.执行指令开启web stomp插件:

rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples

在这里插入图片描述
4.提交容器为新镜像:

docker commit b5ec297fb969 rabbitmq:stomp

在这里插入图片描述

5.停止原容器:

docker stop rabbitmq

6.开启新容器:

docker run -d --name=rabbitmq2  -p 5617:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 -p 15670:15670 -p 15674:15674 rabbitmq:stomp

7.登录管理界面,检查是否开启成功:15674端口。
在这里插入图片描述

1.2 RabbitMQ相关准备

1.首先还是准备我们的pom依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.增加一下RabbitMQ相关的信息配置,增加application.yml文件中的配置:

spring:rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 10# 最大并发数max-concurrency: 20# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址

3.我们去RabbitMQManagement控制台上增加一个案例用的交换机和队列。http://你的IP地址:15672/
在这里插入图片描述

添加一个名为stomp-exchange的主题交换机(如果没有,一般上面安装完插件后,会自动生成)。
在这里插入图片描述
添加一个名为stomp-queue的队列(如果没有,一般上面安装完插件后,会自动生成):

在这里插入图片描述
4.绑定交换机和队列:(添加好后,点击队列的名称,就会跳到详情页),路由为live.topic
在这里插入图片描述
按照同样的方式,我们再创建一个队列和交换机,用于测试用:

  • 队列:test-queue
  • 交换机:testTopic-exchange
  • 记得添加绑定关系(这里就不加路由Key了)

对应的在LiveConstants中添加五个常量:

public static final String STOMP_EXCHANGE = "stomp-exchange";
public static final String STOMP_QUEUE = "stomp-queue";
public static final String STOMP_ROUTE_KEY = "live.topic";public static final String TEST_QUEUE = "test-queue";
public static final String TEST_TOPIC_EXCHANGE = "testTopic-exchange";

这里先重点声明一下:

  • test开头的队列和交换机,是用来前端接收RabbitMQ队列消息的。
  • stomp开头的队列和交换机,是前端发送消息时候,消息发送的目标队列。

而且本文当中,暂时不区分直播间号,因此才使用一个test交换机和队列。会在下一篇文章里面专门写一个动态地队列和交换机。这样就可以区分不同直播间内的消息队列了。

5.紧接着,可以在代码里添加RabbitMQ的配置类RabbitMQConfig,主要做两件事:

  • 加载我们上面创建好的交换机、队列、绑定关系。
  • 自定义我们的消息监听器。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zong.constants.LiveConstants;
import zong.service.ChatService;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
@Slf4j
public class RabbitMQConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Autowiredprivate ChatService chatService;/*** 初始化队列*/@Beanpublic Queue stompQueue() {return new Queue(LiveConstants.STOMP_QUEUE, true);}@Beanpublic Queue testQueue() {return new Queue(LiveConstants.TEST_QUEUE, true);}/*** 初始化交换机*/@BeanTopicExchange stompTopicExchange() {return new TopicExchange(LiveConstants.STOMP_EXCHANGE, true, false);}@BeanTopicExchange testTopicExchange() {return new TopicExchange(LiveConstants.TEST_TOPIC_EXCHANGE, true, false);}/*** 初始化队列和交换机的绑定信息*/@BeanBinding stompBinding() {return BindingBuilder.bind(stompQueue()).to(stompTopicExchange()).with(LiveConstants.STOMP_ROUTE_KEY);}@BeanBinding stompBinding2() {return BindingBuilder.bind(testQueue()).to(testTopicExchange()).with("");}@BeanSimpleMessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setQueues(stompQueue());container.setExposeListenerChannel(true);container.setMaxConcurrentConsumers(1);container.setConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {byte[] body = message.getBody();String msg = new String(body);log.info("rabbitmq收到消息 : {}", msg);boolean sendToWebsocket = chatService.sendMsg(msg);if (sendToWebsocket) {System.out.println("消息处理成功! 已经推送到websocket!");channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费}});return container;}
}

1.3 其他代码

JSON序列化工具类JsonUtil中增加一个反序列化函数:

public static <T> T parseJsonToObj(String json, Class<T> c) {if (StringUtils.isBlank(json)) {return null;}try {return JSONObject.parseObject(json, c);} catch (Exception e) {System.out.println(e.getMessage());}return null;
}

1.业务类ChatService,主要添加一个入口,将处理好的消息丢给MQ,然后客户端直接监听对应的test交换机即可。

@Autowired
private RabbitTemplate rabbitTemplate;public boolean sendMsg(String message) {if (StringUtils.isBlank(message)) {return false;}ChatMessage chatMessage = JsonUtil.parseJsonToObj(message, ChatMessage.class);if (chatMessage == null) {return false;}LiveMessage liveMessage = new LiveMessage();liveMessage.setType(MessageType.CHAT.toString());liveMessage.setContent("用户 [" + chatMessage.getSender() + "] 说 (来自MQ):" + chatMessage.getContent());rabbitTemplate.convertAndSend(LiveConstants.TEST_TOPIC_EXCHANGE, "", JsonUtil.toJSON(liveMessage));return true;
}

2.视图层ChatController,添加一个普通的Post请求。咱们不搞@MessageMapping那一套了。

/*** 客户端发送消息入口,RabbitMQ入口*/
@PostMapping(value = "/live/sendMessageToMQ")
public void sendMessage(@RequestBody ChatMessage chatMessage) {rabbitTemplate.convertAndSend(LiveConstants.STOMP_EXCHANGE, LiveConstants.STOMP_ROUTE_KEY, JsonUtil.toJSON(chatMessage));
}

3.哦对,这里我们还要加一个序列化配置FastJsonCfg,否则这里参数接收到不到:

import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;@Configuration
public class FastJsonCfg {@BeanFastJsonHttpMessageConverter fastJsonHttpMessageConverter(){FastJsonHttpMessageConverter converter = new FastJsonHttpMessageConverter();FastJsonConfig config = new FastJsonConfig();config.setCharset(StandardCharsets.UTF_8);config.setDateFormat("yyyy-MM-dd");converter.setFastJsonConfig(config);return converter;}
}

二. 前端整合RabbitMQ

Egg项目架构:
在这里插入图片描述

1.修改前端页面index.tsx文件,整合RabbitMQ相关代码:

const ws = new WebSocket('ws://你的RabbitMQ地址:15674/ws');
const stompMQClient = Stomp.over(ws);const onMQConnected = () => {console.log('RabbitMQ初始化成功');// 订阅交换机,就是/exchange/交换机名称// 如果订阅队列,就是/queue/队列名称stompMQClient.subscribe('/exchange/testTopic-exchange', function(data:any) {const res = data.body;// 消息体const entity = JSON.parse(res);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));// 消息确认data.ack();}, { ack: 'client' });
};
// 从前往后的参数:用户名、密码、连接成功回调、连接错误回调、虚拟路径,默认/
stompMQClient.connect('guest', 'guest', onMQConnected, onError, '/');

完整代码:

import React, { useEffect, useState } from 'react';
import { Button, Row, Col, Input } from 'antd';
import { getValueByParam } from '../utils/pageHelper';
import axios from '../utils/axiosHelper';
const SockJS = window.SockJS;
const Stomp = window.Stomp;
const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.over(socket);
const ws = new WebSocket('ws://你的RabbitMQ地址:15674/ws');
const stompMQClient = Stomp.over(ws);const roomId = getValueByParam('roomId');
const userId = getValueByParam('userId');const UserPage = () => {const [ message, setMessage ] = useState<string>('');const [ bulletList, setBulletList ] = useState<any>([]);const [ onlineCount, setOnlineCount ] = useState<number>(0);useEffect(() => {const onMessageReceived = (msg:any) => {const entity = JSON.parse(msg.body);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));if (entity.type === 'JOIN' || entity.type === 'LEAVE') {setOnlineCount(entity.count ?? 0);}};const onConnected = () => {// 订阅群发主题stompClient.subscribe(`/live/topic_${roomId}`, onMessageReceived);const chatMessage = {sender: userId,type: 'JOIN',roomId,};stompClient.send('/live/sendMessage',{},JSON.stringify(chatMessage),);};const onError = (error:any) => {console.log(error);};const onMQConnected = () => {console.log('RabbitMQ初始化成功');// 订阅交换机stompMQClient.subscribe('/exchange/testTopic-exchange', function(data:any) {const res = data.body;const entity = JSON.parse(res);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));data.ack();}, { ack: 'client' });};stompClient.connect({ userId, roomId }, onConnected, onError);stompMQClient.connect('guest', 'guest', onMQConnected, onError, '/');}, []);const sendMsgToMQ = () => {const chatMessage = {sender: userId,content: message,type: 'CHAT',roomId,};axios('', '/live/sendMessageToMQ', chatMessage);};const sendMsg = () => {const chatMessage = {sender: userId,content: message,type: 'CHAT',roomId,};stompClient.send('/live/sendMessage',{},JSON.stringify(chatMessage),);};return <><Row style={{ width: 2000, marginTop: 200 }}><Col offset={6}><Input onChange={event => setMessage(event.target.value)} /></Col><Col><ButtononClick={sendMsg}type='primary'>发送弹幕</Button><ButtononClick={sendMsgToMQ}type='primary'>发送弹幕整合MQ</Button></Col><Col style={{ marginLeft: 100 }}>{'在线人数: ' + onlineCount}</Col><Col style={{ marginLeft: 10 }}><div style={{ border: '1px solid', width: 500, height: 500 }}>{bulletList.map((item: string, index: number) => {return <Row key={index}>{item}</Row>;})}</div></Col></Row></>;
};export default UserPage;

2.由于前端发起了一个axios请求,因此需要在Egg后端router.ts文件中增加一个路由:

router.post('/live/sendMessageToMQ', controller.home.sendMessageToMQ);

3.对应Controller增加函数:

async sendMessageToMQ() {const entity = this.ctx.request.body;const preUrl = 'http://localhost:4396/api';const result = await this.ctx.curl(`${preUrl}/live/sendMessageToMQ`, {method: 'POST',contentType: 'application/json;',timeout: 30000,headers: {host: preUrl,},data: JSON.stringify(entity),dataType: 'json',
});

4.增加一个代理配置,在config目录下增加一个文件夹envConfig,并添加代理配置:
在这里插入图片描述
文件内容:如果以/api为前缀,那么会将请求转发到http://localhost:8080,并且/api这个路径会重写为空字符串。

{"urlPrefix": "","proxy": {"/api": {"target": "http://localhost:8080","changeOrigin": true,"pathRewrite": {"^/api": ""},"secure": false}}
}

5.修改config.dafult.ts文件,增加下面的配置:
在这里插入图片描述

代码:

config.httpProxy = {...envInfo.proxy,request: {enableProxy: true,},
};

2.1 最终效果

访问页面:
在这里插入图片描述
点击发送弹幕按钮:发送666
在这里插入图片描述

点击发送弹幕整合MQ按钮:发送 “MQMQ”:
在这里插入图片描述

2.2 和Spring代理方式有何不同

说下Spring代理的大致流程:

  1. 前端和Spring代理Broker之间建立Socket链接。

  2. 前端可以向路径A发送一条信息。服务器可以通过 @MessageMapping("A")的方式拿到这个消息。
    在这里插入图片描述

  3. 服务器再通过SimpMessagingTemplate,通过Spring代理向客户端返回数据。
    在这里插入图片描述

再说下RabbitMQ代理有何不同。

  1. 不再使用SimpMessaging那一套东东了。可以通过接口请求。直接将数据打到RabbitMQ中。
    在这里插入图片描述
  2. 通过对RabbitMQ监听器的设置,可以监听到客户端发送的消息,然后再将对应的消息发送到对应的直播间所订阅的队列/交换机中。
    在这里插入图片描述
    3.发送的方式:
    在这里插入图片描述
    4.前端直接监听MQ
    在这里插入图片描述

像这样。如果想替换Spring代理的写法,那就应该避免使用SimpMessagingTemplate这类方式将消息广播出去。我们可以继续使用RabbitMQ来广播消息。前端只需要监听RabbitMQ即可。

当然,本篇文章还存在一个问题就是:不区分直播间号,所有直播间共享一个消息队列。因为案例代码使用的是写死的队列和交换机(test)。因此在下一篇文章,会探讨如何让不同直播间动态地绑定交换机和队列。同时后端还能动态创建队列和交换机并注入到容器中。


http://www.ppmy.cn/news/6225.html

相关文章

STM32三条总线(AHB、APB1、APB2)的外设映射情况

STM32三条总线&#xff08;AHB、APB1、APB2&#xff09;的外设映射情况 1、AHB (1)Flash储存器 (2)DMA (3)复位和时钟控制 (4)CRC (5)以太网 (6)SDIO 2、APB1总线(支持低速状态下的工作) (1)定时器TIM2到TIM7 (2)RTC (3)WDT看门狗 (4)SPI2、SPI3 (5)USART2、USART3 (6)UART4、U…

JavaSE基础篇:枚举

文章部分内容整理自知乎Peter McLeish的回答第一章&#xff1a;枚举类一&#xff1a;自定义一个枚举类二&#xff1a;JDK提供的枚举类型三&#xff1a;枚举中静态代码块的执行顺序第一章&#xff1a;枚举类 枚举类&#xff1a;类的对象只有有限个&#xff0c;且是确定的 Java…

DaVinci:曲线之 HSL 曲线

调色页面&#xff1a;曲线Color&#xff1a;CurvesH 指的是色相 Hue&#xff0c;S 指的是饱和度 Saturation&#xff0c;L 指的是亮度 Luminance。DaVinci Resolve 的曲线调板中&#xff0c;除了自定义曲线&#xff0c;还提供了六种基于色相、饱和度或亮度的调节曲线&#xff0…

图(Graph)的定义

图(Graph)的定义 文章目录图(Graph)的定义●图的形式化定义:G (V,E)●无向图和有向图的表示形式:● 有向图和无向图的定义●抽象数据类型定义ADT●图形结构属于复杂的非线性结构● 图由顶点的集合和边的集合构成 ●图的形式化定义:G (V,E) • 集合V(vertex):顶点的有限集合,…

【网络安全】浅识 SQL 注入

前言 SQL 注入&#xff08;SQL Injection&#xff09;是发生在 Web 程序中数据库层的安全漏洞&#xff0c;是网站存在最多也是最简单的漏洞。主要原因是程序对用户输入数据的合法性没有判断和处理&#xff0c;导致攻击者可以在 Web 应用程序中事先定义好的 SQL 语句中添加额外…

我写这10+个JavaScript单行代码,被组长夸代码写得优雅!

大厂面试题分享 面试题库 前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 JavaScript 非常大的特点容易上手且非常灵活&#xff0c;代码实现方式五花八门&#xff1b;有时候能一行代码解决&#xff0c;就尽量不用…

C# Winform 三层架构

一、介绍 三层架构是 C# 桌面开发中比较常用的框架&#xff0c;是由 表示层&#xff08;UI&#xff09;、业务逻辑层&#xff08;BLL&#xff09;和数据访问层&#xff08;DAL&#xff09;三层架构组成&#xff0c;目的是为了 “高内聚&#xff0c;低耦合”。开发人员分工更明…

【并发】深入理解JMM并发三大特性(一)

【并发】深入理解JMM&并发三大特性&#xff08;一&#xff09; 今天是2022.11.16&#xff0c;在此之前我已经学习完了图灵课堂MySQL的课程&#xff0c;也是想这篇文章一样用CSDN博客的形式来记录这些知识点。 在并发中&#xff0c;JMM在大多数人眼中&#xff0c;它是整个…