[实时计算flink]日志实时入仓快速入门

server/2024/10/18 11:43:37/

Flink全托管产品提供丰富强大的日志数据实时入仓能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从Kafka到Hologres的数据同步作业。

背景信息

假设消息队列Kafka实例中有一个名称为users的Topic,其中有100条JSON数据,代表通过日志文件采集工具或者应用写入Kafka的日志数据,其数据分布大致如下图所示。

数据分布

此时,如果您希望创建一个数据同步的作业,将该Topic中的日志数据都同步到Hologres中,则可以按照以下步骤进行:

  • 步骤一:配置IP白名单

  • 步骤二:准备Kafka测试数据

  • 步骤三:创建Hologres Catalog

  • 步骤四:创建并启动数据同步作业

  • 步骤五:观察全量同步结果

  • 步骤六:观察自动同步表结构变更

  • (可选)步骤七:调整作业资源配置

本文使用Flink全托管提供的CREATE TABLE AS(CTAS)语句,一键完成日志数据的同步,以及实时的表结构变更同步。

前提条件

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

  • 上下游存储

    • 已创建消息队列Kafka实例,详情请参见步骤三:创建资源。

    • 已创建Hologres实例,详情请参见购买Hologres。

    说明

    消息队列Kafka和Hologres需要与Flink全托管工作空间在相同地域相同VPC下,否则需要打通网络,详情请参见控制台操作或控制台操作。

步骤一:配置IP白名单

为了让Flink能访问Kafka和Hologres实例,您需要将Flink全托管工作空间的网段添加到在Kafka和Hologres的白名单中。

  1. 获取Flink全托管工作空间的VPC网段。

    1. 登录实时计算控制台。

    2. 在目标工作空间右侧操作列,选择更多 > 工作空间详情

    3. 工作空间详情对话框,查看Flink全托管虚拟交换机的网段信息。

      网段信息

  2. 在消息队列Kafka的IP白名单中,添加Flink全托管网段信息。

    操作步骤详情请参见配置白名单。

    Kafka白名单

  3. 在Hologres的IP白名单中,添加Flink全托管网段信息。

    操作步骤详情请参见IP白名单。

    Holo白名单

步骤二:准备Kafka测试数据

使用Flink全托管的模拟数据生成源表作为数据生成器,将数据写入到Kafka中。请按以下步骤使用Flink全托管开发控制台将数据写入至消息队列Kafka。

  1. 在Kafka控制台创建一个名称为users的Topic。

    操作详情请参见步骤一:创建Topic。

  2. 创建将数据写入到Kafka的作业。

    1. 登录实时计算管理控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL,单击新建

    4. 新建作业草稿对话框,选择目标模板(例如:选择空白的流作业草稿),完成后单击下一步,填写作业配置信息。

      作业参数

      示例

      说明

      文件名称

      kafka-data-input

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      存储位置

      作业草稿

      指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。

      您还可以在现有文件夹右侧,单击

      新建文件夹

      图标,新建子文件夹。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉列表中选择目标引擎版本。

    5. 单击创建

    6. 将以下作业代码拷贝到作业文本编辑区。

      CREATE TEMPORARY TABLE source (id INT,first_name STRING,last_name STRING,`address` ROW<`country` STRING, `state` STRING, `city` STRING>,event_time TIMESTAMP
      ) WITH ('connector' = 'faker','number-of-rows' = '100','rows-per-second' = '10','fields.id.expression' = '#{number.numberBetween ''0'',''1000''}','fields.first_name.expression' = '#{name.firstName}','fields.last_name.expression' = '#{name.lastName}','fields.address.country.expression' = '#{Address.country}','fields.address.state.expression' = '#{Address.state}','fields.address.city.expression' = '#{Address.city}','fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );CREATE TEMPORARY TABLE sink (id INT,first_name STRING,last_name STRING,`address` ROW<`country` STRING, `state` STRING, `city` STRING>,`timestamp` TIMESTAMP METADATA
      ) WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000','topic' = 'users','format' = 'json'
      );INSERT INTO sink SELECT * FROM source;
    7. 请按您的实际配置,修改以下参数配置信息。

      参数

      示例值

      说明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      topic

      users

      Kafka Topic名称。

  3. 启动作业。

    1. 数据开发 > ETL页面,单击部署

    2. 部署新版本对话框中,单击确定

    3. 配置作业资源,资源设置填写详情请参见配置作业资源。

    4. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动。

    5. 您可以在作业运维页面观察作业的运行信息和状态。

      image

      由于faker数据源是一个有限流,因此在作业处于运行状态后,大约1分钟左右后,作业就会处于完成状态。当作业结束运行代表作业已经将相关的数据写入到Kafka的users中。其中,写入到消息队列Kafka的JSON数据格式大致如下。

      {"id": 765,"first_name": "Barry","last_name": "Pollich","address": {"country": "United Arab Emirates","state": "Nevada","city": "Powlowskifurt"}
      }

