Google Cloud dataflow streaming job简介

news/2025/2/12 21:01:36/

简单介绍

首先 gcp 的dataflow 是1个ETL 组件, 它是基于Apache beam的

Apache beam 是1个较新的开源ETL 框架。
对于我们常用的ETL tool Spring batch 有下面的区别

  1. spring batch 更偏向batch (后台处理)的ETL, 而apache beam 同时支持batch 和 streaming 的ETL, 对streaming 的ETL 有更好的支持
  2. spring batch 基于java,apache beam SDK 支持 java, python 和 GO
  3. spring batch 更加轻量级, 但是依赖于1个关系数据存储ETL job(配置, 历史记录)数据. 而且不需要开发人员去设置. 而 Apache Beam本身并没有内置的作业执行历史记录功能 , 这些数据需要自己去记录处理(在runner上)
  4. spring batch 的运行环境需要自己搭建, 而apache beam 这是1个SDK,它只定义 ETL pipeline的流程, 它需要额外的runner去执行

apache beam 暂时支持下面的runner
GCP dataflow
Apache Flink
Apache Spark
AWS data analysic
Java direct runner(调试用)

所以对于日志记录更多地去交给runner去实现。

所以讲, dataflow 只是1个runner , 核心还是apache beam SDK





什么是streaming

在计算领域中,“streaming”(流式处理)是一种数据处理模式,它允许实时处理连续流入的数据,而不是一次性处理静态数据集。
传统的批处理模式是将数据分成固定大小的块(batch),然后对每个批次进行处理。这种方式适用于静态数据集,但对于实时数据流,它可能无法满足实时性和低延迟的要求。
流式处理模式通过连续接收和处理数据流,实现了实时性和低延迟。数据流可以是连续的数据记录、事件流、传感器数据等。流式处理系统会持续地接收数据流,并立即对其进行处理和分析,以产生实时的结果。
流式处理通常具有以下特点:
连续性:数据流是连续不断的,没有明确的开始和结束。处理系统需要实时接收和处理数据流,而不是等待所有数据到达后再进行处理。
实时性:流式处理系统需要尽可能快地处理数据,并产生实时的结果。这对于需要实时决策、监控和反馈的应用程序非常重要。
有限状态:流式处理系统通常使用有限的内存和状态来处理数据流。它们需要在有限的资源下有效地处理无限的数据流。
流式处理可以应用于各种场景,如实时分析、实时监控、实时推荐、欺诈检测等。流式处理框架(如Apache Flink、Apache Kafka Streams、Apache Beam等)提供了方便的工具和API来开发和部署流式处理应用程序。





1个例子

 public void process() {log.info("processing3...");DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs("").as(DataflowPipelineOptions.class);options.setJobName("dataflow-exam3");options.setProject(this.projectId);options.setRegion("europe-west1");options.setTempLocation("gs://jason-hsbc-dataflow/tmp");options.setSubnetwork("regions/europe-west1/subnetworks/subnet-1");options.setNumWorkers(1);options.setStreaming(true);options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.INFO);options.setNumberOfWorkerHarnessThreads(2);//options.setGcpCredential(new File(...));options.setRunner(DataflowRunner.class);log.info(getCurrentAccountName());Pipeline pipeline = Pipeline.create(options);/*** * The effect of using the @UnknownKeyFor annotation is to tell the Apache Beam framework that the PCollection does not need to be grouped or associated based on a specific key.* As a result, Apache Beam can perform more efficient parallel computations and optimizations for operations that do not require key associations.** * When the @NonNull annotation is applied to a PCollection element type, it indicates that elements in the PCollection are not allowed to be null.* This means that when processing data streams, the Apache Beam framework checks elements for non-nullability and issues warnings or errors at compile time to avoid potential null pointer exceptions.the @Initialized annotation informs the compiler that the variable has been properly initialized by marking it at variable declaration time to help detect possible null pointer exceptionsand improve the reliability and readability of the code. However, it should be noted that the @Initialized annotation is only an auxiliary tool, and correct logic and design still need to be ensured in the actual programming process.*/PCollection<@UnknownKeyFor @NonNull @Initialized PubsubMessage> message = pipeline.apply("Read Pub/Sub Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription("projects/jason-hsbc/subscriptions/SubscriptionA1"));PCollection<KV<String, String>> combinedMsg = message.apply("Extract", ParDo.of(new ExtractMessageAttributeFn())).apply("appying windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1)))).apply("Group by fileName", GroupByKey.create()).apply("Combine Message",ParDo.of(new CombinedMessagesFn()));combinedMsg.apply("Write to GCS", ParDo.of(new WriteToGCSFn(this.bucketName,this.projectId)));//yupipeline.run().waitUntilFinish();pipeline.run();log.info("processing3... end!");}

