Resolving canal "Failed to update metadata after 60000 ms" and related errors


This troubleshooting took a full day, so the write-up is long.

Problem description:

Requirement

MySQL data needs to be streamed into Kafka through canal, and for testing, regex-matched tables should be routed dynamically into different topics. For example, with two families of tables:
A_1, A_2, A_3, B_1, B_2, B_3
A-family tables -> topic_1
B-family tables -> topic_2


The initial canal configuration was:
canal.instance.filter.regex = test\.A,test\.B
# partition count of the target Kafka topics; without this, all data lands in one partition
canal.mq.partitionsNum = 3
# canal.mq.partition=0 would instead send everything to partition_0
# with partitionsNum a per-table hash column must also be set, otherwise data is not spread
# across partitions; ^ concatenates two columns before hashing
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# dynamic topics: comma-separated topic:table-regex pairs; topic test1 gets table A of the
# test database, topic test2 gets table B
canal.mq.dynamicTopic = test1:test\.A,test2:test\.B
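
For intuition, the effect of a hash column can be sketched as below. This is an illustration of the idea, not canal's actual source; the column values are hypothetical, and partitionsNum=3 matches the config above.

// Illustrative sketch (not canal's implementation) of hash-based partition selection:
// the column value(s) named in canal.mq.partitionHash are joined and hashed, and the
// row goes to (hash mod partitionsNum), so rows sharing a key stay ordered within
// a single partition.
public class PartitionHashSketch {

    // Math.floorMod keeps the result non-negative even for negative hash codes
    static int selectPartition(String hashKey, int partitionsNum) {
        return Math.floorMod(hashKey.hashCode(), partitionsNum);
    }

    public static void main(String[] args) {
        // test.B hashes FPQQLSH^SEQUENCE_NR, i.e. two column values joined before hashing
        String key = "FPQQLSH-0001" + "42";          // hypothetical column values
        System.out.println(selectPartition(key, 3)); // prints one of 0, 1, 2
    }
}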

After running for a while, it failed with:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms

Since this was still a test environment, I deleted canal's metadata in ZooKeeper and restarted, but that had no effect.


Problem analysis

Checking the instance log, example.log:
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2022-02-07 00:01:48.113 [pool-4-thread-1] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_282]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_282]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na]
	... 8 common frames omitted

The root cause isn't apparent from this log alone.

Checking canal.log:
WARN [Producer clientId=producer-8] Error while fetching metadata with correlation id xxx: {=UNKNOWN_TOPIC_OR_PARTITION}

At first the name before the '=' was blank, i.e. an empty topic name triggering UNKNOWN_TOPIC_OR_PARTITION.
It then occurred to me that automatic topic creation was disabled in the Kafka broker configuration, so I switched it to true.
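The broker-side switch in question lives in Kafka's server.properties (the original post showed a screenshot here); a minimal sketch of the change, assuming the default config layout:

# config/server.properties on each Kafka broker (restart required to take effect)
auto.create.topics.enable=true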
Even after setting it to true, the error persisted, just with a different code:

WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id XXX: {=INVALID_TOPIC_EXCEPTION}

An empty string obviously can't serve as a topic name, so where was the empty value coming from?

I went back over the canal configuration and thought about MySQL DDL versus DML: only DML records (INSERT, UPDATE, DELETE) are needed here.
Since dynamic topics and hash columns are configured, DDL events are a problem, because canal parses them into the following format, with data set to null.

{"data": null,"database": "test","es": 1636952896000,"id": 2389,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "ALTER TABLE `test`.`demo10` \r\nCHANGE COLUMN `addr` `address` varchar(255) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL AFTER `age`","sqlType": null,"table": "demo10","ts": 1636952896772,"type": "ALTER"
}

So I added canal.instance.filter.query.ddl = true to filter out the DDL events:

canal.instance.filter.regex = test\.A,test\.B
# partition count of the target Kafka topics; without this, all data lands in one partition
canal.mq.partitionsNum = 3
# canal.mq.partition=0 would instead send everything to partition_0
# with partitionsNum a per-table hash column must also be set, otherwise data is not spread
# across partitions; ^ concatenates two columns before hashing
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# dynamic topics: comma-separated topic:table-regex pairs; topic test1 gets table A of the
# test database, topic test2 gets table B
canal.mq.dynamicTopic = test1:test\.A,test2:test\.B
# filter DDL statements out of the stream
canal.instance.filter.query.ddl = true

But after restarting canal, errors still appeared:

parse faield : CREATE DEFINER = `root`@`%` PROCEDURE 'getWsTotal'(......com.alibaba.fastsql.sql.parser.ParserException: syntax error, error in : 'cur ;read_loop : loop......

Hadn't DDL already been filtered out? Was something slipping through as DML?
To see what was failing to match, I routed unmatched events to a default topic and created a topic named test.
(As the message captured below confirms, the offending events carry "isDdl": false, so the DDL filter never applied to them.)
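
Since broker-side auto-creation had already proven unreliable here, the catch-all topic can also be created explicitly; a sketch using Kafka's bundled CLI in its 1.x form (the ZooKeeper address is an assumption, and 3 partitions matches partitionsNum above):

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 3 --topic test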

The configuration then became:

canal.instance.filter.regex = test\.A,test\.B
# partition count of the target Kafka topics; without this, all data lands in one partition
canal.mq.partitionsNum = 3
# canal.mq.partition=0 would instead send everything to partition_0
# with partitionsNum a per-table hash column must also be set, otherwise data is not spread
# across partitions; ^ concatenates two columns before hashing
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# dynamic topics: comma-separated topic:table-regex pairs; topic test1 gets table A of the
# test database, topic test2 gets table B
canal.mq.dynamicTopic = test1:test\.A,test2:test\.B
# filter DDL statements out of the stream
canal.instance.filter.query.ddl = true
# default topic for events that match none of the dynamicTopic patterns
canal.mq.topic = test

Data did show up there: events were indeed flowing into the default test topic, and again data is null.
Note in the message below that database and table are empty strings, so the event matches none of the dynamicTopic patterns; without a default canal.mq.topic, the producer had been handed an empty topic name, which explains the earlier warnings.

{"data": null,"database": "","es": 1644301624000,"id": 800,"isDdl": false,"mysqlType": null,"old": null,"pkNames": null,"sql": "CREATE DEFINER\u003d`root`@`%` PROCEDURE `_Navicat_Temp_Stored_Proc`(IN iqsri varchar(20),IN iedri varchar(20))\nBEGIN\r\n\tdeclare c varchar(20);\r\n  declare total int default 0;   \r\n  declare done int default false;   \r\n  declare cur cursor for select DISTINCT CONCAT(\u0027tbl_ec_document_\u0027,ssb) as tablename from tbl_jtxx;   \r\n  declare continue HANDLER for not found set done \u003d true;\r\n\tset @iqsri\u003diqsri;\r\n\tset @iedri\u003diedri;\r\n\tset @tempsql \u003d\u0027\u0027;\r\n\tset @tempsqlend \u003d\u0027\u0027;\r\n\tIF (iqsri is not null)\r\n\tTHEN\r\n\t\tSET @tempsqlend\u003dCONCAT(@tempsqlend,\" and CREATE_DATE\u003e\u003d\u0027\",@iqsri,\"\u0027 \");\r\n\tend IF;\r\n\tIF (iedri is not null)\r\n\tTHEN\r\n\t\tSET @tempsqlend\u003dCONCAT(@tempsqlend,\" and CREATE_DATE\u003c\u003d\u0027\",@iedri,\"\u0027 \");\r\n\tend IF;\r\n\topen cur;   \r\n\tread_loop:loop   \r\n\tfetch cur into c;    \r\n\tif done then  \r\n\t\t\tleave read_loop; \r\n\tend if;    \r\n\tset @tempsql\u003dCONCAT(@tempsql,\" select nsrsbh,nsrmc,lrfs,count(fpqqlsh) as sl,issue_err_msg\r\n\t\t\t\t\t  from \",c,\" where issued \u003d \u00279\u0027 \",@tempsqlend,\"group by nsrsbh, issue_err_msg  UNION ALL\"); \r\n\tend loop;    \r\n\tclose cur;\r\n\tset @tempsql\u003dSUBSTR(@tempsql,1,(CHAR_LENGTH(@tempsql)-9));\r\n\r\n\tset @tempsql\u003dCONCAT(@tempsql,\" order by issue_err_msg  desc\");\r\n   #select @tempsql;\r\n\tprepare stmt from @tempsql;\r\n\tEXECUTE stmt;\r\n\tdeallocate prepare stmt; \r\nEND","sqlType": null,"table": "","ts": 1644301624400,"type": "QUERY"
}
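
For extra safety, consumers of these topics can also drop such records explicitly. A minimal sketch, assuming fastjson for JSON parsing, a local broker, and the default topic test; poll(long) is used to stay compatible with the kafka-clients 1.1.1 seen in the stack trace:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class NullDataGuard {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "canal-null-data-guard");   // assumption
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject msg = JSONObject.parseObject(record.value());
                    // DDL and QUERY events arrive with "data": null -- skip them
                    if (msg.getJSONArray("data") == null || msg.getBooleanValue("isDdl")) {
                        continue;
                    }
                    // handle the INSERT/UPDATE/DELETE rows here
                    System.out.println(msg.getString("table") + ": " + msg.getJSONArray("data"));
                }
            }
        }
    }
}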

Solution:

A canal configuration that dynamically routes tables into different topics ends up looking like this:

canal.instance.filter.regex = test\.A,test\.B
# partition count of the target Kafka topics; without this, all data lands in one partition
canal.mq.partitionsNum = 3
# canal.mq.partition=0 would instead send everything to partition_0
# with partitionsNum a per-table hash column must also be set, otherwise data is not spread
# across partitions; ^ concatenates two columns before hashing
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# dynamic topics: comma-separated topic:table-regex pairs; topic test1 gets table A of the
# test database, topic test2 gets table B
canal.mq.dynamicTopic = test1:test\.A,test2:test\.B
# filter DDL statements out of the stream
canal.instance.filter.query.ddl = true
# default topic for events that match none of the dynamicTopic patterns
canal.mq.topic = test
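
canal also ships sibling switches for the other statement classes next to filter.query.ddl; if QUERY-type events like the stored-procedure record above should be dropped at the source rather than parked in a catch-all topic, these are worth experimenting with (whether they cover that exact event should be verified against your canal version):

# siblings of canal.instance.filter.query.ddl; defaults are false
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false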

