flink + Atlas 任务数据血缘调通

news/2025/4/1 1:06:50/

据此修改 Flink 源码

版本
Flink1.13.5
Atlas1.2.0

将 atlas 配置文件打进 flink-bridge;atlas 相关的 jar 放进 flink/lib

jar uf flink-bridge-1.2.0.jar atlas-application.properties 

在这里插入图片描述
flink-conf.yaml 注册监听
在这里插入图片描述
org.apache.flink.configuration.ExecutionOptions 添加配置属性

public static final ConfigOption<List<String>> JOB_LISTENERS =ConfigOptions.key("execution.atlas.job-listeners").stringType().asList().noDefaultValue().withDescription("JobListenerFactories to be registered for the execution.");

一点说明:官方Flink1.12.0 版本之后支持配置execution.job-listeners,因此自己添加了个配置属性execution.atlas.job-listeners 进行区分,
org.apache.flink.configuration.DeploymentOptions
在这里插入图片描述

任务提交
flink run -m yarn-cluster -ys 1 -yjm 1024 -ytm 1024 -c com.nufront.bigdata.v2x.test.AtlasTest /opt/v2x-1.0-SNAPSHOT.jar
测试任务
public class AtlasTest {public static void main(String[] args) throws Exception {//TODO 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.disableOperatorChaining();// TODO kafka消费// 配置 kafka 输入流信息Properties consumerprops = new Properties();consumerprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.67:9092");consumerprops.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");consumerprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");consumerprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");consumerprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");consumerprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 添加 kafka 数据源DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<>("atlas-source-topic", new SimpleStringSchema(), consumerprops));// 配置kafka输入流信息Properties producerprops = new Properties();producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.67:9092");// 配置证书信息dataStreamSource.addSink(new FlinkKafkaProducer<String>("atlas-sink-topic", new KeyedSerializationSchemaWrapper(new SerializationSchema<String>(){@Overridepublic byte[] serialize(String element) {return element.getBytes();}}), producerprops));env.execute("AtlasTest");}}
flink on yarn 日志输出

在这里插入图片描述

修改 json 解析方式

org.apache.atlas.utils.AtlasJson#toJson

public static String toJson(Object obj) {String ret;if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {ret = ((JsonNode) obj).textValue();} else {// 修改 json 处理方式:fastjson,原来的ObjectMapper.writeValueAsString() 一度卡住不往下执行// ret = mapper.writeValueAsString(obj);ret = JSONObject.toJSONString(JSONObject.toJSON(obj));LOG.info(ret);}return ret;}
查看目标 kafka 对应topic

在这里插入图片描述

在这里插入图片描述


http://www.ppmy.cn/news/385096.html

相关文章

维吉尼亚密码

维吉尼亚密码&#xff08;又译维热纳尔密码&#xff09;是使用一系列凯撒密码组成密码字母表的加密算法&#xff0c;属于多表密码的一种简单形式。 【加密原理】 明文&#xff1a;I Love You 密钥&#xff1a;OK 首先&#xff0c;密钥长度需要与明文长度相同&#xff0c;如果少…

艾美捷曲妥珠单抗Trastuzumab化学性质和特异性说明

艾美捷曲妥珠单抗Trastuzumab是人源化IgG1κ单克隆抗体&#xff0c;其以高亲和力选择性结合人表皮生长因子受体2蛋白HER2的细胞外结构域。曲妥珠单抗通过重组DNA技术在哺乳动物细胞&#xff08;中国仓鼠卵巢&#xff09;中产生。 艾美捷曲妥珠单抗Trastuzumab化学性质&#xff…

关注渐冻症|菌群助力探索其发病机理及相关干预措施

最杰出的物理学家之一的斯蒂芬威廉霍金想必大家都知道&#xff0c;以及曾经风靡全网的“冰桶挑战”&#xff0c;它们都与一种罕见疾病有关&#xff0c;那就是渐冻症。 媒体的宣传让渐冻症成为了较为“知名”罕见病之一&#xff1b;2000年丹麦举行的国际病友大会上正式确定6月21…

活性氧AMK(hydrochloride),cas:1215711-91-3,来那替尼/菊苣酸/毛蕊异黄酮苷

活性氧AMK(hydrochloride)&#xff0c;cas:1215711-91-3&#xff0c;来那替尼/菊苣酸/毛蕊异黄酮苷 活性氧是指化学性质活跃的含氧原子或原子团,如超氧自由基(O2-)、过氧化氢(H2O2)、羟自由基(OH)等等.活性氧可使类脂中的不饱和脂肪酸发生过氧化反应,破坏细胞膜的结构. 活性氧…

CalBioreagents艾美捷兔单克隆抗人Id1克隆5-3说明书

CalBioreagents艾美捷兔单克隆抗人Id1克隆5-3背景&#xff1a; 简介&#xff1a;Id有四个成员蛋白质家族&#xff0c;Id1、Id2、Id3和Id4蛋白质最初被发现为蛋白质参与细胞的阴性对照区别Id蛋白作为阴性通过物理调节转录与一组转录因子的相互作用称为bHLH&#xff08;碱性螺旋…

艾美捷曲妥珠单抗Trastuzumab参数和相关研究

曲妥珠单抗是一种重组人源化单克隆抗体&#xff0c;特异性地作用于人表皮生长因子受体-2(HER2)的细胞外部位。此抗体含人IgG1 框架&#xff0c;互补决定区源自鼠抗p185 HER2 抗体&#xff0c;能够与HER2 绑定。 HER2 原癌基因或C-erbB2 编码一个单一的受体样跨膜蛋白&#xff0…

ソイラ / 索伊拉

目录 基本资料面板值&#xff08;无天冥加成&#xff09;天冥奖励 战斗宣言&#xff08;VC&#xff09;被动效果技能本体AS 回到人物索引 基本资料 NS(3~4★)AS卡池 (Ver 1.0.0)卡池 (Ver 2.13.81)ファランクスの書アリストクラットの異節 注&#xff1a;AS原头像无论怎么P总…

Pytorch从指定epoch恢复训练

1、在训练时保存每个epoch的模型&optimizer&epoch checkpoint {model: model.state_dict(), optimizer: optimizer.state_dict(), epoch: epoch} torch.save(checkpoint, path) 2、从指定epoch恢复 path_checkpoint "./models/checkpoint/ckpt_best_1.pth&qu…