直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

news/2024/11/29 20:41:39/

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

  • 前言
  • 一. Socket服务整合RabbitMQ
  • 二. 弹幕服务创建
    • 2.1 创建一个公共maven项目
    • 2.2 弹幕服务项目创建
      • 2.2.1 创建队列和广播型交换机
      • 2.2.2 生产者发送最终弹幕数据
      • 2.2.3 消费者监听原始弹幕数据
    • 2.3 Socket服务监听弹幕数据并返回前端
      • 2.3.1 配置类
      • 2.3.2 消费者
    • 2.4 测试

前言

上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。

一. Socket服务整合RabbitMQ

我们页面上,通过WebSocket发送弹幕信息的时候,后端通过@OnMessage注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket服务它只负责消息的传递和WebSocket信息的维护,业务逻辑啥也不做。

1.添加pom依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件bootstrap.yml,添加RabbitMQ相关配置

server:port: 81spring:application:name: tv-service-socketcloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672

3.RabbitMQ配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue("originBullet-queue", true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange("bulletPreProcessor-exchange", true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");}
}

4.写一个简单的消息体OriginMessage,发送到MQ的:

import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午1:30*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}

5.MQ生产者OriginMessageSender

/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Component
public class OriginMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));// 发送给消息预处理队列rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称"bullet.originMessage",// 路由Keymap, correlationData);}
}

6.我们再对WebSocket的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API即可。只需要注意一下多例下属性的注入方式是怎么写的即可

import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {/*** 多例模式下的赋值方式*/private static OriginMessageSender originMessageSender;/*** 多例模式下的赋值方式*/@Autowiredprivate void setOriginMessageSender(OriginMessageSender originMessageSender) {BulletScreenServer.originMessageSender = originMessageSender;}private static final AtomicLong count = new AtomicLong(0);private Session session;private String sessionId;private String userId;private String roomId;/*** 打开连接* @param session* @OnOpen 连接成功后会自动调用该方法*/@OnOpenpublic void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用trycount.incrementAndGet();log.info("*************WebSocket连接次数: {} *************", count.longValue());this.userId = userId;this.roomId = roomId;// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;SocketCache.put(sessionId, this);}/*** 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接*/@OnClosepublic void closeConnection() {SocketCache.remove(sessionId);}/*** 客户端发送消息给服务端* @param message*/@OnMessagepublic void onMessage(String message) {if (StringUtils.isBlank(message)) {return;}// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的originMessageSender.send(buildMessage(message));}private OriginMessage buildMessage(String message) {OriginMessage originMessage = new OriginMessage();originMessage.setMessage(message);originMessage.setRoomId(roomId);originMessage.setSessionId(sessionId);originMessage.setUserId(userId);return originMessage;}
}

备注:记得将另一个Socket项目也改造成同样的代码。

二. 弹幕服务创建

2.1 创建一个公共maven项目

我们创建一个maven项目:service-bulletcommon。先看下最终的项目架构:
在这里插入图片描述

1.pom依赖添加一些常用的工具:

<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.2</version></dependency>
</dependencies>

2.创建一个常量定义类SocketConstants

/*** @author Zong0915* @date 2022/12/15 下午3:59*/
public class SocketConstants {/*** 这条消息是否处理过*/public static final String CORRELATION_SET_PRE = "Correlation_Set_";/*** 同一个房间里面有哪些SessionID*/public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";public static final String MESSAGE = "message";public static final String ID = "id";/*** 原始消息所在队列*/public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";/*** 广播队列A*/public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";/*** 广播队列B*/public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";/*** 弹幕预处理交换机*/public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";/*** 弹幕广播交换机*/public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";/*** 弹幕预处理路由Key*/public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}

3.创建一个消息传输体OriginMessage

import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午2:07*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}

2.2 弹幕服务项目创建

1.我们创建一个maven项目:service-bulletscreen。先看下最终的项目架构:
在这里插入图片描述

1.pom文件:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.1.RELEASE</version><exclusions><exclusion><artifactId>archaius-core</artifactId><groupId>com.netflix.archaius</groupId></exclusion><exclusion><artifactId>commons-io</artifactId><groupId>commons-io</groupId></exclusion><exclusion><artifactId>commons-lang3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>fastjson</artifactId><groupId>com.alibaba</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>httpclient</artifactId><groupId>org.apache.httpcomponents</groupId></exclusion><exclusion><artifactId>servo-core</artifactId><groupId>com.netflix.servo</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.6.7</version><exclusions><exclusion><artifactId>log4j-api</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

