datax 需要手动执行 python 脚本来满足需求,可通过XXL-JOB 进行任务调度实现,满足自动化数据同步需求。
1、nacos 配置
# 配置
xxxx:job:runScript: /home/midware/datax/script/mysql_ps_test.shpythonScriptPath: /home/midware/datax/bin/datax.py jobFilePath: /home/midware/datax/job/mysql_postgres_job_update.json
2、编写shell脚本
mysql_ps_test.sh
#!/bin/bash# 检查参数数量
if [ "$#" -ne 2 ]; thenecho "Usage: $0 <script_path> <config_file_path>"exit 1
fi# 获取传入的参数
script_path="$1"
config_file_path="$2"# 运行 Python 脚本
python "$script_path" "$config_file_path"# 如果您希望在执行后检查状态码并采取进一步行动,可以添加如下代码:
#if [ $? -ne 0 ]; then
# echo "Error occurred while running the script."
# # 可以在这里添加错误处理逻辑
#fi
使用说明:
- 将上述脚本保存到一个文件中,例如 mysql_ps_test
.sh
。 - 给这个文件添加可执行权限:
chmod +x
mysql_ps_test.sh
. - 运行脚本并传递参数:
./run_datax.sh /home/midware/datax/job/mysql_postgres_job_update.py /home/midware/datax/job/mysql_postgres_job_update.json
。
3、job编写
mysql_postgres_job_update.json
{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"username": "root","password": "xxxxxx","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": [" SELECT * FROM sys_test_copy2 WHERE create_time BETWEEN NOW() - INTERVAL 5 MINUTE AND NOW() "]}}},"writer":{"name":"postgresqlwriter","parameter":{"writeMode": "update!@#(user_id)!@#(email)","column":["id","name"],"connection":[{"jdbcUrl":"jdbc:postgresql://127.0.0.1:5432/postgres","table":["sys_test_copy2"]}],"password":"xxxx","username":"postgres"}}}],"setting":{"speed":{"channel":6}}}
}
4、编写一个xxl-job 任务
@Component
public class SampleXxlJob {private static final Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);@Resourceprivate ScriptProperties scriptProperties;/*** 3、命令行任务*/@XxlJob("commandJobHandler")public void commandJobHandler() {executeCommandForScript(Arrays.asList("sh", scriptProperties.getRunScript(),scriptProperties.getPythonScriptPath(),scriptProperties.getJobFilePath()));}private void executeCommandForScript(List<String> command) {// 使用ProcessBuilder执行命令try {logger.info("---------->>>>command:{}", StringUtils.join(command, " "));ProcessBuilder pb = new ProcessBuilder(command);Process process = pb.start(); // 等待命令执行完成int exitCode = process.waitFor();if (exitCode == 0) {XxlJobHelper.log("Command completed successfully.");} else {XxlJobHelper.handleFail("Command exit value(" + exitCode + ") is failed");} } catch (IOException | InterruptedException e) {XxlJobHelper.handleFail("Command is failed " + e.getMessage());}}
}
定时任务可设置每5分钟跑一次,这样就可以近实时同步数据了
0 */5 * * * ?
5、小结:
至此,从datax 到 job 编写,到postgres plug 源码调整,到xxl-job 定时执行。完成了一个定时数据同步功能。