DataX迁移数据到StarRocks大表报too many version问题记录

embedded/2024/11/9 16:31:12/

目录

1 背景说明

2 问题描述

3 解决思路

3.1 磁盘问题

3.2 DataX配置

3.3 分桶设置

3.4 增量迁移

4 其他


1 背景说明

       项目上有两张大表,数据量在300w左右,每天凌晨通过datax将前一天最新的全量数据迁移到StarRocks对应的分区表中。分区表设置的动态分区,保存最近3天的分区数据,并自动创建未来2天的分区。 每个分区存的都是对应源表截止到分区当天的全量数据。 

       通过DataX将数据迁移到StarRocks的任务每天凌晨有几百个,非常多,而且时间基本上都集中在凌晨0点到4点之间。 随着系统的运行,时间的推移,任务越来越多,每天要迁移到数仓的数据也越来越多。 

       StarRocks版本:2.2.13 RELEASE

2 问题描述

最近一个月发现这两张超过300w数据的大表,频繁出现任务迁移失败的问题,问题报错如下:

2024-08-05 16:30:09.286 [Thread-1] WARN StarRocksWriterManager - Failed to flush batch data to StarRocks, retry times = 0
java.io.IOException: Failed to flush data to StarRocks.
Too many versions. tablet_id: 7716375, version_count: 1001, limit: 1000: be:172.16.*.*
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"Too many versions. tablet_id: 7716375, version_count: 1001, limit: 1000: be:172.16.*.*","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"4ef91eac-3e95-4313-9a36-979f23fdbf1e","LoadBytes":5246126,"StreamLoadPutTimeMs":1,"NumberTotalRows":0,"WriteDataTimeMs":53,"TxnId":5530963,"LoadTimeMs":55,"ReadDataTimeMs":5,"NumberLoadedRows":0,"NumberFilteredRows":0}at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:95) ~[starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.asyncFlush(StarRocksWriterManager.java:174) [starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.access$000(StarRocksWriterManager.java:21) [starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager$1.run(StarRocksWriterManager.java:143) [starrockswriter-1.1.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
[INFO] 2024-08-05 16:30:11.097 +0800 [taskAppId=TASK-20240805-12935239844672_7-1874900-7057918] TaskLogLogger-class org.apache.dolphinscheduler.plugin.task.datax.DataxTask:[63] - -> 2024-08-05 16:30:10.288 [Thread-1] INFO StarRocksStreamLoadVisitor - Executing stream load to: 'http://172.16.*.*:8040/api/ods_zm/ods_QLSHZT_ms0_y_uoc_plan_diversion/_stream_load', size: '5246126'

       同样的配置,其他表都是正常,但是这两张大表失败的频率非常高,基本上平均每两天一次,但是如果在白天没有定时任务的时候,手动执行DataX任务的话,任务也可以成功,问题就比较诡异。 定时任务的失败,影响第二天报表的展示,在每天一早手工跑任务担惊受怕客户投诉的背景下,我们不得不想办法彻底解决这个问题。 

3 解决思路

       最直观的解决就是查这个报错,然后对应去修改,这个报错是StarRocks很常见的报错,官方针对这个问题也有解决方案,如下:

我们按照这个做过调整,但是问题并没有根治,还是会出现这个问题。 

针对这个问题,结合实际的问题情况,我们从以下几个角度去考虑和解决问题。

3.1 磁盘问题

       因为这两张大表的数据在白天是可以正常迁移成功的,只有在凌晨任务量比较多的时候才会频繁失败,考虑有没有可能是磁盘性能问题,通过安装sar命令,对磁盘做压测,如下:

这里面每个参数信息如下:

  • DEV:设备名称,表示监控的磁盘或分区,例如 /dev/sda
  • tps:每秒传输次数,表示每秒完成的读写操作次数。
  • rd_sec/s:每秒读扇区数,表示每秒从磁盘读取的扇区数。
  • wr_sec/s:每秒写扇区数,表示每秒向磁盘写入的扇区数。
  • avgrq-sz:平均请求队列大小,表示平均每次 I/O 请求等待的平均队列长度。
  • avgqu-sz:平均队列长度,表示平均 I/O 请求队列的长度。
  • await:平均等待时间,表示 I/O 请求的平均等待时间(毫秒)。
  • svctm:平均服务时间,表示平均每次 I/O 请求的服务时间(毫秒)。
  • %util:利用率,表示磁盘繁忙程度的百分比。

       从这个参数看,await值有些偏高,初步怀疑磁盘性能可能是瓶颈。 给项目上的硬件组反馈该问题,硬件侧的同学不认可。考虑到我们压测时候的数据写入的量级和实际凌晨的任务执行时候有差异,所以我们尝试写一个监控脚本,监控每天凌晨0点到4点这期间的磁盘使用情况,每隔一分钟输出对应的Average信息。

