【使用Apache Flink 实现滑动窗口流式计算】

server/2025/2/6 23:05:46/

什么是Flink?

Apache Flink是一个用于分布式流式处理和批处理的开源实时计算引擎。它具备低延迟、高吞吐量和 exactly-once 语义的特点,适用于各种实时数据处理场景。

Flink的核心概念

作业(Job):Flink程序的执行单元。
数据流(DataStream):表示连续的数据流,可以进行转换和计算。
窗口(Window):用于对无限数据流进行有界的数据切片处理。
状态(State):用于保存和管理中间计算结果。
时间语义(Event Time、Processing Time、Ingestion Time):用于确定事件发生的时间。

我在 实时攻击行为分析模块 中,使用 Apache Flink 的滑动窗口流式计算如下:

1. 滑动窗口定义与数据源接入
  • 数据源

    • 攻防引擎产生的攻击日志通过 Kafka 实时推送,每条日志包含字段:攻击时间戳、攻击类型(如 SQL 注入、XSS)、攻击结果(成功/失败)、目标 IP、攻击者 ID 等。
    • Flink 通过 FlinkKafkaConsumer 订阅 Kafka 的 attack-log Topic,反序列化为 AttackEvent 对象。
  • 窗口配置

    • 窗口类型:滑动窗口(Sliding Window),窗口大小 5秒,滑动间隔 1秒(每 1 秒输出一次过去 5 秒的统计结果)。
    • 时间语义:采用 Event Time(以攻击日志中的时间戳为准),通过 Watermark 处理乱序事件(允许最大延迟 2 秒)。
DataStream<AttackEvent> attackStream = env  .addSource(new FlinkKafkaConsumer<>("attack-log", new AttackEventDeserializer(), properties))  .assignTimestampsAndWatermarks(WatermarkStrategy  .<AttackEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))  .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));  // 按攻击类型分组,开滑动窗口  
DataStream<AttackStat> windowedStat = attackStream  .keyBy(AttackEvent::getAttackType)  .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))  .aggregate(new AttackStatAggregator());  

2. 攻击成功率计算逻辑
  • 聚合逻辑
    • 在窗口内统计每种攻击类型的 成功次数总次数,计算成功率。
    • 自定义 AggregateFunction 实现累加器(保存中间状态)与合并逻辑:
public class AttackStatAggregator implements AggregateFunction<AttackEvent, AttackStatAccumulator, AttackStat> {  @Override  public AttackStatAccumulator createAccumulator() {  return new AttackStatAccumulator(0, 0);  }  @Override  public AttackStatAccumulator add(AttackEvent event, AttackStatAccumulator acc) {  acc.totalCount++;  if (event.isSuccess()) acc.successCount++;  return acc;  }  @Override  public AttackStat getResult(AttackStatAccumulator acc) {  double successRate = (acc.totalCount == 0) ? 0 : (double) acc.successCount / acc.totalCount;  return new AttackStat(acc.totalCount, successRate);  }  @Override  public AttackStatAccumulator merge(AttackStatAccumulator a, AttackStatAccumulator b) {  return new AttackStatAccumulator(a.totalCount + b.totalCount, a.successCount + b.successCount);  }  
}  

3. 实时指标存储与可视化
  • 写入 Redis 排行榜

    • 攻击成功率计算结果通过 RedisSink 写入 Redis Sorted Set,按成功率从高到低排序,供前端实时展示 Top 10 攻击类型:
    // Flink 输出到 Redis  
    DataStream<AttackStat> windowedStat = ...;  
    windowedStat.addSink(new RedisSink<>(  "attack-success-rate",  (stat, jedis) -> jedis.zadd("attack_rank", stat.getSuccessRate(), stat.getAttackType())  
    ));  
    
  • 持久化到 Apache Doris

    • 原始攻击日志与统计结果通过 Flink JDBC Connector 写入 Doris 的 attack_log 表与 attack_stat 表,支持 OLAP 分析(如按时间维度聚合攻击趋势)。
    • Doris 表设计示例:
      -- 攻击日志表(明细数据)  
      CREATE TABLE attack_log (  event_time DATETIME,  attack_type VARCHAR(32),  is_success BOOLEAN,  src_ip VARCHAR(15),  target_ip VARCHAR(15)  
      ) DUPLICATE KEY(event_time)  
      DISTRIBUTED BY HASH(src_ip) BUCKETS 10;  -- 攻击统计表(聚合数据)  
      CREATE TABLE attack_stat (  window_start DATETIME,  window_end DATETIME,  attack_type VARCHAR(32),  success_rate DOUBLE  
      ) UNIQUE KEY(window_start, attack_type)  
      DISTRIBUTED BY HASH(window_start) BUCKETS 5;  
      
  • 可视化看板

    • 前端通过 ECharts 监听 WebSocket 服务,实时拉取 Redis 中的排行榜数据,并绘制热力图(基于攻击目标 IP 的地理位置分布)。

4. 性能优化与容错机制
  • Flink 配置优化

    • 并行度调优:根据 Kafka Topic 分区数设置 Flink 并行度(如 8 并行度),避免数据倾斜。
    • 状态后端:使用 RocksDB 作为状态后端,支持大状态持久化与故障恢复。
    • Checkpoint 配置:每 30 秒触发一次 Checkpoint,启用 Exactly-Once 语义。
  • Redis 写入优化

    • 使用 Pipeline 批量写入,减少网络开销,提升吞吐量。
    • 设置 Key 的 TTL(如 1 小时),避免内存无限增长。