步骤三:创建Hologres Catalog

单表同步都需要依赖目标Catalog来创建目标表。因此,您需要通过控制台创建目标Catalog。本文将以目标Catalog为Hologres Catalog为例,为您进行介绍。

  1. 创建名称为holo的Hologres Catalog。

    操作步骤详情请参见创建Hologres Catalog。

    holo catalog

    重要

    您需要在您的目标实例中已创建flink_test_db数据库,否则创建Catalog会报错。

  2. Schemas页签,确认已创建名为holo的Catalog。

    刷新按钮

步骤四:创建并启动数据同步作业

  1. 登录Flink全托管开发控制台,创建数据同步作业。

    1. 登录实时计算管理控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL,单击新建

    4. 新建作业草稿对话框,选择目标模板(例如:选择空白的流作业草稿),完成后单击下一步,填写作业配置信息。

      作业参数

      示例

      说明

      文件名称

      flink-quickstart-test

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      存储位置

      作业草稿

      指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。

      您还可以在现有文件夹右侧,单击

      新建文件夹

      图标,新建子文件夹。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉列表中选择目标引擎版本。

    5. 单击创建

  2. 将以下作业代码拷贝到作业文本编辑区。

    将消息队列Kafka中名称为users的Topic数据同步至Hologres的flink_test_db数据库的sync_kafka_users表中。您可以通过以下任意一种方式进行:

    • 通过CATS语句同步

      该方式无需您手动在Hologres中创建该表,也无需指明对应的列类型为JSON或JSONB。

      CREATE TEMPORARY TABLE kafka_users (`id` INT NOT NULL,`address` STRING,`offset` BIGINT NOT NULL METADATA,`partition` BIGINT NOT NULL METADATA,`timestamp` TIMESTAMP METADATA,`date` AS CAST(`timestamp` AS DATE),`country` AS JSON_VALUE(`address`, '$.country'),PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
      ) WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000','topic' = 'users','format' = 'json','json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。'scan.startup.mode' = 'earliest-offset'
      );CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
      WITH ('connector' = 'hologres'
      ) AS TABLE kafka_users;

      说明

      为了避免作业Failover后,作业重启将重复数据写入到Hologres中,您可以添加相关主键从而唯一地标识数据。当数据重发时,Hologres将会保证相同partition和offset的数据只会保留一份。

    • 通过INSERT INTO语句同步

      考虑到Hologres中对于JSON和JSONB类型的数据会进行特殊的优化,您也可以通过INSERT INTO语句将嵌套JSON写入到Hologres中。

      该方式需要您手动在Hologres中创建该表并指明需要对应的列类型为JSON或JSONB,然后通过下文的SQL,会将address数据写入到 Hologres中类型为JSON的列。

      CREATE TEMPORARY TABLE kafka_users (`id` INT NOT NULL,`address` STRING, -- 该列对应的数据为嵌套JSON。`offset` BIGINT NOT NULL METADATA,`partition` BIGINT NOT NULL METADATA,`timestamp` TIMESTAMP METADATA,`date` AS CAST(`timestamp` AS DATE),`country` AS JSON_VALUE(`address`, '$.country')
      ) WITH ('connector' = 'kafka','properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000','topic' = 'users','format' = 'json','json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。'scan.startup.mode' = 'earliest-offset'
      );CREATE TEMPORARY TABLE holo (`id` INT NOT NULL,`address` STRING,`offset` BIGINT,`partition` BIGINT,`timestamp` TIMESTAMP,`date` DATE,`country` STRING
      ) WITH ('connector' = 'hologres','endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80','username' = 'LTAI5tE572UJ44Xwhx6i****','password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****','dbname' = 'flink_test_db','tablename' = 'sync_kafka_users'
      );INSERT INTO holo
      SELECT * FROM kafka_users;
  3. 请按您的实际配置,修改以下参数配置信息。

    参数

    示例值

    说明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

    Kafka Broker地址。

    格式为host:port,host:port,host:port,以英文逗号(,)分割。

    topic

    users

    Kafka Topic名称。

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Hologres端点。

    格式为<ip>:<port>。

    username

    LTAI5tE572UJ44Xwhx6i****

    Hologres用户名,请填写阿里云账号的AccessKey ID。

    password

    KtyIXK3HIDKA9VzKX4tpct9xTm****

    Hologres密码,请填写阿里云账号的AccessKey Secret。

    dbname

    flink_test_db

    Hologres数据库名称。

    tablename

    sync_kafka_users

    Hologres表名称。

    说明

    • 如果您通过INSERT INTO方式同步数据,则需要提前在目标实例的数据库中创建sync_kafka_users表和字段。

    • 如果Schema不为Public时,则tablename需要填写为schema.tableName。

  4. 单击保存

  5. 数据开发 > ETL页面,单击部署

  6. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动。

  7. 单击启动

    作业启动后,您可以在作业运维界面观察作业的运行信息和状态。

    image