监控使用Python开发,默认主机的python版本是2.7的版本,参考源码如下:

python"># -*- coding: utf-8 -*-
import time
import subprocess
import datetime# 定义输出文件的路径
output_file_path = "/middleware/output.log"
# 定义 sar 命令
sar_command = "sar -d | head -3; sar -d | grep dev252"# 定义一个函数来执行 sar 命令并写入文件
def execute_sar_command():# 格式化当前时间formatted_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 创建一个子进程process = subprocess.Popen(sar_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)# 获取 sar 命令的输出output, error = process.communicate()# 检查命令是否成功执行if process.returncode == 0 and output:# 分割输出文本为行lines = output.split('\n')# 获取最后一行的索引last_line_index = len(lines) - 2# 获取 Average 行average_line = lines[last_line_index]# 加上时间戳average_line_with_timestamp = "{} - {}".format(formatted_time, average_line)# 将结果写入文件with open(output_file_path, 'a') as file:file.write(average_line_with_timestamp + "\n")slog = "{} - {}".format(formatted_time,'Data saved to ' + output_file_path)print(slog)else:slog = "{} - {}".format(formatted_time, 'Failed to execute sar command: ' + error)print(slog)if __name__ == "__main__":# 主循环while True:# 获取当前时间current_time = datetime.datetime.now()# 定义开始时间start_time = current_time.replace(hour=0, minute=0, second=0, microsecond=0)# 定义结束时间end_time = current_time.replace(hour=5, minute=0, second=0, microsecond=0)# 判断是否在指定区间内if start_time <= current_time <= end_time:execute_sar_command()time.sleep(60)  # 每 1 分钟执行一次else:print("{} - {}".format(current_time, ' Current time is out of the desired range. Skipping execution.'))time.sleep(300)  # 每 5 分钟执行一次

       将这个脚本放到StarRocks集群的每天节点上,然后后台启动脚本常驻进程即可。

nohup python sarMonitor.py > nohup.log 2>&1 &

       通过这个可以监控到凌晨任务非常多时候,磁盘的使用情况。

通过监控,发现磁盘的瓶颈问题并不是核心问题,毕竟每天凌晨95%的任务都是正常的,数据量也不小,只有这一两个大表数据不行,所以还是从其他方面再考虑。

3.2 DataX配置

       首先要确认下DataX是否已经使用了starrockswriter,尽管mysqlwriter也可以正常写入数据,但是效率要差得远。

       官网关于datax导入有专门的章节,参考这里。这里,贴一段项目上的datax配置信息,如下所示:

