【使用Apache Flink 实现滑动窗口流式计算】

news/2025/2/3 17:52:03/

什么是Flink?

Apache Flink是一个用于分布式流式处理和批处理的开源实时计算引擎。它具备低延迟、高吞吐量和 exactly-once 语义的特点,适用于各种实时数据处理场景。

Flink的核心概念

作业(Job):Flink程序的执行单元。
数据流(DataStream):表示连续的数据流,可以进行转换和计算。
窗口(Window):用于对无限数据流进行有界的数据切片处理。
状态(State):用于保存和管理中间计算结果。
时间语义(Event Time、Processing Time、Ingestion Time):用于确定事件发生的时间。

我在 实时攻击行为分析模块 中,使用 Apache Flink 的滑动窗口流式计算如下:

1. 滑动窗口定义与数据源接入
  • 数据源

    • 攻防引擎产生的攻击日志通过 Kafka 实时推送,每条日志包含字段:攻击时间戳、攻击类型(如 SQL 注入、XSS)、攻击结果(成功/失败)、目标 IP、攻击者 ID 等。
    • Flink 通过 FlinkKafkaConsumer 订阅 Kafka 的 attack-log Topic,反序列化为 AttackEvent 对象。
  • 窗口配置

    • 窗口类型:滑动窗口(Sliding Window),窗口大小 5秒,滑动间隔 1秒(每 1 秒输出一次过去 5 秒的统计结果)。
    • 时间语义:采用 Event Time(以攻击日志中的时间戳为准),通过 Watermark 处理乱序事件(允许最大延迟 2 秒)。
DataStream<AttackEvent> attackStream = env  .addSource(new FlinkKafkaConsumer<>("attack-log", new AttackEventDeserializer(), properties))  .assignTimestampsAndWatermarks(WatermarkStrategy  .<AttackEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))  .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));  // 按攻击类型分组,开滑动窗口  
DataStream<AttackStat> windowedStat = attackStream  .keyBy(AttackEvent::getAttackType)  .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))  .aggregate(new AttackStatAggregator());  

2. 攻击成功率计算逻辑
  • 聚合逻辑
    • 在窗口内统计每种攻击类型的 成功次数总次数,计算成功率。
    • 自定义 AggregateFunction 实现累加器(保存中间状态)与合并逻辑:
public class AttackStatAggregator implements AggregateFunction<AttackEvent, AttackStatAccumulator, AttackStat> {  @Override  public AttackStatAccumulator createAccumulator() {  return new AttackStatAccumulator(0, 0);  }  @Override  public AttackStatAccumulator add(AttackEvent event, AttackStatAccumulator acc) {  acc.totalCount++;  if (event.isSuccess()) acc.successCount++;  return acc;  }  @Override  public AttackStat getResult(AttackStatAccumulator acc) {  double successRate = (acc.totalCount == 0) ? 0 : (double) acc.successCount / acc.totalCount;  return new AttackStat(acc.totalCount, successRate);  }  @Override  public AttackStatAccumulator merge(AttackStatAccumulator a, AttackStatAccumulator b) {  return new AttackStatAccumulator(a.totalCount + b.totalCount, a.successCount + b.successCount);  }  
}  

3. 实时指标存储与可视化
  • 写入 Redis 排行榜

    • 攻击成功率计算结果通过 RedisSink 写入 Redis Sorted Set,按成功率从高到低排序,供前端实时展示 Top 10 攻击类型:
    // Flink 输出到 Redis  
    DataStream<AttackStat> windowedStat = ...;  
    windowedStat.addSink(new RedisSink<>(  "attack-success-rate",  (stat, jedis) -> jedis.zadd("attack_rank", stat.getSuccessRate(), stat.getAttackType())  
    ));  
    
  • 持久化到 Apache Doris

    • 原始攻击日志与统计结果通过 Flink JDBC Connector 写入 Doris 的 attack_log 表与 attack_stat 表,支持 OLAP 分析(如按时间维度聚合攻击趋势)。
    • Doris 表设计示例:
      -- 攻击日志表(明细数据)  
      CREATE TABLE attack_log (  event_time DATETIME,  attack_type VARCHAR(32),  is_success BOOLEAN,  src_ip VARCHAR(15),  target_ip VARCHAR(15)  
      ) DUPLICATE KEY(event_time)  
      DISTRIBUTED BY HASH(src_ip) BUCKETS 10;  -- 攻击统计表(聚合数据)  
      CREATE TABLE attack_stat (  window_start DATETIME,  window_end DATETIME,  attack_type VARCHAR(32),  success_rate DOUBLE  
      ) UNIQUE KEY(window_start, attack_type)  
      DISTRIBUTED BY HASH(window_start) BUCKETS 5;  
      
  • 可视化看板

    • 前端通过 ECharts 监听 WebSocket 服务,实时拉取 Redis 中的排行榜数据,并绘制热力图(基于攻击目标 IP 的地理位置分布)。

4. 性能优化与容错机制
  • Flink 配置优化

    • 并行度调优:根据 Kafka Topic 分区数设置 Flink 并行度(如 8 并行度),避免数据倾斜。
    • 状态后端:使用 RocksDB 作为状态后端,支持大状态持久化与故障恢复。
    • Checkpoint 配置:每 30 秒触发一次 Checkpoint,启用 Exactly-Once 语义。
  • Redis 写入优化

    • 使用 Pipeline 批量写入,减少网络开销,提升吞吐量。
    • 设置 Key 的 TTL(如 1 小时),避免内存无限增长。