5. 实际挑战与解决方案
  • 问题 1:窗口延迟导致数据不完整

    • 现象:部分攻击日志因网络延迟超过 Watermark 允许的 2 秒,导致窗口关闭后数据被丢弃。
    • 解决方案
      • 开启 Flink 的 Allowed Lateness(允许额外延迟 5 秒),并注册 SideOutput 捕获迟到数据,补充更新统计结果:
      OutputTag<AttackEvent> lateDataTag = new OutputTag<>("late-data"){};  
      DataStream<AttackStat> windowedStat = attackStream  .keyBy(...)  .window(...)  .allowedLateness(Time.seconds(5))  .sideOutputLateData(lateDataTag)  .aggregate(...);  // 处理迟到数据  
      DataStream<AttackEvent> lateData = windowedStat.getSideOutput(lateDataTag);  
      lateData.addSink(...); // 如写入 Doris 供离线补偿  
      
  • 问题 2:Doris 写入瓶颈

    • 现象:高峰时段 Flink 批量写入 Doris 导致 CPU 占用率过高。
    • 解决方案
      • 调整 Doris 的 Batch Insert 参数(如 exec_mem_limitload_parallelism);
      • 启用 Stream Load 替代 JDBC,提升写入效率。

总结

通过上述实现,Flink 滑动窗口流式计算模块实现了以下价值:

  1. 实时性:攻击成功率指标延迟从 0.5 小时降至 4 秒,满足攻防演练的实时决策需求;
  2. 可扩展性:基于 Kafka + Flink + Doris 的架构,可横向扩展支撑日均千万级攻击日志;
  3. 业务价值:实时看板成为客户演练复盘的核心工具,推动产品续约率提升 20%。

参考文章

Apache flink官方文档
阿里云技术文档


http://www.ppmy.cn/server/165526.html

相关文章

MySQL UNION 操作详解

MySQL UNION 操作详解 引言 在数据库操作中,UNION 是一个非常重要的概念,它允许我们在一个查询中合并多个 SELECT 语句的结果集。UNION 操作通常用于将来自不同表的数据合并在一起,或者将同一表中的数据按照不同的条件进行合并。本文将详细介绍 MySQL 中的 UNION 操作,包…

ollama部署deepseek实操记录

1. 安装 ollama 1.1 下载并安装 官网 https://ollama.com/ Linux安装命令 https://ollama.com/download/linux curl -fsSL https://ollama.com/install.sh | sh安装成功截图 3. 开放外网访问 1、首先停止ollama服务&#xff1a;systemctl stop ollama 2、修改ollama的servic…

试试DeepSeek写prompt+stable diffusion生成漫画

#deepseek #stable diffusion 模型&#xff1a;dreamshaperXL_v21TurboDPMSDE.safetensors 一、情节拟定 漫画情节由deepseek自编自导&#xff0c;画幅为四张。 Prompt 1: 魔法觉醒 "一个平凡的少年在阁楼发现一本古老的魔法书&#xff0c;书页散发着微弱的蓝光。画…

wordpress安装

安装WordPress安装包&#xff0c;解压缩&#xff0c;安装到指定位置安装php环境&#xff1a; 下载&#xff1a;sudo yum install php php-fpm php-mysqlnd安装&#xff1a;sudo systemctl start php-fpmsudo systemctl enable php-fpm检测&#xff1a;php -v查询状态&#xff1…

05. Springboot admin集成Actuator(一)

目录 1、前言 2、Actuator监控端点 2.1、健康检查 2.2、信息端点 2.3、环境信息 2.4、度量指标 2.5、日志文件查看 2.6、追踪信息 2.7、Beans信息 2.8、Mappings信息 3、快速使用 2.1、添加依赖 2.2、添加配置文件 2.3、启动程序 4、自定义端点Endpoint 5、自定…

18爬虫:关于playwright相关内容的学习

1.如何在python中安装playwright 打开pycharm&#xff0c;进入终端&#xff0c;输入如下的2个命令行代码即可自动完成playwright的安装 pip install playwright ——》在python中安装playwright第三方模块 playwright install ——》安装playwright所需的工具插件和所支持的…

Kafka的消息协议

引言 在学习MQTT消息协议的时候我常常思考kafka的消息协议是什么&#xff0c;怎么保证消息的可靠性和高性能传输的&#xff0c;接下来我们一同探究一下 Kafka 在不同的使用场景和组件交互中用到了多种协议&#xff0c;以下为你详细介绍&#xff1a; 内部通信协议 Kafka 使用…

对比DeepSeek、ChatGPT和Kimi的学术写作关键词提取能力

关键词 关键词主要从论文标题、摘要及正文中提炼出来&#xff0c;需要准确反映论文的核心主题和专业领域。关键词的选择不仅有助于标引人员进行主题词的选取、数据库的建立以及文献的检索&#xff0c;而且也便于读者高效检索和引用相关学术成果&#xff0c;从而促进学术交流的深…