{"job":{"content":[{"reader":{"parameter":{"password":"xxxxxxx","connection":[{"querySql":["select `plan_id`,replace(`source_code`,\"'\",\"\"),`header_id`,`line_id`,replace(`project_no`,\"'\",\"\"),replace(`project_name`,\"'\",\"\"),replace(`schedule_no`,\"'\",\"\"),replace(`schedule_type_code`,\"'\",\"\"),replace(`schedule_type_name`,\"'\",\"\"),`schedule_date`,replace(`schedule_section`,\"'\",\"\"),replace(`attribute_category`,\"'\",\"\"),replace(`single_project`,\"'\",\"\"),replace(`header_comment`,\"'\",\"\"),replace(`line_num`,\"'\",\"\"),replace(`line_comment`,\"'\",\"\"),replace(`plan_status`,\"'\",\"\"),replace(`match_status`,\"'\",\"\"),replace(`execution_way`,\"'\",\"\"),replace(`source`,\"'\",\"\"),`erp_org_id`,replace(`erp_organization_code`,\"'\",\"\"),replace(`erp_organization_name`,\"'\",\"\"),replace(`erp_org_level`,\"'\",\"\"),replace(`org_tree_path`,\"'\",\"\"),`org_id`,replace(`org_name`,\"'\",\"\"),`erp_buyer_person_id`,replace(`erp_buyer_emp_num`,\"'\",\"\"),replace(`erp_buyer_person_num`,\"'\",\"\"),replace(`erp_buyer_full_name`,\"'\",\"\"),`buyer_person_id`,replace(`buyer_person_num`,\"'\",\"\"),replace(`buyer_full_name`,\"'\",\"\"),`planer_erp_id`,replace(`use_department`,\"'\",\"\"),replace(`use_location`,\"'\",\"\"),replace(`centralized_purchasing_flag`,\"'\",\"\"),replace(`centralized_purchasing_list`,\"'\",\"\"),`item_id`,replace(`item_no`,\"'\",\"\"),replace(`item_brand`,\"'\",\"\"),replace(`category_id`,\"'\",\"\"),replace(`item_category`,\"'\",\"\"),replace(`item_category1`,\"'\",\"\"),replace(`item_category2`,\"'\",\"\"),replace(`item_category3`,\"'\",\"\"),replace(`item_channels`,\"'\",\"\"),replace(`item_desc`,\"'\",\"\"),replace(`item_specification`,\"'\",\"\"),replace(`fasch_flag`,\"'\",\"\"),replace(`tech_parameter`,\"'\",\"\"),replace(`mpn_vendor`,\"'\",\"\"),`quantity`,`scheme_qty`,`source_qty`,`order_qty`,`package_qty`,`result_qty`,`commodity_qty`,`agreement_qty`,`budget_price`,`unit_price`,`budget_amount`,`total_amount`,replace(`expense_category`,\"'\",\"\"),replace(`currency_code`,\"'\",\"\"),`need_by_date`,replace(`delivery_location`,\"'\",\"\"),replace(`material`,\"'\",\"\"),replace(`uom_code`,\"'\",\"\"),replace(`mpn_num`,\"'\",\"\"),replace(`executive_std`,\"'\",\"\"),replace(`flow_code`,\"'\",\"\"),`created_userid`,replace(`created_emp_num`,\"'\",\"\"),replace(`created_full_name`,\"'\",\"\"),`created_date`,`approved_date`,`lead_time`,replace(`procure_type`,\"'\",\"\"),replace(`status`,\"'\",\"\"),`rate`,replace(`todo`,\"'\",\"\"),replace(`cux_status`,\"'\",\"\"),replace(`erpsource`,\"'\",\"\"),`last_updated_by`,`last_update_date`,replace(`return_status`,\"'\",\"\"),replace(`return_msg`,\"'\",\"\"),replace(`in_Implement_catalog`,\"'\",\"\"),replace(`in_controls_catalog`,\"'\",\"\"),replace(`last_match_status`,\"'\",\"\"),replace(`transfer_good_status`,\"'\",\"\"),`non_recruit_order_qty`,replace(`ext1`,\"'\",\"\"),replace(`ext2`,\"'\",\"\"),replace(`ext3`,\"'\",\"\"),replace(`UPDATE_OPER_ID`,\"'\",\"\"),replace(`UPDATE_OPER_NAME`,\"'\",\"\"),`UPDATE_TIME`,replace(`province_id`,\"'\",\"\"),replace(`city_id`,\"'\",\"\"),replace(`area_id`,\"'\",\"\"),replace(`address`,\"'\",\"\"),replace(`province_name`,\"'\",\"\"),replace(`city_name`,\"'\",\"\"),replace(`area_name`,\"'\",\"\"),replace(`address_name`,\"'\",\"\"),replace(`detail_address`,\"'\",\"\"),replace(`addr_company`,\"'\",\"\"),replace(`addr_user`,\"'\",\"\"),replace(`addr_phone`,\"'\",\"\"),replace(`addr_mobile`,\"'\",\"\"),replace(`step_id`,\"'\",\"\"),`cancel_match_approval_state`,`contract_qty`,`ssc_scheme_qty`,`crc_result_qty`,`crc_source_qty`,`project_qty`, now() as sync_time, date_sub(CURDATE(), interval 1 day) as sync_date from uoc_plan_diversion"],"jdbcUrl":["jdbc:mysql://172.16.*.*:3306/database_name?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"]}],"username":"uname"},"name":"mysqlreader"},"writer":{"parameter":{"password":"xxxxx","loadProps":{"format":"json","strip_outer_array":true},"column":["`plan_id`","`source_code`","`header_id`","`line_id`","`project_no`","`project_name`","`schedule_no`","`schedule_type_code`","`schedule_type_name`","`schedule_date`","`schedule_section`","`attribute_category`","`single_project`","`header_comment`","`line_num`","`line_comment`","`plan_status`","`match_status`","`execution_way`","`source`","`erp_org_id`","`erp_organization_code`","`erp_organization_name`","`erp_org_level`","`org_tree_path`","`org_id`","`org_name`","`erp_buyer_person_id`","`erp_buyer_emp_num`","`erp_buyer_person_num`","`erp_buyer_full_name`","`buyer_person_id`","`buyer_person_num`","`buyer_full_name`","`planer_erp_id`","`use_department`","`use_location`","`centralized_purchasing_flag`","`centralized_purchasing_list`","`item_id`","`item_no`","`item_brand`","`category_id`","`item_category`","`item_category1`","`item_category2`","`item_category3`","`item_channels`","`item_desc`","`item_specification`","`fasch_flag`","`tech_parameter`","`mpn_vendor`","`quantity`","`scheme_qty`","`source_qty`","`order_qty`","`package_qty`","`result_qty`","`commodity_qty`","`agreement_qty`","`budget_price`","`unit_price`","`budget_amount`","`total_amount`","`expense_category`","`currency_code`","`need_by_date`","`delivery_location`","`material`","`uom_code`","`mpn_num`","`executive_std`","`flow_code`","`created_userid`","`created_emp_num`","`created_full_name`","`created_date`","`approved_date`","`lead_time`","`procure_type`","`status`","`rate`","`todo`","`cux_status`","`erpsource`","`last_updated_by`","`last_update_date`","`return_status`","`return_msg`","`in_Implement_catalog`","`in_controls_catalog`","`last_match_status`","`transfer_good_status`","`non_recruit_order_qty`","`ext1`","`ext2`","`ext3`","`UPDATE_OPER_ID`","`UPDATE_OPER_NAME`","`UPDATE_TIME`","`province_id`","`city_id`","`area_id`","`address`","`province_name`","`city_name`","`area_name`","`address_name`","`detail_address`","`addr_company`","`addr_user`","`addr_phone`","`addr_mobile`","`step_id`","`cancel_match_approval_state`","`contract_qty`","`ssc_scheme_qty`","`crc_result_qty`","`crc_source_qty`","`project_qty`","`sync_time`","`sync_date`"],"loadUrl":["172.16.0.1:8040","172.16.0.2:8040","172.16.0.3:8040"],"connection":[{"selectedDatabase":"ods_zm","jdbcUrl":"jdbc:mysql:loadbalance://172.16.0.1:9030,172.16.0.2:9030,172.16.0.3:9030/","table":["ods_QLSHZT_ms0_y_uoc_plan_diversion"]}],"writeMode":"insert","username":"uname"},"name":"starrockswriter"}}],"setting":{"speed":{"channel":"4"}}}
}

       可以看到writer已经是starrockswriter了。

       考虑到凌晨任务非常多,磁盘繁忙并且无法调整为硬件,我们可以尝试调整datax的配置,参考官方对于writer中的参数说明,我们做了一下调整:

