[实时计算flink]数据摄入YAML作业快速入门

embedded/2024/10/24 4:49:35/

实时计算Flink版基于Flink CDC,通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文介绍如何快速构建一个YAML作业将MySQL库中的所有数据同步到StarRocks中。

前提条件

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

  • 上下游存储

    • 已创建RDS MySQL实例,详情请参见快速创建RDS MySQL实例。

    • 已创建StarRocks实例,详情请参见步骤一:创建存算一体版StarRocks实例。

    说明

    RDS MySQL和StarRocks需要与Flink工作空间在相同VPC下,否则需要打通网络和配置RDS MySQL的IP白名单,详情请参见如何访问跨VPC的其他服务?、实时计算Flink版如何访问公网?和操作指导。

背景信息

假设MySQL实例中有一个order_dw_mysql库,里面有名称为orders、orders_pay和product_catalog的3张业务表。此时,如果您希望开发一个数据摄入YAML作业,将这些表和数据都同步到StarRocks的order_dw_sr数据库中,则可以按照以下步骤进行:

  1. 步骤一:准备RDS MySQL测试数据

  2. 步骤二:开发数据摄入YAML作业

  3. 步骤三:启动数据摄入YAML作业

  4. 步骤四:在StarRocks上查看同步结果

步骤一:准备RDS MySQL测试数据

  1. 创建数据库和账号。

    为目标实例创建名称为order_dw_mysql数据库和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号和管理数据库。

  2. 通过DMS登录RDS MySQL。

    详情请参见通过DMS登录RDS MySQL。

  3. 在已登录的SQL Console窗口,输入如下命令后单击执行,创建数据库和三张业务表,并插入数据。

    CREATE TABLE `orders` (order_id bigint not null primary key,user_id varchar(50) not null,shop_id bigint not null,product_id bigint not null,buy_fee numeric(20,2) not null,   create_time timestamp not null,update_time timestamp not null default now(),state int not null 
    );CREATE TABLE `orders_pay` (pay_id bigint not null primary key,order_id bigint not null,pay_platform int not null, create_time timestamp not null
    );CREATE TABLE `product_catalog` (product_id bigint not null primary key,catalog_name varchar(50) not null
    );-- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

步骤二:开发数据摄入YAML作业

  1. 登录实时计算管理控制台。

  2. 在左侧导航栏选择数据开发 > 数据摄入

  3. 单击新建,选择MySQL到Starrocks数据同步,单击下一步

  4. 填写作业名称存储位置和选择引擎版本后,单击确定

  5. 配置YAML作业代码信息。

    将MySQL中order_dw_mysql数据库下的所有表同步到starrocks的order_dw_sr数据库中,代码示例如下。

    source:type: mysqlhostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.comport: 3306username: ${secret_values.mysqlusername}password: ${secret_values.mysqlpassword}tables: order_dw_mysql.\.*server-id: 5405-5415sink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030username: ${secret_values.starrocksusername}password: ${secret_values.starrockspassword}table.create.properties.replication_num: 1route:- source-table: order_dw_mysql.\.*sink-table: order_dw_sr.<>replace-symbol: <>description: route all tables in source_db to sink_dbpipeline:name: Sync MySQL Database to StarRocks

    关于MySQL和Starrocks的本示例需要的配置信息说明如下表所示,数据摄入更多参数详情请参见MySQL和StarRocks。

    类别

    参数

    说明

    示例值

    source

    hostname

    MySQL数据库的IP地址或者Hostname。

    建议填写专有网络VPC地址。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL数据库服务的端口号。

    3306

    username

    MySQL数据库服务的用户名和密码。填写您步骤一:准备RDS MySQL测试数据中创建的账号和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL表名。支持正则表达式以读取多个表的数据。

    本文将同步order_dw_mysql数据库所有表及数据。

    order_dw_mysql.\.*

    server-id

    数据库客户端的一个数字ID。

    5405-5415

    sink

    jdbc-url

    JDBC连接的URL。

    指定FE(Front End)的IP和查询端口,格式为jdbc:mysql://ip:port

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址查询端口

     jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    连接到FE节点的HTTP服务URL。

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址HTTP端口

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks连接用户名和密码。

    此处需要填写为您开通StarRocks时填写的用户名和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    route

    source-table

    指定生效上游表。

    order_dw_mysql.\.*

    sink-table

    指定数据路由的目标位置。

    order_dw_sr.<>

    replace-symbol

    在使用模式匹配功能时,用于指代上游表名的字符串。

    <>

  6. 单击部署

