文章目录
一. 需求描述
flink消费http接口的数据,将json中的数组展开多行
如下样例数据以及要求处理的数据效果
{ "name": "John Doe", "age": 30, "address": { "street": { "street": "123 Main St", "city": "New York", "state": "NY" }, "city": "New York", "state": "NY" }, "phoneNumbers": [ { "type": "home", "number": "212-555-1234" }, { "type": "fax", "number": "646-555-4567" } ], "children": [], "spouse": null
}
name | age | street | city | state | phone_type | phone_number |
---|---|---|---|---|---|---|
John Doe | 30 | 123 Main St | New York | NY | home | 212-555-1234 |
John Doe | 30 | 123 Main St | New York | NY | fax | 646-555-4567 |
二. 方案思路
1. 解决思路
flink_json__53">2. flink json 解析
2.1. 通过json path解析非array数据
如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。
https://i-blog.csdnimg.cn/direct/bc303cf19f9743b58c3f38d1ec92091a.png" alt="在这里插入图片描述" />
这里使用 cast转换,如下举例
cast(JSON_VALUE(json_string,'$.id') as int) as id ,
JSON_VALUE(json_string,'$.name') as name,
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as `real` ,
JSON_VALUE(json_string,'$.details.address') as address,
2.2. 通过json path解析array数据
官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。
https://i-blog.csdnimg.cn/direct/93761a2f38df474c9cd9374a389c174b.png" alt="在这里插入图片描述" />
如下:
<dependencies> <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
通过udf解决
package com.dtstack.chunjun.local.test; import com.jayway.jsonpath.JsonPath;
import org.apache.flink.table.functions.ScalarFunction; import java.util.ArrayList;
import java.util.List; public class JsonArrayFieldExtractor extends ScalarFunction { public List<String> eval(String jsonString, String jsonPath) { if (jsonString == null || jsonString.isEmpty()) { return new ArrayList<String>(); } try { List<?> result = JsonPath.read(jsonString, jsonPath); List<String> stringList = new ArrayList<>(); for (Object obj : result) { stringList.add(obj.toString()); } return stringList; } catch (Exception e) { return new ArrayList<String>(); } } }
3. CROSS JOIN逻辑
Array Expansion
https://i-blog.csdnimg.cn/direct/f2505d29cc9240c89e7a1382bf018709.png" alt="在这里插入图片描述" />
注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。
三. 方案实现
http_json_141">1. http json数据样例
{ "id": 1, "name": "Alice", "details": { "age": {"real":11}, "address": "123Mainst", "contacts": [ { "type": "email", "value": "alice@example.com" }, { "type": "phone", "value": "123-456-7890" } ], "grade": [ { "grade": [{"zz":11},{"zz":11}], "bb": {"rr":{"yy":"alice@example.com"}} }, { "grade": [{"zz":22}], "bb": {"rr":{"yy":"alice@example.com"}} } ] }
}
flink_sql__175">2. flink sql 说明
CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';CREATE TABLE source
(json_string varchar
) WITH ('connector' = 'http-x','url' = 'http://localhost:8088/api/arraypage','intervalTime'= '3000','method'='get' --请求方式:get 、post,'decode'='text' -- 数据格式:只支持json模式-- 以下4个参数要同时存在:,'page-param-name'='pagenum' -- 多次请求参数1:分页参数名:例如:pageNum,'start-index'='1' -- 多次请求参数2:开始的位置,'end-index'='4' -- 多次请求参数3:结束的位置,'step'='1' -- 多次请求参数4:步长:默认值为1);CREATE TABLE sink
(id int,name varchar,`real` int,address varchar,zz int,yy varchar
) WITH ('connector' = 'print');insert into sink SELECTcast(JSON_VALUE(json_string,'$.id') as int) as id ,JSON_VALUE(json_string,'$.name') as name,cast(JSON_VALUE(json_string,'$.details.age.real') as int) as `real` ,JSON_VALUE(json_string,'$.details.address') as address,cast(`$.grade[*].grade[*].zz` as int ) as zz,`$.details.grade[*].bb.rr.yy` as yyFROM sourceCROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy' )) AS T1(`$.details.grade[*].bb.rr.yy`);--{
-- "id": 1,
-- "name": "Alice",
-- "details": {
-- "age": {"real":11},
-- "address": "123Mainst",
-- "contacts": [
-- {
-- "type": "email",
-- "value": "alice@example.com"
-- },
-- {
-- "type": "phone",
-- "value": "123-456-7890"
-- }
-- ],
-- "grade": [
-- {
-- "grade": [{"zz":11},{"zz":11}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- },
-- {
-- "grade": [{"zz":22}],
-- "bb": {"rr":{"yy":"alice@example.com"}}
-- }
-- ]
-- }
--}
消费结果
https://i-blog.csdnimg.cn/direct/b2a3622dc25647b4b94f50f7dc6a1244.png" alt="在这里插入图片描述" height="150" />
具体逻辑描述