提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

devtools/2025/3/3 22:08:34/

提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具,专为解决大数据流处理场景中的缓冲与批量处理问题。本文将详细讲解如何在 Spring Boot 项目中使用 BufferTrigger,帮助你提高系统的吞吐量与响应速度,减少 I/O 操作,从而提升整体性能。

BufferTrigger 简介:如何高效处理实时数据流

快手开源的 BufferTrigger 是一个用于数据处理,它主要用于实时数据流处理场景。BufferTrigger 的主要作用是为了解决在大数据流处理中常见的问题:如何高效地对连续的数据流进行缓冲,并在满足一定条件时触发下游计算或存储操作。

使用 BufferTrigger 优势如下:

  1. 提高效率:通过批量处理数据而不是逐条处理,可以显著减少 I/O 操作的次数,从而提升整体处理效率。
  2. 资源优化:对于一些需要消耗较多计算资源的操作(如写入数据库、调用外部服务等),通过累积一批数据后再执行一次这样的操作,可以更有效地利用系统资源。
  3. 简化逻辑:对于开发者而言,使用 BufferTrigger 可以帮助简化代码逻辑,将注意力集中在业务逻辑上而不是复杂的缓冲控制逻辑上。
  4. 灵活配置:支持多种触发策略(比如基于时间窗口、基于数据量大小等),使得用户可以根据具体应用场景灵活选择最合适的触发方式。
  5. 易于集成:设计上考虑了与现有数据处理框架的良好兼容性,使得它可以方便地与其他组件一起工作,在现有的技术栈中引入该功能变得更加容易。

如何添加依赖:快速集成到 Spring Boot 项目

只需要在 pom.xml 中添加以下依赖,即可将 BufferTrigger 集成到你的 Spring Boot 项目中:

 <properties>// 省略...<buffertrigger.version>0.2.21</buffertrigger.version></properties><!-- 统一依赖管理 --><dependencyManagement><dependencies>// 省略...<!-- 快手 Buffer Trigger --><dependency><groupId>com.github.phantomthief</groupId><artifactId>buffer-trigger</artifactId><version>${buffertrigger.version}</version></dependency></dependencies></dependencyManagement>

快手 BufferTrigger 使用讲解

  • 核心概念
    1. 缓冲队列:BufferTrigger 会维护一个内部缓冲区,用来缓存从外部接收的数据。它允许指定缓存队列的最大容量,当达到上限时会根据预设的触发策略进行数据的批量处理。
    2. 触发策略:触发策略是指何时将缓存的数据批量提交进行处理。常见的触发策略有:
      • 基于数据量:当缓存的数据达到指定大小时触发处理。
      • 基于时间窗口:每隔一定时间就触发一次处理。
      • 混合触发:同时满足数据量和时间条件时触发。
    3. 数据消费:通过 BufferTrigger 提供的消费者回调机制,开发者可以自定义数据消费的逻辑。一般情况下,消费的过程是对缓存的数据进行处理、存储或其他操作。
    4. 批量处理:将一批数据聚合后一起处理,而不是一条一条地处理。这样能够减少 I/O 操作的次数,从而提高系统的吞吐量。

使用案例

在许多社交平台上,网红或明星的粉丝数通常会发生频繁的波动。比如,当一个网红被大量用户关注或取消关注时,这些信息会通过消息队列(如 RocketMQ)快速传递,系统需要高效地处理这些变化,并更新到缓存或数据库中。在这种场景下,如果每次有粉丝关注或取消关注时都进行一次 I/O 操作,会导致系统的负载过大,尤其是在并发请求较高时。

为了提高系统的性能,减少频繁的 I/O 操作,通常采用 批量处理 的方式来对消息进行合并和延迟处理。BufferTrigger 就是为了应对这种高并发和实时性要求的场景,它能够将多条消息缓存起来,当满足触发条件时(比如缓存队列达到一定大小或时间窗口到期),将这些消息批量处理,从而减少与缓存系统的交互次数,提升系统的吞吐量和响应速度。