上面定义了1个dataflow pipeline, 它会从pubsub 里读取消息, 并把数据作为文件存储到GCS中

我们知道pubsub 是1个streaming 传输的工具, 如果这个job 执行一次就接受, 其实意义不大。

假如这个dataflow pipeling 定义的是1个batch的job而非streaming, 我们需要引入其他组件 才能持续监控pubsub的消息

例如
data sender -> pubsub topic -> pubusb trigger -> cloud functioin -> dataflow job

或者
datasender -> pubsub topic -> push 类型的subscription -> http 服务(cloud run/GKE) -> dataflow job

但是上面代码例子, 它有一行
options.setStreaming(true);
显式制定这个job 是streaming的

但是其实dataflow 本身也会根据一些规则去决定这个job是否为streaming(例如数据源是否为pubsub等)
但是用代码指定会更加安心.





waitUntilFinish() 的作用

代码的最后 有两个方法触发dataflow job方法
分别是 pipeline.run() 和 pipeline.run().waitUntilFinish()

前者会直接执行完成, 不会等待job的状态去执行下一行代码
而后者会等待job执行完成并返回状态结果。

但是, 如果是1个streaming的job的话, waitUntilFinish() 是无意义的, 因为streaming的job就是要长期运行(监控数据源)的啊


http://www.ppmy.cn/news/1140075.html

相关文章

docker compose 管理应用服务的常用命令

一 、docker compose 是什么 Docker Compose是一个用来管理多个关联容器的工具&#xff0c;可以根据配置文件自动构建、管理、编排一组容器。 Docker Compose语境下的“服务”是指一组容器共同构成的一个应用服务后端。 Docker Compose语境下的“项目”是由一个或多个应用服务…

vue3 keepalive跳转页面保存页面状态

描述 实现页面 A-> B &#xff0c; B->A&#xff08;A保存之前页面状态&#xff0c;不刷新页面&#xff09; // router/index.tsimport { createRouter, createWebHistory } from vue-router import HomeView from ../views/HomeView.vueconst router createRouter({h…

Git切换用户常用命令

1、查看 查看用户名 &#xff1a; git config user.name查看密码&#xff1a; git config user.password查看邮箱&#xff1a; git config user.email查看配置信息&#xff08;包含上面的信息&#xff09;&#xff1a; $ git config --list2、新增、切换 修改用户名 git…

LeetCode 面试题 08.06. 汉诺塔问题

文章目录 一、题目二、C# 题解 一、题目 在经典汉诺塔问题中&#xff0c;有 3 根柱子及 N 个不同大小的穿孔圆盘&#xff0c;盘子可以滑入任意一根柱子。一开始&#xff0c;所有盘子自上而下按升序依次套在第一根柱子上(即每一个盘子只能放在更大的盘子上面)。移动圆盘时受到以…

国内手机安装 Google Play 服务 (GMS/Google Mobile Services)

目录 1. 国内手机安装 Google Play 服务 (GMS/Google Mobile Services)1.1. 什么是 GMS1.2. 国内手机只需要安装 3 个 APP1.2.1. Google Services Framework 服务框架1.2.2. Google Play Services1.2.3. Google Play Store 应用商店 1.3. 问题1.3.1. 谷歌地图闪退 2. 小米手机 …

JavaScript入门——基础知识(3)

一、运算符 1.1 赋值运算符 目标&#xff1a;能够通过使用赋值运算符简化代码 赋值运算符&#xff1a;对变量进行赋值的运算符 将等号右边的值赋予给左边&#xff0c;要求左边必须是一个容器其他赋值运算符&#xff1a; -*/%使用这些运算符可以在对变量赋值时进行快速操作 例…

FFmpeg横竖版视频互换背景模糊一键生成

视频处理是现代多媒体应用中常见的需求。其中横竖版视频互换和背景模糊是视频编辑中常见的操作。FFmpeg是一个功能强大的工具,适用于这些任务。 本文将详细介绍如何使用FFmpeg进行横竖版视频互换和背景模糊。 文章目录 操作命令与命令说明横版转竖版竖版转横版背景模糊处理横…

第三课-软件升级-Stable Diffusion教程

前言: 虽然第二课已经安装好了 SD,但你可能在其它地方课程中,会发现很多人用的和你的界面差距很大。这篇文章会讲一些容易忽略或者常常需要做的操作,不一定要完全照做,以后再回过头看看也可以。 1.控制类型 问题:为什么别人有“控制类型”部分,而我没有?如下红色方框…