步骤五:观察全量同步结果

  1. 登录Hologres管理控制台。

  2. 实例列表页面,单击目标实例名称。

  3. 在页面右上角,单击登录实例

  4. 元数据管理页签,查看users数据库中同步的sync_kafka_users表结构和数据。

    sync_kafka_users表

    同步后的表结构和数据如下图所示。

    • 表结构

      双击sync_kafka_users表名称,查看表结构。

      表结构

      说明

      在同步过程中,建议声明Kafka的Metadata partition和offset作为Hologres表中的主键。这样可以避免由于作业Failover,数据重发导致下游存储多份相同数据。

    • 表数据

      在sync_kafka_users表信息页面右上角,单击查询表后,输入如下命令,单击运行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      表数据结果如下图所示。

      表数据

步骤六:观察自动同步表结构变更

  1. 在Kafka控制台手动发送一条包含新增列的消息。

    1. 登录云消息队列 Kafka 版控制台。

    2. 实例列表页面,单击目标实例名称。

    3. Topic管理页面,单击目标Topic名称users。

    4. 单击体验发送消息

    5. 填写消息内容。

      消息内容

      配置项

      示例

      发送方式

      选中控制台

      消息Key

      填写为flinktest。

      消息内容

      将以下JSON内容复制粘贴到消息内容中。

      {"id": 100001,"first_name": "Dennise","last_name": "Schuppe","address": {"country": "Isle of Man","state": "Montana","city": "East Coleburgh"},"house-points": {"house": "Pukwudgie","points": 76}
      }

      说明

      该示例中house-points是一个新增的嵌套列。

      发送到指定分区

      选中

      分区ID

      填写为0。

    6. 单击确定

  2. 在Hologres控制台,查看sync_kafka_users表结构和数据的变化。

    1. 登录Hologres管理控制台。

    2. 实例列表页面,单击目标实例名称。

    3. 在页面右上角,单击登录实例

    4. 元数据管理页签,双击sync_kafka_users表名称。

    5. 单击查询表后,输入如下命令,单击运行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. 查看表数据结果。

      表数据结果如下图所示。

      Hologres表结果

      可以观察到id为100001的数据已经成功地写入到了Hologres中。同时,Hologres中多了house-points.house和house-points.points 两列。

      说明

      虽然插入到Kafka中的数据仅只有一个嵌套列house-points,但是由于在kafka_users表的WITH参数内声明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。

(可选)步骤七:调整作业资源配置

