记一次Kafka重复消费解决过程

news/2024/11/9 0:45:18/

        起因:车联网项目开发,车辆发生故障需要给三个系统推送消息,故障上报较为频繁,所以为了不阻塞主流程,采用了使用kafka。消费方负责推送并保存推送记录,但在一次压测中发现,实际只发生了10次故障,但是推送记录却有30多条。

        问题排查,发现是因为其中一个系统宕机,导致往这个系统推送消息时,一直连接超时,导致每条消息的推送时长被拉长。而且kafka消息拉取参数max-poll-records设置了500,意味着一次会批量拉取500条消息到本地处理,而max.poll.interval.ms参数默认是5分钟,当500条消息处理时长超过5分钟后,就会认为消费者死掉了,触发再均衡,导致同一个消息被重复消费。

解决:

        主要是提高消费者的处理速度,避免不必要的Rebalance。主要采用2种措施:

  1. 减少每次拉去消息数max-poll-records,从500,降到20
  2. 拉取到消息之后异步处理(创建线程池,对推送消息的部分利用多线程处理)

常见配置

fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟

fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的

fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms

max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500

consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回

max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has alreadyrebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or byreducing the maximum size of batches returned in poll() with max.poll.records. 

  即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题。

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作。

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

poll机制

  •    每次poll的消息处理完成之后再进行下一次poll,是同步操作
  •    每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移
  •    每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息
  •    poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒

http://www.ppmy.cn/news/1031466.html

相关文章

基于SpringBoot实现MySQL备份与还原

基于SpringBoot实现MySQL备份与还原&#xff0c;需求是在页面上对所有的平台数据执行备份和恢复操作&#xff0c;那么就需要使用代码去调用MySQL备份和恢复的指令&#xff0c;下面是具体实现步骤&#xff1b; MySQL备份表设计 CREATE TABLE IF NOT EXISTS mysql_backups (id …

adb对安卓app进行抓包(ip连接设备)

adb对安卓app进行抓包&#xff08;ip连接设备&#xff09; 一&#xff0c;首先将安卓设备的开发者模式打开&#xff0c;提示允许adb调试 二&#xff0c;自己的笔记本要和安卓设备在同一个网段下&#xff08;同连一个WiFi就可以了&#xff09; 三&#xff0c;在笔记本上根据i…

21款美规奔驰GLS450更换中规高配主机,汉化操作更简单

很多平行进口的奔驰GLS都有这么一个问题&#xff0c;原车的地图在国内定位不了&#xff0c;语音交互功能也识别不了中文&#xff0c;原厂记录仪也减少了&#xff0c;使用起来也是很不方便的。 可以实现以下功能&#xff1a; ①中国地图 ②语音小助手&#xff08;你好&#xf…

【Terraform学习】本地变量(Terraform配置语言学习)

背景&#xff1a; 关于如何在机器上拉terraform代码&#xff0c;初始化就不重复了&#xff0c;需要的可以查看前面的文章&#xff1a; 【Terraform学习】Terraform-AWS部署快速入门&#xff08;快速入门&#xff09;_向往风的男子的博客-CSDN博客 使用本地变量命名资源 将每…

《图解HTTP》——HTTP协议详解

目录 一、HTTP协议概述&#x1f3b9; 二、HTTP请求消息&#x1f966; 三、HTTP报文&#x1f420; 四、HTTP 协议瓶颈&#x1f512; 五、HTTP协议相关技术补充&#x1f381; 六、利用telnet观察http协议通讯过程&#x1f4a1; 一、HTTP协议概述&#x1f3b9; HTTP是一个属…

SpringBean的生命周期和循环依赖

Spring循环依赖 前言 大制作来啦&#xff0c;spring源码篇&#xff0c;很早之前我就想写一系列spring源码篇了&#xff0c;正好最近总是下雨&#xff0c;不想出门&#xff0c;那就让我来带大家走进Spring源码世界吧。 阅读建议 spring源码读起来有点难度&#xff0c;需要多Deb…

详细安装配置django

安装配置使用Django。 1&#xff0c;下载安装 django pip install django 2.创建设置项目 先进入要放置项目的文件夹下 2.1&#xff0c; 创建项目 django-admin startproject Api_project 2.2&#xff0c; 创建app命令 cd Api_project dir看一下是否有 manage.py 文件…

【JAVA开发工具系列】Git

Git常用功能整理 1.自动打包1.1 第一步安装git 服务1.1.1 查看版本1.1.2 安装1.1.3 配置秘钥 1.2 第二步 配置maven1.2.1 下载1.2.2解压1.2.3 配置环境变量1.2.4刷新环境变量文件1.2.5测试环境1.2.6 修改数据源 1.3 部署项目1.3.1拉取项目 1.4 jar 重启tomcat 2.SmartGit合并主…