① 修改speed配置如下

② 显示的设置maxBatchSize

设置"maxBatchSize":100857600。 通过显示的设置maxBatchSize,发现starrockswriter攒批写入频次变低了、写入速度更快了,300w的数据,原来迁移需要5分钟左右,调整后,2分钟多就可以迁移完。

效率有了明显提升。

3.3 分桶设置

       对于分区一般都有一个基本的认知,基于分区键,将数据进行不同分区的数据路由,分桶是SR独有的概念,在分区下面又有分桶,基于分桶再对数据做二次的分布,并提供副本机制。其实也比较简单,就是确定分区的情况下,在分区下面查询又有分区(分桶),主要还是为了优化查询效率。

       300w的数据量对于数仓StarRocks不算是很大的数据量,对于这种小数据量级场景的,通过设置分桶靠SR并行机制带来的查询效率提升,微乎其微,反倒是tablet细碎 有大量元数据要维护 消耗内存更多,所以我们尝统一将分桶数都改成1。

       这里其实有一个实际实施时候的困难点,业务人员对数仓的特性不了解,了解数仓特性的技术人员对业务不了解,然后基于数仓又开发了更上层的数据中台等应用,让业务人员直接使用,这就造成实际实施的时候,很难讲数仓的威力发挥出来,因为业务人员没办法明确知道那些列或者那些字段作为分桶键效果更好。 或者说什么时候需要走分桶,什么时候不需要分桶,从技术的角度来说,关于分桶的概念很好理解,但是要使用好它,前提还是要和业务数据结合起来。 业务和技术的结合,是项目实施的难点。

       回到这个问题中,对于平台提供方的我们,对业务也不懂,所以也无法给出很好的分桶设置建议,但是考虑到官方关于分桶数据量设置的建议如下:

       结合实际业务场景,我们就没必要设置多个分桶了,直接设置分桶数为1即可。