java">@Component
@RocketMQMessageListener(consumerGroup = "xiaohashu_group_" + MQConstants.TOPIC_COUNT_FANS, // Group 组topic = MQConstants.TOPIC_COUNT_FANS // 主题 Topic
)
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> {@Resourceprivate RedisTemplate<String, Object> redisTemplate;private BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking().bufferSize(50000) // 缓存队列的最大容量.batchSize(1000)   // 一批次最多聚合 1000 条.linger(Duration.ofSeconds(1)) // 多久聚合一次.setConsumerEx(this::consumeMessage).build();@Resourceprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void onMessage(String body) {// 往 bufferTrigger 中添加元素bufferTrigger.enqueue(body);}private void consumeMessage(List<String> bodys) {log.info("==> 聚合消息, size: {}", bodys.size());log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));// List<String> 转 List<CountFollowUnfollowMqDTO>List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream().map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();//按照用户进行一个分组Map<Long, List<CountFollowUnfollowMqDTO>> groupMap  =countFollowUnfollowMqDTOS.stream().collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getUserId));// 按组汇总数据,统计出最终的计数// key 为目标用户ID, value 为最终操作的计数Map<Long, Integer> countMap = Maps.newHashMap();for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {List<CountFollowUnfollowMqDTO> list = entry.getValue();// 最终的计数值,默认为 0int finalCount = 0;for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {Integer type = countFollowUnfollowMqDTO.getType();FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);// 若枚举为空,跳到下一次循环if (Objects.isNull(followUnfollowTypeEnum)) {continue;}switch (followUnfollowTypeEnum) {case FOLLOW -> finalCount += 1;case UNFOLLOW -> finalCount -= 1;}}// 将分组后统计出的最终计数,存入 countMap 中countMap.put(entry.getKey(), finalCount);}log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));// 更新 RediscountMap.forEach((k, v) -> {// Redis KeyString redisKey = RedisKeyConstants.buildCountUserKey(k);// 判断 Redis 中 Hash 是否存在boolean isExisted = redisTemplate.hasKey(redisKey);// 若存在才会更新// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)if (isExisted) {// 对目标用户 Hash 中的粉丝数字段进行计数操作redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);}});// 发送 MQ, 计数数据落库// 构建消息体 DTOMessage<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap)).build();// 异步发送 MQ 消息,提升接口响应速度rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("==> 【计数服务:粉丝数入库】MQ 发送成功,SendResult: {}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("==> 【计数服务:粉丝数入库】MQ 发送异常: ", throwable);}});}}

代码解析

  • BufferTrigger 的构建:通过 .batchBlocking() 创建一个 BufferTrigger 实例,该实例设置了缓存队列的最大容量、每批次最多处理的消息数量、以及聚合的时间窗口等配置。
  • enqueue(body):每接收到一条消息,就将消息加入到缓冲队列中,BufferTrigger 会根据设定的策略决定何时批量处理这些数据。
  • consumeMessage(List<String> bodys):当数据满足触发条件时(如缓存队列满或时间窗口到期),consumeMessage 会被调用,处理聚合后的数据。

http://www.ppmy.cn/devtools/164294.html

相关文章

React核心知识及使用场景

React是一个用于构建用户界面的JavaScript库,尤其适合构建单页面应用(SPA)。它基于组件化的开发思想,主要特点是通过虚拟DOM来提高渲染效率。以下是React的核心知识和使用场景: 一. 核心知识 组件化: 类组件和函数组件:React的组件分为类组件和函数组件。类组件通过继承…

3.jvm的执行流程

自上向下 ​编译&#xff1a;.java → .class。 ​加载&#xff1a;类加载器加载字节码到方法区&#xff0c;生成Class对象。 ​内存分配&#xff1a;对象实例存入堆&#xff0c;方法调用栈帧入虚拟机栈。 ​执行&#xff1a;解释器或JIT执行字节码&#xff0c;本地方法通过JNI…

零知识证明在区块链加密货币中的应用分析

目录 1引言 2 研究内容 2.1 零知识认证算法模型 2.2 区块链中的加密货币算法 2.2.1 区块链中的零知识证明算法 2.2.2 算法设计 2.2.3 算法的实现 2.3 方案的安全性分析 2.4 方案的效率分析 3 结论 1引言 随着信息技术的迅速发展&#xff0c;数据隐私和安全性…

C++入门(2)

1.变量和常量 1.1变量的创建 我们把经常变化的值称为变量&#xff0c;不变的值称为常量。 data_type name;| || | 数据类型 变量名 1 int age; //整型变量 2 char ch; //字符变量 3 double weight; //浮点型变量 变量的命名规则遵循以下原则&#x…

Is Noise Conditioning Necessary for Denoising Generative Models?论文阅读笔记

很吸引人的一个标题&#xff0c;很吸引人的一个作者&#xff0c;来读一读明神的新作&#xff0c;讲的是怎么把去噪领域的一些有意思的思想&#xff0c;特别是blind denoising和noise-level estimation的思想&#xff0c;应用到denoising diffusion模型中&#xff0c;从而去掉de…

4-1.jvm的类加载

JVM的类加载机制是将字节码文件&#xff08;.class&#xff09;动态加载到内存&#xff0c;并进行验证、准备、解析和初始化的过程&#xff0c;最终生成可被虚拟机直接使用的类对象。&#xff1a; 一、类加载的五大阶段 ​加载&#xff08;Loading&#xff09;​​ ​任务&a…

Android双屏异显副屏实现PIP效果小窗口同步显示主屏播放画面

在KTV应用开发中一个常见的场景需求就是一台设备要接多个显示屏&#xff0c;其中一个主屏一般都是触摸屏&#xff0c;通过VGA线连接&#xff0c;支持手点击操作点歌切歌等。另外还会有多个副屏&#xff0c;一般都是电视机&#xff0c;通过HDMI线连接。 有一个特点就是所有电视…

2025国家护网HVV高频面试题总结来了01(题目+回答)

网络安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 0x1 高频面试题第一套 0x2 高频面试题第二套 0x3 高频面试题第三套 0x4 高频面试题第四套 0x5 高频面…