2.application.properties

spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848

3.bootstrap.yml文件:

server:port: 83spring:application:name: tv-service-bulletscreenredis:database: 0 # Redis数据库索引(默认为0)host: 你的Redis地址 # Redis的服务地址port: 6379 # Redis的服务端口password: 密码jedis:pool:max-active: 8 # 连接池最大连接数(使用负值表示没有限制)max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)max-idle: 8 # 连接池中的最大空闲连接min-idle: 0 # 连接池中的最小空闲链接timeout: 30000 # 连接池的超时时间(毫秒)cloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672

4.Redis配置类RedisConfig

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {/*** 实例化 RedisTemplate 对象** @return*/@Beanpublic RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();initDomainRedisTemplate(redisTemplate, redisConnectionFactory);return redisTemplate;}/*** 设置数据存入 redis 的序列化方式,并开启事务** @param redisTemplate* @param factory*/private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {//如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 开启事务redisTemplate.setEnableTransactionSupport(true);redisTemplate.setConnectionFactory(factory);}@Bean@ConditionalOnMissingBean(StringRedisTemplate.class)public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}
}

2.2.1 创建队列和广播型交换机

创建一个广播模式的交换机bulletFanOut-exchange:其实用direct也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
在这里插入图片描述

分别为两个Socket服务创建个队列,用来接收处理好的消息(练习下广播模式):

  • bulletSocket-queueA
  • bulletSocket-queueB

再分别为他们和上述创建好的交换机进行绑定。
在这里插入图片描述

我们的弹幕服务主要做两件事:

  • 监听预处理队列,数据来自:originBullet-queue
  • 将处理完的消息通过广播,发送给bulletSocket-queueA/B两个队列。

RabbitMQ配置类如下:

import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);}@Beanpublic Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);}@Beanpublic Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);}@Bean("fanoutExchange")FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);}@BeanBinding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);}@BeanBinding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);}
}

2.2.2 生产者发送最终弹幕数据

创建FanoutMessageProducer类:记得向我们上面绑定的广播交换机发送数据。

import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author Zong0915* @date 2022/12/15 下午2:51*/
@Component
public class FanoutMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称"",// 路由Keymap, correlationData);}
}

2.2.3 消费者监听原始弹幕数据

创建OriginMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/**- @author Zong0915- @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class OriginMessageConsumer {@Autowiredprivate BulletScreenService bulletScreenService;/*** 处理原始消息** @param testMessage Map类型的消息体* @param headers     消息头* @param channel     消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "originBullet-queue", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),key = "bullet.originMessage"))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,Channel channel) throws IOException {log.info("***********消费开始*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));bulletScreenService.processMessage(testMessage, headers, channel);}
}

2.创建BulletScreenService类用于原始弹幕的业务处理,主要考虑的几个点:

  • 消息的合法性校验。
  • 消息的幂等性保证,这里用了Redis做个存储。
  • 将原始数据处理完后,在丢给MQ进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Service
@Slf4j
public class BulletScreenService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate FanoutMessageProducer fanoutMessageProducer;@Asyncpublic void processMessage(Map testMessage, Map<String, Object> headers,Channel channel) throws IOException {OriginMessage originMessage = getOriginMessage(testMessage);// 合法性校验if (!validMessage(testMessage, headers, originMessage)) {return;}// 处理消息log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());String correlationId = headers.get(SocketConstants.ID).toString();// 存入Redis并设置过期时间1天redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);// 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装// 本案例就不做处理了,原样返回fanoutMessageProducer.send(originMessage);// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}/*** 对消息进行合法性校验*/public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {// 判空if (testMessage == null || testMessage.size() == 0 || originMessage == null) {return false;}if (headers == null || headers.size() == 0) {return false;}// 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉UUID correlationId = (UUID) headers.get(SocketConstants.ID);Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());return !Optional.ofNullable(exist).orElse(false);}
}