步骤三:启动数据摄入YAML作业

  1. 数据摄入页面,单击部署后,在弹出的对话框中,单击确定

  2. 运维中心 > 作业运维页面,单击目标YAML作业操作中的启动

  3. 单击启动

    本示例选择为无状态启动,参数配置详情请参见作业启动。作业启动后,您可以在作业运维页面观察作业的运行信息和状态。

步骤四:在StarRocks上查看同步结果

当YAML作业处于运行中后,您就可以在StarRocks查看数据同步情况。

  1. 通过EMR StarRocks Manager连接StarRocks实例。

  2. 在左侧导航栏,单击SQL Editor,在数据库页签,单击

    image

    按钮。

    您会看到default_catalog下出现名称为order_dw_sr的数据库。

  3. 查询列表页签,单击+文件,新建查询脚本后,输入以下SQL语句,单击运行

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 在命令下方查看同步结果。

    您会看到StarRocks中已存在和MySQL数据库中相同名称的表及数据。

    image


http://www.ppmy.cn/embedded/129986.html

相关文章

Python酷库之旅-第三方库Pandas(140)

目录 一、用法精讲 631、pandas.Timestamp类 631-1、语法 631-2、参数 631-3、功能 631-4、返回值 631-5、说明 631-6、用法 631-6-1、数据准备 631-6-2、代码示例 631-6-3、结果输出 632、pandas.Timestamp.asm8属性 632-1、语法 632-2、参数 632-3、功能 632…

Redisson使用全解

redisson使用全解——redisson官方文档注释&#xff08;上篇&#xff09;_redisson官网中文-CSDN博客 redisson使用全解——redisson官方文档注释&#xff08;中篇&#xff09;-CSDN博客 redisson使用全解——redisson官方文档注释&#xff08;下篇&#xff09;_redisson官网…

[Python学习日记-50] Python 中的序列化模块 —— pickle 和 json

[Python学习日记-50] Python 中的序列化模块 —— pickle 和 json 简介 pickle 模块 json 模块 pickle VS json 简介 什么叫序列化&#xff1f; 序列化指的是将对象转换为可以在网络上传输或者存储到文件系统中的字节流的过程。序列化使得对象可以被保存、传输和恢复&#…

C++卓越:全面提升专业技能的深度课程(第一章第一课C++17与C++20概述)

第一章&#xff1a;C的现代化 第一课&#xff1a;C17与C20概述 引言 C是一种强大的编程语言&#xff0c;具有丰富的特性和广泛的应用。随着C17和C20的发布&#xff0c;这些版本引入了大量新特性&#xff0c;进一步增强了语言的灵活性和效率。本课将全面探讨C17与C20的新特性…

几何算法系列:空间实体体积计算公式推导

1.前言 面积和体积的计算是常见和基础的几何算法话题&#xff0c;面积和体积通常作为面或构件的基本信息参与相关的建模、计算、分析等过程。 有关面积的计算&#xff0c;可以参考博主此前的文章&#xff0c; 一种误差较小的轮廓面积计算算法_轮廓面积计算原理-CSDN博客文章…

[前端] ✨【如何用课程设计提升工程能力?】✨笔记

✨【如何用课程设计提升工程能力&#xff1f;】✨ &#x1f4da; 课程设计真的在语言工具类课程中占据了“C位”&#xff01;&#x1f451;设计得好的课程简直像一个实战训练营&#xff0c;既能帮助学生巩固理论&#xff0c;又能培养解决复杂问题的能力&#xff0c;还能让他们…

麒麟aarch64架构下安装compat-openssl10

问题描述&#xff1a; 麒麟aarch64架构下安装mysql8.0.40,报错nothing provides libcrypto.so.10()(64bit) needed by 原因&#xff1a; 你当前系统的 OpenSSL 版本与 MySQL 8.0.40 所需的库不匹配。MySQL 8.0.40 需要 libcrypto.so.10&#xff0c;而你的系统使用的是 OpenS…

张雪峰:如果你现在是计算机专业,一定要优先报网络安全,它是未来国家发展的大方向

&#x1f91f; 基于入门网络安全/黑客打造的&#xff1a;&#x1f449;黑客&网络安全入门&进阶学习资源包 前言 “计算机专业 一定要优先报 网络安全 它是未来国家发展的大方向” 为什么推荐学网络安全&#xff1f; “没有网络安全就没有国家安全。”当前&#xff…