【Flink实战】flink消费http数据并将数组展开多行

server/2024/9/22 16:54:15/
http://www.w3.org/2000/svg" style="display: none;">

文章目录

  • 一. 需求描述
  • 二. 方案思路
    • 1. 解决思路
    • 2. flink json 解析
      • 2.1. 通过json path解析非array数据
      • 2.2. 通过json path解析array数据
    • 3. CROSS JOIN逻辑
  • 三. 方案实现

一. 需求描述

flink消费http接口的数据,将json中的数组展开多行

如下样例数据以及要求处理的数据效果

{  "name": "John Doe",  "age": 30,  "address": {  "street": {  "street": "123 Main St",  "city": "New York",  "state": "NY"  },  "city": "New York",  "state": "NY"  },  "phoneNumbers": [  {  "type": "home",  "number": "212-555-1234"  },  {  "type": "fax",  "number": "646-555-4567"  }  ],  "children": [],  "spouse": null  
}
nameagestreetcitystatephone_typephone_number
John Doe30123 Main StNew YorkNYhome212-555-1234
John Doe30123 Main StNew YorkNYfax646-555-4567

二. 方案思路

1. 解决思路

  1. flink 消费http接口的数据(json),发送到下游
  2. 下游算子解析json数据,当遇到数组时,算子解析返回array
  3. 通过使用CROSS JOIN 将数组数据拍平,如上表格展现

flink_json__53">2. flink json 解析

2.1. 通过json path解析非array数据

如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。

https://i-blog.csdnimg.cn/direct/bc303cf19f9743b58c3f38d1ec92091a.png" alt="在这里插入图片描述" />

这里使用 cast转换,如下举例

cast(JSON_VALUE(json_string,'$.id') as int) as id ,  
JSON_VALUE(json_string,'$.name')  as name,  
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,  
JSON_VALUE(json_string,'$.details.address') as address,

 

2.2. 通过json path解析array数据

官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。

https://i-blog.csdnimg.cn/direct/93761a2f38df474c9cd9374a389c174b.png" alt="在这里插入图片描述" />

如下:

<dependencies>  <dependency>  <groupId>com.jayway.jsonpath</groupId>  <artifactId>json-path</artifactId>  <version>2.6.0</version>  </dependency>  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-common</artifactId>  <version>${flink.version}</version>  <scope>provided</scope>  </dependency>

通过udf解决

package com.dtstack.chunjun.local.test;  import com.jayway.jsonpath.JsonPath;  
import org.apache.flink.table.functions.ScalarFunction;  import java.util.ArrayList;  
import java.util.List;  public class JsonArrayFieldExtractor extends ScalarFunction {  public List<String> eval(String jsonString, String jsonPath) {  if (jsonString == null || jsonString.isEmpty()) {  return new ArrayList<String>();  }  try {  List<?> result = JsonPath.read(jsonString, jsonPath);   List<String> stringList = new ArrayList<>();  for (Object obj : result) {  stringList.add(obj.toString());  }  return stringList;  } catch (Exception e) {  return new ArrayList<String>();  }  }  }

3. CROSS JOIN逻辑

Array Expansion

https://i-blog.csdnimg.cn/direct/f2505d29cc9240c89e7a1382bf018709.png" alt="在这里插入图片描述" />

注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。

 

三. 方案实现

http_json_141">1. http json数据样例

{  "id": 1,  "name": "Alice",  "details": {  "age": {"real":11},  "address": "123Mainst",  "contacts": [  {  "type": "email",  "value": "alice@example.com"  },  {  "type": "phone",  "value": "123-456-7890"  }  ],  "grade": [  {  "grade": [{"zz":11},{"zz":11}],  "bb": {"rr":{"yy":"alice@example.com"}}  },  {  "grade": [{"zz":22}],  "bb": {"rr":{"yy":"alice@example.com"}}  }  ]  }  
}

 

flink_sql__175">2. flink sql 说明

CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';CREATE TABLE source
(json_string varchar
) WITH ('connector' = 'http-x','url' = 'http://localhost:8088/api/arraypage','intervalTime'= '3000','method'='get'                              --请求方式:get 、post,'decode'='text'                             -- 数据格式:只支持json模式-- 以下4个参数要同时存在:,'page-param-name'='pagenum'                          -- 多次请求参数1:分页参数名:例如:pageNum,'start-index'='1'                             -- 多次请求参数2:开始的位置,'end-index'='4'                               -- 多次请求参数3:结束的位置,'step'='1'                                  -- 多次请求参数4:步长:默认值为1);CREATE TABLE sink
(id               int,name             varchar,`real`               int,address                varchar,zz                int,yy                varchar
) WITH ('connector' = 'print');insert into sink   SELECTcast(JSON_VALUE(json_string,'$.id') as int) as id ,JSON_VALUE(json_string,'$.name')  as name,cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,JSON_VALUE(json_string,'$.details.address') as address,cast(`$.grade[*].grade[*].zz` as int ) as zz,`$.details.grade[*].bb.rr.yy` as yyFROM sourceCROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy'   )) AS T1(`$.details.grade[*].bb.rr.yy`);--{
--  "id": 1,
--  "name": "Alice",
--  "details": {
--    "age": {"real":11},
--    "address": "123Mainst",
--    "contacts": [
--      {
--        "type": "email",
--        "value": "alice@example.com"
--      },
--      {
--        "type": "phone",
--        "value": "123-456-7890"
--      }
--    ],
--    "grade": [
--      {
--        "grade": [{"zz":11},{"zz":11}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      },
--      {
--        "grade": [{"zz":22}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      }
--    ]
--  }
--}

消费结果
https://i-blog.csdnimg.cn/direct/b2a3622dc25647b4b94f50f7dc6a1244.png" alt="在这里插入图片描述" height="150" />

具体逻辑描述

  1. http连接器消费http接口数据 具体使用chunjun的http连接器,相关代码见:我提供的相关pr:
    [feature-DTStack#1775][connector][http] http supports offline mode

  2. 使用JSON_VALUE、get_json_array解析为string和array<string>,之后使用cast进行类型转换

  3. CROSS JOIN 生成笛卡尔积


http://www.ppmy.cn/server/120364.html

相关文章

sqlgun靶场通关攻略

尝试在URL中测试一下sql注入&#xff0c;但是禁止注入 那么在输入框中测试一下xss,存在xss&#xff0c;那么极有可能如果存在sql注入在这里 在测试时&#xff0c;出现了这个搜索框的源代码&#xff0c;可以看出是单引号包含&#xff0c;所以可以测试闭合方式为单引号的sql注入 …

笔记:DrawingContext和GDI+对比简介

一、目的&#xff1a;分享一个wpf中级控件&#xff0c;鼠标放上展开其他控件的效果 DrawingContext 和 GDI 的 Graphics 类都是用于绘图的技术&#xff0c;但它们属于不同的图形库和框架&#xff0c;适用于不同的场景。让我们详细比较一下这两者。 二、对比 DrawingContext Dra…

专题·大模型安全 | 生成式人工智能的内容安全风险与应对策略

正如一枚硬币的两面&#xff0c;生成式人工智能大模型&#xff08;以下简称“生成式大模型”&#xff09;在助力内容生成的同时也潜藏风险&#xff0c;成为虚假信息传播、数据隐私泄露等问题的温床&#xff0c;加剧了认知域风险。与传统人工智能&#xff08;AI&#xff09;相比…

【算法题】63. 不同路径 II-力扣(LeetCode)-”如果起点有障碍物,那么便到不了终点“

【算法题】63. 不同路径 II-力扣(LeetCode)-”如果起点有障碍物&#xff0c;那么便到不了终点“ 1.题目 下方是力扣官方题目的地址 63. 不同路径 II 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下…

Elasticsearch 检索优化:停用词的应用

Elasticsearch 检索优化&#xff1a;停用词的应用 场景描述 目前在 Elasticsearch 集群中存储约 1.5 亿篇文章数据&#xff0c;随着数据量的增加&#xff0c;检索性能问题逐渐显现。在列表检索和聚合操作中&#xff0c;CPU 消耗飙升至 100%&#xff0c;并且检索耗时较长&…

SSM+vue音乐播放器管理系统

音乐播放器管理系统 随着社会的发展&#xff0c;计算机的优势和普及使得音乐播放器管理系统的开发成为必需。音乐播放器管理系统主要是借助计算机&#xff0c;通过对首页、音乐推荐、付费音乐、论坛信息、个人中心、后台管理等信息进行管理。减少管理员的工作&#xff0c;同时…

LangChain教程 - 支持的向量数据库列举

系列文章索引 LangChain教程 - 系列文章 向量数据库是现代自然语言处理应用中不可或缺的组件&#xff0c;它们可以高效地存储、索引和检索嵌入向量&#xff0c;支持大规模的相似性搜索任务。LangChain 是一个强大的框架&#xff0c;允许开发者轻松将大型语言模型与向量数据库结…

【吊打面试官系列-MySQL面试题】MyISAM 表格将在哪里存储,并且还提供其存储格式?

大家好&#xff0c;我是锋哥。今天分享关于【MyISAM 表格将在哪里存储&#xff0c;并且还提供其存储格式&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; MyISAM 表格将在哪里存储&#xff0c;并且还提供其存储格式&#xff1f; 每个 MyISAM 表格以三种格式存储…