直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
- 前言
- 一. Socket服务整合RabbitMQ
- 二. 弹幕服务创建
- 2.1 创建一个公共maven项目
- 2.2 弹幕服务项目创建
- 2.2.1 创建队列和广播型交换机
- 2.2.2 生产者发送最终弹幕数据
- 2.2.3 消费者监听原始弹幕数据
- 2.3 Socket服务监听弹幕数据并返回前端
- 2.3.1 配置类
- 2.3.2 消费者
- 2.4 测试
前言
上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。
一. Socket服务整合RabbitMQ
我们页面上,通过WebSocket
发送弹幕信息的时候,后端通过@OnMessage
注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ
。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket
服务它只负责消息的传递和WebSocket
信息的维护,业务逻辑啥也不做。
1.添加pom
依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件bootstrap.yml
,添加RabbitMQ相关配置
server:port: 81spring:application:name: tv-service-socketcloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672
3.RabbitMQ
配置类:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue("originBullet-queue", true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange("bulletPreProcessor-exchange", true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");}
}
4.写一个简单的消息体OriginMessage
,发送到MQ
的:
import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午1:30*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}
5.MQ
生产者OriginMessageSender
:
/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Component
public class OriginMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));// 发送给消息预处理队列rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称"bullet.originMessage",// 路由Keymap, correlationData);}
}
6.我们再对WebSocket
的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API
即可。只需要注意一下多例下属性的注入方式是怎么写的即可。
import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {/*** 多例模式下的赋值方式*/private static OriginMessageSender originMessageSender;/*** 多例模式下的赋值方式*/@Autowiredprivate void setOriginMessageSender(OriginMessageSender originMessageSender) {BulletScreenServer.originMessageSender = originMessageSender;}private static final AtomicLong count = new AtomicLong(0);private Session session;private String sessionId;private String userId;private String roomId;/*** 打开连接* @param session* @OnOpen 连接成功后会自动调用该方法*/@OnOpenpublic void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用trycount.incrementAndGet();log.info("*************WebSocket连接次数: {} *************", count.longValue());this.userId = userId;this.roomId = roomId;// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;SocketCache.put(sessionId, this);}/*** 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接*/@OnClosepublic void closeConnection() {SocketCache.remove(sessionId);}/*** 客户端发送消息给服务端* @param message*/@OnMessagepublic void onMessage(String message) {if (StringUtils.isBlank(message)) {return;}// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的originMessageSender.send(buildMessage(message));}private OriginMessage buildMessage(String message) {OriginMessage originMessage = new OriginMessage();originMessage.setMessage(message);originMessage.setRoomId(roomId);originMessage.setSessionId(sessionId);originMessage.setUserId(userId);return originMessage;}
}
备注:记得将另一个Socket
项目也改造成同样的代码。
二. 弹幕服务创建
2.1 创建一个公共maven项目
我们创建一个maven
项目:service-bulletcommon
。先看下最终的项目架构:
1.pom
依赖添加一些常用的工具:
<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.2</version></dependency>
</dependencies>
2.创建一个常量定义类SocketConstants
:
/*** @author Zong0915* @date 2022/12/15 下午3:59*/
public class SocketConstants {/*** 这条消息是否处理过*/public static final String CORRELATION_SET_PRE = "Correlation_Set_";/*** 同一个房间里面有哪些SessionID*/public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";public static final String MESSAGE = "message";public static final String ID = "id";/*** 原始消息所在队列*/public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";/*** 广播队列A*/public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";/*** 广播队列B*/public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";/*** 弹幕预处理交换机*/public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";/*** 弹幕广播交换机*/public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";/*** 弹幕预处理路由Key*/public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}
3.创建一个消息传输体OriginMessage
:
import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午2:07*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}
2.2 弹幕服务项目创建
1.我们创建一个maven
项目:service-bulletscreen
。先看下最终的项目架构:
1.pom
文件:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.1.RELEASE</version><exclusions><exclusion><artifactId>archaius-core</artifactId><groupId>com.netflix.archaius</groupId></exclusion><exclusion><artifactId>commons-io</artifactId><groupId>commons-io</groupId></exclusion><exclusion><artifactId>commons-lang3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>fastjson</artifactId><groupId>com.alibaba</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>httpclient</artifactId><groupId>org.apache.httpcomponents</groupId></exclusion><exclusion><artifactId>servo-core</artifactId><groupId>com.netflix.servo</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.6.7</version><exclusions><exclusion><artifactId>log4j-api</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>
2.application.properties
:
spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848
3.bootstrap.yml
文件:
server:port: 83spring:application:name: tv-service-bulletscreenredis:database: 0 # Redis数据库索引(默认为0)host: 你的Redis地址 # Redis的服务地址port: 6379 # Redis的服务端口password: 密码jedis:pool:max-active: 8 # 连接池最大连接数(使用负值表示没有限制)max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)max-idle: 8 # 连接池中的最大空闲连接min-idle: 0 # 连接池中的最小空闲链接timeout: 30000 # 连接池的超时时间(毫秒)cloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672
4.Redis
配置类RedisConfig
:
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {/*** 实例化 RedisTemplate 对象** @return*/@Beanpublic RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();initDomainRedisTemplate(redisTemplate, redisConnectionFactory);return redisTemplate;}/*** 设置数据存入 redis 的序列化方式,并开启事务** @param redisTemplate* @param factory*/private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {//如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 开启事务redisTemplate.setEnableTransactionSupport(true);redisTemplate.setConnectionFactory(factory);}@Bean@ConditionalOnMissingBean(StringRedisTemplate.class)public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}
}
2.2.1 创建队列和广播型交换机
创建一个广播模式的交换机bulletFanOut-exchange
:其实用direct
也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
分别为两个Socket
服务创建个队列,用来接收处理好的消息(练习下广播模式):
bulletSocket-queueA
bulletSocket-queueB
再分别为他们和上述创建好的交换机进行绑定。
我们的弹幕服务主要做两件事:
- 监听预处理队列,数据来自:
originBullet-queue
。 - 将处理完的消息通过广播,发送给
bulletSocket-queueA/B
两个队列。
RabbitMQ
配置类如下:
import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);}@Beanpublic Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);}@Beanpublic Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);}@Bean("fanoutExchange")FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);}@BeanBinding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);}@BeanBinding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);}
}
2.2.2 生产者发送最终弹幕数据
创建FanoutMessageProducer
类:记得向我们上面绑定的广播交换机发送数据。
import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author Zong0915* @date 2022/12/15 下午2:51*/
@Component
public class FanoutMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称"",// 路由Keymap, correlationData);}
}
2.2.3 消费者监听原始弹幕数据
创建OriginMessageConsumer
类:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/**- @author Zong0915- @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class OriginMessageConsumer {@Autowiredprivate BulletScreenService bulletScreenService;/*** 处理原始消息** @param testMessage Map类型的消息体* @param headers 消息头* @param channel 消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "originBullet-queue", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),key = "bullet.originMessage"))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,Channel channel) throws IOException {log.info("***********消费开始*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));bulletScreenService.processMessage(testMessage, headers, channel);}
}
2.创建BulletScreenService
类用于原始弹幕的业务处理,主要考虑的几个点:
- 消息的合法性校验。
- 消息的幂等性保证,这里用了
Redis
做个存储。 - 将原始数据处理完后,在丢给
MQ
进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Service
@Slf4j
public class BulletScreenService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate FanoutMessageProducer fanoutMessageProducer;@Asyncpublic void processMessage(Map testMessage, Map<String, Object> headers,Channel channel) throws IOException {OriginMessage originMessage = getOriginMessage(testMessage);// 合法性校验if (!validMessage(testMessage, headers, originMessage)) {return;}// 处理消息log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());String correlationId = headers.get(SocketConstants.ID).toString();// 存入Redis并设置过期时间1天redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);// 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装// 本案例就不做处理了,原样返回fanoutMessageProducer.send(originMessage);// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}/*** 对消息进行合法性校验*/public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {// 判空if (testMessage == null || testMessage.size() == 0 || originMessage == null) {return false;}if (headers == null || headers.size() == 0) {return false;}// 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉UUID correlationId = (UUID) headers.get(SocketConstants.ID);Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());return !Optional.ofNullable(exist).orElse(false);}
}
最后就是启动类BulletScreenApplication
:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;/*** @author Zong0915* @date 2022/12/10 下午9:44*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {public static void main(String[] args) {SpringApplication.run(BulletScreenApplication.class, args);}
}
2.3 Socket服务监听弹幕数据并返回前端
记得在pom
依赖中引入上面的公共包:
<dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version>
</dependency>
2.3.1 配置类
RabbitMQ
配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA
@Bean
public Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}
另一个Socket
项目,添加以下配置:绑定bulletSocket-queueB
@Bean
public Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}
再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:
public class SocketCache {public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {ArrayList<BulletScreenServer> res = new ArrayList<>();if (StringUtils.isBlank(roomId)) {return res;}for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();if (map == null || map.size() == 0) {continue;}for (BulletScreenServer server : map.values()) {if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {res.add(server);}}}return res;}
}
2.3.2 消费者
重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer
类:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;
import java.util.Map;/*** @author Zong0915* @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class FanOutMessageConsumer {/*** 处理弹幕消息,开始广播** @param testMessage Map类型的消息体* @param headers 消息头* @param channel 消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "bulletSocket-queueA", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("***********消费开始, Socket服务A接收到广播消息*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));OriginMessage originMessage = getOriginMessage(testMessage);if (originMessage == null) {return;}// 根据roomID去找到同一个直播间下的所有用户并广播消息List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));}// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}
}
另一个Socket
服务则改一下消费者的监听队列和日志内容即可:
2.4 测试
打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
此时Socket
服务A:
Socket
服务B:
页面A中随便发送一条弹幕:
页面B中随便发送一条弹幕:
1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
2.service-bulletscreen
服务,监听到预处理队列数据,开始进行处理。
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
4.Socket
服务B接收到消息:
Socket
服务A
接收到广播消息:
5.前端页面展示:
页面A:
页面B:
到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis
缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。