最后就是启动类BulletScreenApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;/*** @author Zong0915* @date 2022/12/10 下午9:44*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {public static void main(String[] args) {SpringApplication.run(BulletScreenApplication.class, args);}
}

2.3 Socket服务监听弹幕数据并返回前端

记得在pom依赖中引入上面的公共包:

<dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version>
</dependency>

2.3.1 配置类

RabbitMQ配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA

@Bean
public Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}

另一个Socket项目,添加以下配置:绑定bulletSocket-queueB

@Bean
public Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}

再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:

public class SocketCache {public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {ArrayList<BulletScreenServer> res = new ArrayList<>();if (StringUtils.isBlank(roomId)) {return res;}for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();if (map == null || map.size() == 0) {continue;}for (BulletScreenServer server : map.values()) {if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {res.add(server);}}}return res;}
}

2.3.2 消费者

重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;
import java.util.Map;/*** @author Zong0915* @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class FanOutMessageConsumer {/*** 处理弹幕消息,开始广播** @param testMessage Map类型的消息体* @param headers     消息头* @param channel     消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "bulletSocket-queueA", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("***********消费开始, Socket服务A接收到广播消息*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));OriginMessage originMessage = getOriginMessage(testMessage);if (originMessage == null) {return;}// 根据roomID去找到同一个直播间下的所有用户并广播消息List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));}// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}
}

另一个Socket服务则改一下消费者的监听队列和日志内容即可:

在这里插入图片描述

2.4 测试

打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
在这里插入图片描述
此时Socket服务A:
在这里插入图片描述

Socket服务B:
在这里插入图片描述
页面A中随便发送一条弹幕:
在这里插入图片描述
页面B中随便发送一条弹幕:
在这里插入图片描述

1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
在这里插入图片描述
2.service-bulletscreen服务,监听到预处理队列数据,开始进行处理。

在这里插入图片描述
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
在这里插入图片描述
4.Socket服务B接收到消息:
在这里插入图片描述

Socket服务A接收到广播消息:
在这里插入图片描述

5.前端页面展示:

页面A:
在这里插入图片描述

页面B:
在这里插入图片描述

到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。


http://www.ppmy.cn/news/3955.html

相关文章

Jmeter简单入门

背景 我们项目中一般测试接口都是用vscode中的REST Client插件&#xff08;推荐好用&#xff09;或者的话postman(适合写一些脚本和文件的上传) 但是他们都有一个不太行的功能&#xff0c;那就是多线程并发测试&#xff0c;其他市面上的什么apipost也都是不支持&#xff0c;网…

MySQL——count(*)的底层实现以及相关优化

在开发系统的时候&#xff0c;可能需要需要计算一个表的行数这时候你可能会想&#xff0c;一条 select count(*) from t 语句不就解决了吗&#xff1f; 但是&#xff0c;会发现随着系统中记录数越来越多&#xff0c;这条语句执行得也会越来越慢。然后可能就想了&#xff0c;My…

Qt新手入门指南——创建一个基于Qt Widget的文本查找器(二)

Qt是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本教程将介绍如何使用…

frp内网穿透https

在公网服务器搭建frps(service)&#xff0c;在内网本地机子搭建frpc(client)&#xff0c;流量通过访问公网ip&#xff0c;经过frps服务端转发到fprc客户端&#xff0c;fprc再转发到本地web应用。 官方下载地址​ https://github.com/fatedier/frp/releases 官方文档地址https…

[山东科技大学OJ]2618 Problem E: 截取字符串

Time Limit: 1 Sec Memory Limit: 2 MB Submit: 1910 Solved: 360 [Submit][Status] Description 对给定的一个字符串&#xff0c;截取其中一部分输出。 Input 输入为两行&#xff0c;第一行为一个字符串s&#xff0c;长度至少为1且不超过20个字符&#xff1b;第二行为两个…

蓝桥杯嵌入式 cubeMX生成代码解读

文章目录前言一、代码风格二、为什么要这些注释&#xff1f;三、生成的独立模块的代码总结前言 本篇文章讲介绍一下cubeMX生成代码的风格。 一、代码风格 在main.c中可以看到非常多的注释代码&#xff0c;很多人都不知道这些是用来干嘛的&#xff0c;现在就给大家介绍一下这…

朴素二进制表示法

思路方案 在安全领域的研究中我们发现&#xff0c;很多数据预处理的步骤&#xff0c;在不同的场景下中都可以相互 借鉴&#xff0c;甚至可以进行直接复用。例如&#xff0c;对于加密流量相关的数据&#xff0c;当算法工程师 获取到一批加密流量的 pcap 包之后&#xff0c;不论他…

先聊聊「内存分配」,再聊聊Go的「逃逸分析」。

前言 大家好&#xff0c;我是阳哥。 今天和大家聊聊Go语言的「内存分配」和「逃逸分析」。 这期内容不仅有文档&#xff0c;而且有视频&#xff1a; # Go语言的内存分配和逃逸分析-理论篇 # Go语言的内存分配和逃逸分析-实践总结篇 要搞清楚GO的逃逸分析一定要先搞清楚内…