5. 实际挑战与解决方案
  • 问题 1:窗口延迟导致数据不完整

    • 现象:部分攻击日志因网络延迟超过 Watermark 允许的 2 秒,导致窗口关闭后数据被丢弃。
    • 解决方案
      • 开启 Flink 的 Allowed Lateness(允许额外延迟 5 秒),并注册 SideOutput 捕获迟到数据,补充更新统计结果:
      OutputTag<AttackEvent> lateDataTag = new OutputTag<>("late-data"){};  
      DataStream<AttackStat> windowedStat = attackStream  .keyBy(...)  .window(...)  .allowedLateness(Time.seconds(5))  .sideOutputLateData(lateDataTag)  .aggregate(...);  // 处理迟到数据  
      DataStream<AttackEvent> lateData = windowedStat.getSideOutput(lateDataTag);  
      lateData.addSink(...); // 如写入 Doris 供离线补偿  
      
  • 问题 2:Doris 写入瓶颈

    • 现象:高峰时段 Flink 批量写入 Doris 导致 CPU 占用率过高。
    • 解决方案
      • 调整 Doris 的 Batch Insert 参数(如 exec_mem_limitload_parallelism);
      • 启用 Stream Load 替代 JDBC,提升写入效率。

总结

通过上述实现,Flink 滑动窗口流式计算模块实现了以下价值:

  1. 实时性:攻击成功率指标延迟从 0.5 小时降至 4 秒,满足攻防演练的实时决策需求;
  2. 可扩展性:基于 Kafka + Flink + Doris 的架构,可横向扩展支撑日均千万级攻击日志;
  3. 业务价值:实时看板成为客户演练复盘的核心工具,推动产品续约率提升 20%。

参考文章

Apache flink官方文档
阿里云技术文档


http://www.ppmy.cn/news/1569020.html

相关文章

四、jQuery笔记

(一)jQuery概述 jQuery本身是js的一个轻量级的库,封装了一个对象jQuery,jquery的所有语法都在jQuery对象中 浏览器不认识jquery,只渲染html、css和js代码,需要先导入jQuery文件,官网下载即可 jQuery中文说明文档:https://hemin.cn/jq/ (二)jQuery要点 1、jQuery对象 …

Josephus Problem II CSES - 2163

有3种方法 Solution 1 - ordered_set Utilizing the ordered_set This data structure is an extension of the general set in C. It allows searching for the K-th smallest element in O(log n) time complexity. #include <iostream> using namespace std; #…

【apt源】RK3588 平台ubuntu20.04更换apt源

RK3588芯片使用的是aarch64架构&#xff0c;因此在Ubuntu 20.04上更换apt源时需要使用针对aarch64架构的源地址。以下是针对RK3588芯片在Ubuntu 20.04上更换apt源到清华源的正确步骤&#xff1a; 步骤一&#xff1a;打开终端 在Ubuntu 20.04中&#xff0c;按下Ctrl Alt T打…

智能小区物业管理系统推动数字化转型与提升用户居住体验

内容概要 在当今快速发展的社会中&#xff0c;智能小区物业管理系统的出现正在改变传统的物业管理方式。这种系统不仅仅是一种工具&#xff0c;更是一种推动数字化转型的重要力量。它通过高效的技术手段&#xff0c;将物业管理与用户居住体验紧密结合&#xff0c;无疑为社区带…

ChatGPT 搜索测试整合记忆功能

据 TestingCatalog 报道&#xff0c;OpenAI 正在测试 ChatGPT 搜索的整合记忆功能&#xff0c;被命名为 “Memory in search”2。以下是关于该功能的具体情况123&#xff1a; 功能特点 个性化搜索&#xff1a;启用该功能后&#xff0c;ChatGPT 能利用存储的记忆数据&#xff0…

【4】阿里面试题整理

[1]. 介绍一下数据库死锁 数据库死锁是指两个或多个事务&#xff0c;由于互相请求对方持有的资源而造成的互相等待的状态&#xff0c;导致它们都无法继续执行。 死锁会导致事务阻塞&#xff0c;系统性能下降甚至应用崩溃。 比如&#xff1a;事务T1持有资源R1并等待R2&#x…

【AI】探索自然语言处理(NLP):从基础到前沿技术及代码实践

Hi &#xff01; 云边有个稻草人-CSDN博客 必须有为成功付出代价的决心&#xff0c;然后想办法付出这个代价。 目录 引言 1. 什么是自然语言处理&#xff08;NLP&#xff09;&#xff1f; 2. NLP的基础技术 2.1 词袋模型&#xff08;Bag-of-Words&#xff0c;BoW&#xff…

LLM:BERT or BART 之BERT

文章目录 前言一、BERT1. Decoder-only2. Encoder-only3. Use of Bidirectional Context4. Masked Language Model (MLM)5. Next Sentence Prediction (NSP)6. Fine-tune1、情感分析2、句对分析3、命名实体识别&#xff08;NER&#xff09; 7. BERT总结 总结 前言 NLP选手对这…