阿里Dataworks使用循环节点和赋值节点完成对mongodb分表数据同步

embedded/2024/10/20 9:06:28/

背景

需求将MongoDB数据入仓MaxCompute

环境说明

MongoDB

100+个Collections:orders_1、orders_2、…、orders_100

前期准备

在这里插入图片描述

1、MongoDB数据源配置

需要先保证DW和MongoDB网络是能够联通的,需要现在集成任务中配置MongoDB的数据源信息。

具体可以查看我的另外一篇:https://blog.csdn.net/qq_16018407/article/details/142991582

2、赋值节点

选择赋值节点,赋值节点新增后打开,可以看到有Python、shell、ODPS SQL

Python 读取最后一次Print字符串,Shell读取最后一次echo输出的字符串,如”orders_1,order_2“ 就按照”,“逗号被拆分成2个元素用于后续循环

ODPS SQL 则是每一行是遍历的一个元素

每一次循环都会传入遍历的元素,如python :

print "orders_1,orders_2";

则会当做[“orders_1”,“orders_2”]数组进行遍历,每次一个元素会传入到遍历的循环中执行

实操界面:

print "orders_1,orders_2";

赋值节点会自动出现一个outputs给后面的节点读取

在这里插入图片描述

3、循环任务

在这里插入图片描述

新增完毕后进入到循环内部,会看到一个start 和end节点,这个时候我们再选择一个离线同步任务,将流程串起来

在这里插入图片描述

点开离线集成任务,切换到离线集成任务的脚本模式,赋值节点的collectionName会以”${dag.foreach.current}“ 参数传入到循环内部的流程中。

在集成任务脚本中,将对应的collectionName替换为 ${dag.foreach.current} 即可

{"transform": false,"type": "job","version": "2.0","steps": [{"stepType": "mongodb","parameter": {"objectIdOutputType": "json","useSplitVector": false,"datasource": "你的mongodb数据源名称","envType": 1,"cursorTimeoutInMs": "3600000","column": [{"name": "col_combine","type": "combine"}],"tableComment": "This kind of datasource dosen't support get table comment. This is a comment produced by di.","batchSize": "1000","collectionName": "${dag.foreach.current}"},"name": "Reader","category": "reader"},{"stepType": "odps","parameter": {"partition": "col=${dag.foreach.current}","truncate": true,"datasource": "你输出数据表的MaxCompute空间名称","envType": 1,"isSupportThreeModel": false,"tunnelQuota": "default","column": ["你的ODPS表的字段,因为我这里是想要将所有数据放在一个字段,所以这里就只预留了一个字段"],"emptyAsNull": false,"tableComment": "","table": "你的ODPS表","consistencyCommit": false},"name": "Writer","category": "writer"},{"copies": 1,"parameter": {"nodes": [],"edges": [],"groups": [],"version": "2.0"},"name": "Processor","category": "processor"}],"setting": {"errorLimit": {"record": "0"},"locale": "zh_CN","speed": {"throttle": false,"concurrent": 1}},"order": {"hops": [{"from": "Reader","to": "Writer"}]}
}

整个循环流程,点击右侧打开配置进行相关调度配置,最下方需要配置节点上下文 loopDataArray这个参数是读取外部的赋值节点,是必须配置的参数

在这里插入图片描述

日志

循环节点无法在dataworks的开发界面直接运营进行测试,只能发布以后在运维中心进行查看
在这里插入图片描述

最终效果

在这里插入图片描述

后期拓展

这里因为业务需求所以没有循环的参数是通过python print写死输出的

优雅一些的方式就是通过数据表维护,就可以动态读取数据表的内容,然后作为循环参数传入了

相关文档

for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.4.5.20a4d43aNd6b0E&scm=20140722.H_299261._.ID_299261-OR_rec-V_1#section-50c-r2v-mhd

赋值节点的操作步骤_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2019, September 10). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/configure-an-assignment-node?spm=a2c4g.11186623.0.0.2947b24b0wmXD7#task-2485378

for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.0.0.45634a14sGs7jS


http://www.ppmy.cn/embedded/128951.html

相关文章

Perl打印9x9乘法口诀

本章教程主要介绍如何用Perl打印9x9乘法口诀。 一、程序代码 1、写法① use strict; # 启用严格模式,帮助捕捉变量声明等错误 use warnings; # 启用警告,帮助发现潜在问题# 遍历 1 到 9 的数字 for my $i (1..9) {# 对于每个 $i,遍历 1…

Django 序列化serializers

在Django中,序列化通常指的是将数据库中的模型数据转换为JSON、XML或其他格式的过程。Django提供了内置的序列化工具,可以通过django.core.serializers模块进行序列化操作。 当你使用Django的序列化功能时,可以序列化以下两种对象类型&#…

【JavaScript】LeetCode:76-80

文章目录 76 有效的括号77 最小栈78 字符串解码79 每日温度80 柱形图中最大的矩形 76 有效的括号 栈三种不匹配的情况: ( [ { } ] ( ),最左边的"("多余,即字符串遍历完了,栈还不为空。[ { ( } } ],中间"…

开篇:SpringBoot与SpringCloud的那些事

在正式开始研究 SpringCloud 的技术之前,咱先简单的用比较短的篇幅聊一点概述性质的东西,让思维活跃起来。 SpringCloud与SpringBoot的关系和对比 一开始学习 SpringCloud 咱就知道,SpringCloud 的技术大多都不是自己造的,都是整合…

MySQL的并行复制原理

1. 并行复制的概念 并行复制(Parallel Replication)是一种通过同时处理多个复制任务来加速数据复制的技术。它与并发复制的区别在于,并行复制更多关注的是数据块或事务之间的并行执行,而不是单纯的任务并发。在数据库主从复制中&…

基于netty实现简易版rpc服务-理论分析

1.技术要点 1.1 rpc协议 定义一个rpc协议类,用于rpc服务端和客户端数据交互。 1.2 netty粘包半包处理 由于数据传说使用tcp协议,rpc协议的数据在网络传输过程中会产生三种情况: 1)刚好是完整的一条rpc协议数据 2)不…

如何给手机换ip地址

在当今数字化时代,IP地址作为设备在网络中的唯一标识,扮演着举足轻重的角色。然而,有时出于隐私保护、网络访问需求或其他特定原因,我们可能需要更改手机的IP地址。本文将详细介绍几种实用的方法,帮助您轻松实现手机IP…

苍穹外卖学习笔记(二十三)

拒单 OrderController /*** 拒单*/PutMapping("/rejection")ApiOperation("拒单")public Result rejection(RequestBody OrdersRejectionDTO ordersRejectionDTO) throws Exception {orderService.rejection(ordersRejectionDTO);return Result.success(…