根据数据量的不同,我们往往需要调节不同节点的并发和资源,以达到更优的作业性能。您可以使用资源配置的基础模式简单配置作业并发度和CU数,也可以使用资源配置的专家模式细粒度地调整节点的并发和资源。

  1. 登录Flink全托管开发控制台,进入作业详情页面。

    1. 登录实时计算管理控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击运维中心 > 作业运维

  2. 修改资源配置。

    1. 部署详情页面,单击资源配置右侧的编辑资源模式选择为专家模式

    2. 在配置计划中单击立即获取

    3. 单击展开全部

      观察完整的拓扑图,通过完整的拓扑图能了解到整个数据的同步计划,即具体同步哪些表。

    4. 手动设置每个节点的并发。

      由于Kafka users Topic有四个分区,因此可以设置作业为4并发。由于日志数据只是写入到Hologres一张表中,为了降低Hologres的连接数,可以调节Hologres的并发为2。资源配置步骤详情请参见配置作业部署信息。经过调节后的作业资源配置计划如下图所示。

      作业配置计划

    5. 单击确定

    6. 填写基础配置后,单击启动。关于作业启动的配置说明,请参见作业启动。

  3. 运维中心 > 作业运维页面,单击目标作业名称。

  4. 状态总览页面,查看调整效果。

    image.png


http://www.ppmy.cn/server/132751.html

相关文章

艾体宝干货丨网络安全指南:如何使用Profishark和IOTA检测Blast-RADIUS

随着网络安全威胁的不断增加&#xff0c;了解并预防可能的攻击变得至关重要。Blast-RADIUS 是一种严重影响 RADIUS 协议的安全漏洞&#xff0c;能够让攻击者绕过身份验证获取未经授权的访问权限。本篇文章将深入探讨该漏洞的工作原理、检测方法及应对措施&#xff0c;帮助您有效…

SQL Server LocalDB 表数据中文乱码问题

--查看数据库设置 SELECT name, collation_name FROM sys.databases;--出现了The database could not be exclusively locked to perform the operation这个错误&#xff0c; --无法修改字符集为Chinese_PRC_CI_AS&#xff1b;所以需要先设置为单用户模式 ALTER DATABASE MySma…

环境变量(Linux)

文章目录 一、什么是环境变量&#xff1f;二、环境变量的作用1. 方便命令执行&#xff1a;2.配置系统和应用程序&#xff1a;3.用户自定义环境变量&#xff1a; 三、Linux 常见环境变量四、设置环境变量1.临时设置&#xff1a;2.永久设置&#xff1a; 五、环境变量的优先级六、…

Excel:Cells(Rows.Count, 1).End(xlUp).Row和Cells(Rows.Count, 1).End(xlUp)有什么区别

Cells(Rows.Count, 1).End(xlUp).Row 和 Cells(Rows.Count, 1).End(xlUp) 是 VBA 中用于定位 Excel 工作表中单元格的两种不同用法。以下是它们的区别&#xff1a; 1. Cells(Rows.Count, 1).End(xlUp).Row 功能: 这个表达式返回的是一个行号&#xff08;Long 类型&#xff09…

隧道代理IP如何帮助企业采集数据?

在数字化时代&#xff0c;数据已成为企业决策的重要基石。无论是市场调研、竞品分析&#xff0c;还是用户行为研究&#xff0c;高质量的数据采集都是企业成功的关键。然而&#xff0c;面对复杂的网络环境和日益严格的反爬虫机制&#xff0c;如何高效、稳定地采集数据成为了一个…

Github优质项目推荐 - 第六期

文章目录 Github优质项目推荐 - 第六期一、【WiFiAnalyzer】&#xff0c;3.4k stars - WiFi 网络分析工具二、【penpot】&#xff0c;33k stars - UI 设计与原型制作平台三、【Inpaint-Anything】&#xff0c;6.4k stars - 修复图像、视频和3D 场景中的任何内容四、【Malware-P…

鸿蒙开发:arkTS 行与列用法

随着万物互联时代的到来&#xff0c;华为提出了“一次开发多端部署、可分可合自由流转、统一生态原生智能”三大应用与服务开发理念。为了降低开发门槛&#xff0c;协助开发者更好地应对多设备、多入口、服务可分可合等特性&#xff0c;华为基于JS/TS语言体系&#xff0c;构建了…

解锁二叉树的魅力:链式实现详解

前言 二叉树的简介及顺序实现 引言 在数据结构的浩瀚星空中&#xff0c;二叉树如同一颗璀璨的明珠&#xff0c;其优雅的结构和强大的功能使其成为计算机科学中不可或缺的工具。从数据库索引到编译器的语法树&#xff0c;二叉树以其独特的方式支撑着许多核心算法与数据处理。…