直播弹幕系统(六)- SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理)
- 前言
- 一. SpringBoot整合RabbitMQ代理Broker
- 1.1 RabbitMQ安装STOMP插件(Docker)
- 1.2 RabbitMQ相关准备
- 1.3 其他代码
- 二. 前端整合RabbitMQ
- 2.1 最终效果
- 2.2 和Spring代理方式有何不同
前言
上一篇文章整合Stomp替换原生WebSocket方案探究 中,完成了STOMP
对原生WebSocket
写法的一个平移架构替换。这篇文章则在其基础上做一些优化和操作。
在设计原生WebSocket
架构的时候,我们用本地缓存来存储对应的WebSocket
。在进行消息交互的时候,还需要自己去拿到对应的Session
信息,然后发送。如果是群发消息,还得借助for
循环进行遍历发送。
但是我们在整合STOMP
的时候,这种群发、对于Session
的管理,框架都帮我们做好了。我们只需要关注业务上的操作即可。上篇文章中,我们主要使用的是SimpMessagingTemplate
来完成的。同时我们使用Spring
本身作为STOMP
的一个代理Broker
。
- 例如当我们打开了两个会话窗口:就会发现
SimpleBrokerMessageHandler
下的session
就有两份。(AbstractSubscribableChannel.handlers
)。也就是说关于Session
会话的存储,使用了本地缓存来存储。
- 关于
Spring
代理,比如路由信息的存储,也是使用了本地缓存来存储。
也许,我们改变Session
的存储机制比较困难,不容易上手。但是如果从Broker
代理层面来考虑,我们是否有办法将代理的事情丢给第三方呢?这样不就可以节省一定的内存空间了吗?
因此本文就来探讨下,如何使用RabbitMQ
来作为代理Broker
。
一. SpringBoot整合RabbitMQ代理Broker
代码都是在上篇文章代码基础上更改的哦。
1.1 RabbitMQ安装STOMP插件(Docker)
1.查看容器docker ps
2.进入rabbitmq
容器内部:
docker exec -it b5ec297fb969 /bin/bash
3.执行指令开启web stomp
插件:
rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples
4.提交容器为新镜像:
docker commit b5ec297fb969 rabbitmq:stomp
5.停止原容器:
docker stop rabbitmq
6.开启新容器:
docker run -d --name=rabbitmq2 -p 5617:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 -p 15670:15670 -p 15674:15674 rabbitmq:stomp
7.登录管理界面,检查是否开启成功:15674
端口。
1.2 RabbitMQ相关准备
1.首先还是准备我们的pom
依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.增加一下RabbitMQ
相关的信息配置,增加application.yml
文件中的配置:
spring:rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 10# 最大并发数max-concurrency: 20# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址
3.我们去RabbitMQManagement
控制台上增加一个案例用的交换机和队列。http://你的IP地址:15672/
添加一个名为stomp-exchange
的主题交换机(如果没有,一般上面安装完插件后,会自动生成)。
添加一个名为stomp-queue
的队列(如果没有,一般上面安装完插件后,会自动生成):
4.绑定交换机和队列:(添加好后,点击队列的名称,就会跳到详情页),路由为live.topic
。
按照同样的方式,我们再创建一个队列和交换机,用于测试用:
- 队列:
test-queue
- 交换机:
testTopic-exchange
- 记得添加绑定关系(这里就不加路由
Key
了)
对应的在LiveConstants
中添加五个常量:
public static final String STOMP_EXCHANGE = "stomp-exchange";
public static final String STOMP_QUEUE = "stomp-queue";
public static final String STOMP_ROUTE_KEY = "live.topic";public static final String TEST_QUEUE = "test-queue";
public static final String TEST_TOPIC_EXCHANGE = "testTopic-exchange";
这里先重点声明一下:
test
开头的队列和交换机,是用来前端接收RabbitMQ
队列消息的。stomp
开头的队列和交换机,是前端发送消息时候,消息发送的目标队列。
而且本文当中,暂时不区分直播间号,因此才使用一个test
交换机和队列。会在下一篇文章里面专门写一个动态地队列和交换机。这样就可以区分不同直播间内的消息队列了。
5.紧接着,可以在代码里添加RabbitMQ
的配置类RabbitMQConfig
,主要做两件事:
- 加载我们上面创建好的交换机、队列、绑定关系。
- 自定义我们的消息监听器。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zong.constants.LiveConstants;
import zong.service.ChatService;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
@Slf4j
public class RabbitMQConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Autowiredprivate ChatService chatService;/*** 初始化队列*/@Beanpublic Queue stompQueue() {return new Queue(LiveConstants.STOMP_QUEUE, true);}@Beanpublic Queue testQueue() {return new Queue(LiveConstants.TEST_QUEUE, true);}/*** 初始化交换机*/@BeanTopicExchange stompTopicExchange() {return new TopicExchange(LiveConstants.STOMP_EXCHANGE, true, false);}@BeanTopicExchange testTopicExchange() {return new TopicExchange(LiveConstants.TEST_TOPIC_EXCHANGE, true, false);}/*** 初始化队列和交换机的绑定信息*/@BeanBinding stompBinding() {return BindingBuilder.bind(stompQueue()).to(stompTopicExchange()).with(LiveConstants.STOMP_ROUTE_KEY);}@BeanBinding stompBinding2() {return BindingBuilder.bind(testQueue()).to(testTopicExchange()).with("");}@BeanSimpleMessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setQueues(stompQueue());container.setExposeListenerChannel(true);container.setMaxConcurrentConsumers(1);container.setConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {byte[] body = message.getBody();String msg = new String(body);log.info("rabbitmq收到消息 : {}", msg);boolean sendToWebsocket = chatService.sendMsg(msg);if (sendToWebsocket) {System.out.println("消息处理成功! 已经推送到websocket!");channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费}});return container;}
}
1.3 其他代码
在JSON
序列化工具类JsonUtil
中增加一个反序列化函数:
public static <T> T parseJsonToObj(String json, Class<T> c) {if (StringUtils.isBlank(json)) {return null;}try {return JSONObject.parseObject(json, c);} catch (Exception e) {System.out.println(e.getMessage());}return null;
}
1.业务类ChatService
,主要添加一个入口,将处理好的消息丢给MQ,然后客户端直接监听对应的test
交换机即可。
@Autowired
private RabbitTemplate rabbitTemplate;public boolean sendMsg(String message) {if (StringUtils.isBlank(message)) {return false;}ChatMessage chatMessage = JsonUtil.parseJsonToObj(message, ChatMessage.class);if (chatMessage == null) {return false;}LiveMessage liveMessage = new LiveMessage();liveMessage.setType(MessageType.CHAT.toString());liveMessage.setContent("用户 [" + chatMessage.getSender() + "] 说 (来自MQ):" + chatMessage.getContent());rabbitTemplate.convertAndSend(LiveConstants.TEST_TOPIC_EXCHANGE, "", JsonUtil.toJSON(liveMessage));return true;
}
2.视图层ChatController
,添加一个普通的Post
请求。咱们不搞@MessageMapping
那一套了。
/*** 客户端发送消息入口,RabbitMQ入口*/
@PostMapping(value = "/live/sendMessageToMQ")
public void sendMessage(@RequestBody ChatMessage chatMessage) {rabbitTemplate.convertAndSend(LiveConstants.STOMP_EXCHANGE, LiveConstants.STOMP_ROUTE_KEY, JsonUtil.toJSON(chatMessage));
}
3.哦对,这里我们还要加一个序列化配置FastJsonCfg
,否则这里参数接收到不到:
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;@Configuration
public class FastJsonCfg {@BeanFastJsonHttpMessageConverter fastJsonHttpMessageConverter(){FastJsonHttpMessageConverter converter = new FastJsonHttpMessageConverter();FastJsonConfig config = new FastJsonConfig();config.setCharset(StandardCharsets.UTF_8);config.setDateFormat("yyyy-MM-dd");converter.setFastJsonConfig(config);return converter;}
}
二. 前端整合RabbitMQ
Egg
项目架构:
1.修改前端页面index.tsx
文件,整合RabbitMQ
相关代码:
const ws = new WebSocket('ws://你的RabbitMQ地址:15674/ws');
const stompMQClient = Stomp.over(ws);const onMQConnected = () => {console.log('RabbitMQ初始化成功');// 订阅交换机,就是/exchange/交换机名称// 如果订阅队列,就是/queue/队列名称stompMQClient.subscribe('/exchange/testTopic-exchange', function(data:any) {const res = data.body;// 消息体const entity = JSON.parse(res);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));// 消息确认data.ack();}, { ack: 'client' });
};
// 从前往后的参数:用户名、密码、连接成功回调、连接错误回调、虚拟路径,默认/
stompMQClient.connect('guest', 'guest', onMQConnected, onError, '/');
完整代码:
import React, { useEffect, useState } from 'react';
import { Button, Row, Col, Input } from 'antd';
import { getValueByParam } from '../utils/pageHelper';
import axios from '../utils/axiosHelper';
const SockJS = window.SockJS;
const Stomp = window.Stomp;
const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.over(socket);
const ws = new WebSocket('ws://你的RabbitMQ地址:15674/ws');
const stompMQClient = Stomp.over(ws);const roomId = getValueByParam('roomId');
const userId = getValueByParam('userId');const UserPage = () => {const [ message, setMessage ] = useState<string>('');const [ bulletList, setBulletList ] = useState<any>([]);const [ onlineCount, setOnlineCount ] = useState<number>(0);useEffect(() => {const onMessageReceived = (msg:any) => {const entity = JSON.parse(msg.body);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));if (entity.type === 'JOIN' || entity.type === 'LEAVE') {setOnlineCount(entity.count ?? 0);}};const onConnected = () => {// 订阅群发主题stompClient.subscribe(`/live/topic_${roomId}`, onMessageReceived);const chatMessage = {sender: userId,type: 'JOIN',roomId,};stompClient.send('/live/sendMessage',{},JSON.stringify(chatMessage),);};const onError = (error:any) => {console.log(error);};const onMQConnected = () => {console.log('RabbitMQ初始化成功');// 订阅交换机stompMQClient.subscribe('/exchange/testTopic-exchange', function(data:any) {const res = data.body;const entity = JSON.parse(res);const arr :any = [ entity.content ];setBulletList((pre: any[]) => [].concat(...pre, ...arr));data.ack();}, { ack: 'client' });};stompClient.connect({ userId, roomId }, onConnected, onError);stompMQClient.connect('guest', 'guest', onMQConnected, onError, '/');}, []);const sendMsgToMQ = () => {const chatMessage = {sender: userId,content: message,type: 'CHAT',roomId,};axios('', '/live/sendMessageToMQ', chatMessage);};const sendMsg = () => {const chatMessage = {sender: userId,content: message,type: 'CHAT',roomId,};stompClient.send('/live/sendMessage',{},JSON.stringify(chatMessage),);};return <><Row style={{ width: 2000, marginTop: 200 }}><Col offset={6}><Input onChange={event => setMessage(event.target.value)} /></Col><Col><ButtononClick={sendMsg}type='primary'>发送弹幕</Button><ButtononClick={sendMsgToMQ}type='primary'>发送弹幕整合MQ</Button></Col><Col style={{ marginLeft: 100 }}>{'在线人数: ' + onlineCount}</Col><Col style={{ marginLeft: 10 }}><div style={{ border: '1px solid', width: 500, height: 500 }}>{bulletList.map((item: string, index: number) => {return <Row key={index}>{item}</Row>;})}</div></Col></Row></>;
};export default UserPage;
2.由于前端发起了一个axios
请求,因此需要在Egg后端router.ts
文件中增加一个路由:
router.post('/live/sendMessageToMQ', controller.home.sendMessageToMQ);
3.对应Controller
增加函数:
async sendMessageToMQ() {const entity = this.ctx.request.body;const preUrl = 'http://localhost:4396/api';const result = await this.ctx.curl(`${preUrl}/live/sendMessageToMQ`, {method: 'POST',contentType: 'application/json;',timeout: 30000,headers: {host: preUrl,},data: JSON.stringify(entity),dataType: 'json',
});
4.增加一个代理配置,在config
目录下增加一个文件夹envConfig
,并添加代理配置:
文件内容:如果以/api
为前缀,那么会将请求转发到http://localhost:8080
,并且/api
这个路径会重写为空字符串。
{"urlPrefix": "","proxy": {"/api": {"target": "http://localhost:8080","changeOrigin": true,"pathRewrite": {"^/api": ""},"secure": false}}
}
5.修改config.dafult.ts
文件,增加下面的配置:
代码:
config.httpProxy = {...envInfo.proxy,request: {enableProxy: true,},
};
2.1 最终效果
访问页面:
点击发送弹幕
按钮:发送666
点击发送弹幕整合MQ
按钮:发送 “MQMQ”:
2.2 和Spring代理方式有何不同
说下Spring
代理的大致流程:
-
前端和
Spring
代理Broker
之间建立Socket
链接。 -
前端可以向路径A发送一条信息。服务器可以通过
@MessageMapping("A")
的方式拿到这个消息。
-
服务器再通过
SimpMessagingTemplate
,通过Spring
代理向客户端返回数据。
再说下RabbitMQ
代理有何不同。
- 不再使用
SimpMessaging
那一套东东了。可以通过接口请求。直接将数据打到RabbitMQ
中。
- 通过对
RabbitMQ
监听器的设置,可以监听到客户端发送的消息,然后再将对应的消息发送到对应的直播间所订阅的队列/交换机中。
3.发送的方式:
4.前端直接监听MQ
:
像这样。如果想替换Spring
代理的写法,那就应该避免使用SimpMessagingTemplate
这类方式将消息广播出去。我们可以继续使用RabbitMQ
来广播消息。前端只需要监听RabbitMQ
即可。
当然,本篇文章还存在一个问题就是:不区分直播间号,所有直播间共享一个消息队列。因为案例代码使用的是写死的队列和交换机(test
)。因此在下一篇文章,会探讨如何让不同直播间动态地绑定交换机和队列。同时后端还能动态创建队列和交换机并注入到容器中。