注:这块的调整,暂未在业务上做验证,任务都是凌晨跑的,没有即席查询场景,所以对于查询效率的影响没有做验证,调整分桶数主要是考虑减少tablet数量。

3.4 增量迁移

       还有一种方案就是做增量迁移,就是DataX的reader中数据查询带上条件即可。这样每天凌晨迁移数据的时候,都是只迁移增量的数据。 但是这种方案有一些限制因素,如果源表没有增量字段,或者删除是物理删除,这种场景下增量迁移可能就无法实现了。

4 其他

       关于DataX的配置,StarRocks官方的文档说默认maxBatchSize是100M,我们想当然认为不用设置,直接走默认值就好了。但是显示的设置之后,效果明显提升了很多,为何? 

见下图,从源码中可以看到,实际上源码中,如果这个maxBatchSize不设置的话,默认其实是5M,并不是官方提到的100M。 


http://www.ppmy.cn/embedded/93466.html

相关文章

kafka-go使用:以及kafka一些基本概念说明

关于kafka 作为开发人员kafka中最常关注的几个概念&#xff0c;是topic,partition和group这几个概念。topic是主题的意思&#xff0c;简单的说topic是数据主题&#xff0c;这样解释好像显得很苍白&#xff0c;只是做了个翻译。一图胜前言&#xff0c;我们还是通过图解来说明。…

JavaScript入门

引入方式 书写语法 输出语句 变量 let声明的变量只在代码块内有效&#xff0c;const声明的变量为常量 数据类型和运算符 类型转换案例 第二个读到A自动截掉 函数 案例 对象 Array数组 for&#xff1a;遍历所有 foreach&#xff1a;遍历所有有值的元素 案例 1. 2. var arr3[…

除了http和https以外的协议

除了HTTP和HTTPS之外&#xff0c;计算机网络中还有许多其他常用的协议。以下是一些常见的协议&#xff0c;以及它们的作用和使用场景&#xff0c;以表格形式总结如下&#xff1a; 协议名称作用使用场景TCP&#xff08;传输控制协议&#xff09;面向连接的、可靠的、基于字节流…

uni-app开发微信小程序注意事项,不要用element-ui

前端扩展组件千万不要用element-ui&#xff0c;开发的时候不报错&#xff0c;发布的时候会报错无法发布。 可以用vant weapp【注意是weapp】 iView weapp 附上hbuilder官方文档 组件的概念 | uni-app官网 (dcloud.net.cn)

LangChain与RBAC:构建安全编程辅助的堡垒

LangChain与RBAC&#xff1a;构建安全编程辅助的堡垒 在软件开发中&#xff0c;确保代码的安全性和合规性是至关重要的。角色基础的访问控制&#xff08;RBAC&#xff09;是一种流行的安全策略&#xff0c;用于限制用户对系统资源的访问。LangChain&#xff0c;作为一个假设的…

使用API有效率地管理Dynadot域名,查看某一域名的拍卖详情

前言 Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮箱&…

【开端】JAVA中的切面使用

一、绪论 在不使用过滤器和 拦截器的前提下&#xff0c;如果统一对JAVA的 方法进行 管理。比如对一类方法或者类进行日志监控&#xff0c;前后逻辑处理。这时就可以使用到切面。它的本质还是一个拦截器。只是通过注解的方式来标识所切的方法。 二、JAVA中切面的使用实例 Aspec…

springboot引入redis

1&#xff0c;引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version> </dependency><dependency><groupId>org.projec…