Common problems when using Kafka


Cannot connect to Kafka; the error below is reported

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer756","payerAcc":"payer_acc756","payeeName":"payee756","payeeAcc":"payee...' to topic Fraud_acc:
org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer757","payerAcc":"payer_acc757","payeeName":"payee757","payeeAcc":"payee...' to topic Fraud_acc:
... (the same "Producer is closed forcefully" exception and stack trace repeat for payer758 through payer761)
Solution

Configure the parameter below in the broker's server.properties file (you can check the current values with cat server.properties). It was originally set to localhost, which is why the connection was being lost.
[Screenshot of the server.properties listener settings; image not preserved]
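Since the screenshot is not preserved, here is a minimal sketch of the broker-side settings this kind of fix usually involves. It assumes the broker is reachable at 192.168.1.112:9092 (the address the producer's bootstrap.servers uses in the log further below); the essential point is that advertised.listeners must announce an address clients can actually resolve and reach, not localhost.

    # config/server.properties (sketch; adjust host and port to your environment)
    # Interface the broker binds to:
    listeners=PLAINTEXT://0.0.0.0:9092
    # Address the broker advertises to clients; must not be localhost if
    # clients connect from other machines:
    advertised.listeners=PLAINTEXT://192.168.1.112:9092

After changing this, restart the broker and make sure every client's bootstrap.servers points at the same advertised address.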

2025-01-01 09:18:04.991  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-01-01 09:18:05.737  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2025-01-01 09:18:05.843  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
	acks = -1
	batch.size = 16384
	bootstrap.servers = [192.168.1.112:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2025-01-01 09:18:05.862  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.2
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 25b1aea02e37da14
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1735694285881
2025-01-01 09:18:06.127  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to ZiipuoTKS22oBX6HbBpMbQ
2025-01-01 09:18:06.128  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: DioEcCfQQNi6Ea50_-07Ag
2025-01-01 09:18:06.160  INFO 6240 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 16 with epoch 0
2025-01-01 09:18:08.127  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Kafka consumer error

2025-01-01 17:04:38.425 ERROR 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Offset commit failed on partition FraudAcc-0 at offset 25: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  WARN 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Asynchronous auto-commit of offsets {FraudAcc-0=OffsetAndMetadata{offset=25, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Solution

Judging from the log output, the root cause of the error is that the Kafka consumer tried to commit offsets, but the group coordinator no longer recognized the member. This typically happens in the following situations:

  1. Consumer group rebalance: when one or more consumers join or leave the group, Kafka triggers a rebalance. During the rebalance every consumer temporarily loses ownership of its partitions and has to rejoin the group to receive a new assignment. If your consumer does not rejoin correctly after the rebalance, it cannot commit offsets.

  2. max.poll.interval.ms timeout: this setting defines how long a consumer may spend processing the records returned by one poll() call before it must call poll() again. If processing takes longer than this, Kafka treats the consumer as dead and triggers a rebalance. This is usually caused by slow message processing, or by a consumer that gets stuck and fails to call poll() in time.

  3. session.timeout.ms timeout: this is another liveness-related setting; it determines how long the broker waits for heartbeats from the consumer. If no heartbeat arrives within this window, Kafka also declares the consumer dead and triggers a rebalance.

  4. max.poll.records set too high: if max.poll.records is too large, each poll() call can return a big batch of records, which increases the time needed to process them and makes a max.poll.interval.ms timeout more likely (the sketch after this list summarizes these settings and their defaults).
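For reference, a rough sketch of the consumer settings involved, with their approximate defaults in recent kafka-clients versions (exact defaults can differ between client versions):

    max.poll.interval.ms=300000   # maximum time allowed between poll() calls (5 minutes)
    max.poll.records=500          # maximum records returned by a single poll()
    session.timeout.ms=45000      # heartbeat session timeout (10000 in clients before 3.0)
    heartbeat.interval.ms=3000    # how often the background thread sends heartbeats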

Solutions

You can usually resolve the problem by adjusting the following settings:

  • Increase max.poll.interval.ms: if you know your consumer may need more time to work through a batch of messages, raise this value. The default is 5 minutes (300,000 ms); adjust it to whatever your processing realistically needs (a consolidated Spring Boot sketch follows this list).
    # e.g. 10 minutes; this key is not a first-class Spring Boot property,
    # so it is passed through the consumer's extra properties
    spring.kafka.consumer.properties.max.poll.interval.ms=600000
    
  • Reduce max.poll.records: lowering the maximum number of records returned per poll() call reduces the work in each batch, so the consumer finishes sooner and gets back to polling in time.
    # tune to your workload (the client default is 500)
    spring.kafka.consumer.max-poll-records=100
    
  • Optimize the message-processing logic: keep the work done for each record as efficient as possible and avoid long blocking operations such as database queries or remote calls. Consider handing heavy work off to asynchronous processing to keep the poll loop fast.
  • Check the consumer's robustness: make sure the consumer cannot hang or crash in unexpected situations. Proper exception handling and monitoring help you spot and fix problems quickly.
  • Use consumer group IDs deliberately: give each independent consumer its own group ID, unless the consumers really belong to the same group and share the same subscription.
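Putting the relevant adjustments together, a minimal Spring Boot configuration sketch might look like the following; the values are illustrative, not recommendations, and max.poll.interval.ms / session.timeout.ms are passed through spring.kafka.consumer.properties.* because they are not first-class Spring Boot keys:

    spring.kafka.bootstrap-servers=192.168.1.112:9092
    spring.kafka.consumer.group-id=my-group
    # fewer records per poll() keeps each batch quick to process
    spring.kafka.consumer.max-poll-records=100
    # allow more time between poll() calls (client default is 300000 ms)
    spring.kafka.consumer.properties.max.poll.interval.ms=600000
    # liveness settings passed straight through to the Kafka client
    spring.kafka.consumer.properties.session.timeout.ms=45000
    spring.kafka.consumer.properties.heartbeat.interval.ms=3000

Whatever values you pick, keep heartbeat.interval.ms no higher than about one third of session.timeout.ms, and keep session.timeout.ms well below max.poll.interval.ms.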

Log analysis

The log shows that consumer consumer-my-group-49 tried to commit an offset, but the coordinator did not recognize this member, so the commit failed. The consumer then reset its generation and requested to rejoin the group. The warning that follows points at the heart of the problem: the gap between two poll() calls exceeded max.poll.interval.ms, which usually means message processing is taking too long.


