kafka消息丢失?可能和seekToEnd有关

news/2024/10/18 14:22:13/

        最近遇到kafka消息丢失的偶现问题,排查许久都没找到原因。后面通读代码,才发现消息丢失和seekToEnd有关。

        我有一套环境是HA架构,3个节点,每个节点有多个app,每个app启动时会向zk注册,然后利用zk选出主app,zk选出主之后,被选为主的app则有资格作为kafka消息的接收者,根据收到的kafka消息进行相应业务的处理。

        偶现问题就是当某个app被zk选为主之后,平台会向其发送“你是主”的消息,但该app却没收到“你是主”的消息。

        虽然代码中使用了seekToEnd方法,该方法的意思就是读取最近的一个消息,但问题不是这个方法导致的。而是,注册kafka消费者的时机不对导致的。

        以下是导致问题发生的伪代码:

    public static void main(String[] args) {try {initLogger();// 连接zkconnect2Zk();// 连接kafkaconnect2Kafka();} catch (Exception e) {e.printStackTrace();log.error("failed to start server", e);}}

顺便看下使用seekToEnd的代码是怎么写的:

public class MyAppConsumer implements Runnable {public MyAppConsumer (String gid, List<String> topics) {init(gid, topics);}@Overridepublic void run() {try {consumer = new KafkaConsumer<>(props);consumer.subscribe(this.topics);consumer.seekToEnd(new ArrayList<>());log.info("Start Consumer: {} {}", gid, topics);} catch (Exception e) {}while (isRunning) {try {ConsumerRecords<String, String> records = consumer.poll(100);handleRecords(records);.......} catch (Exception e) {}}}    
}

由于这个是偶现问题,所以复现不容易。可以通过增加日志打印,在发送“你是主”的消息和app连接kafka成功,变成kafka消费者的地方增加详细的日志打印,以此来确认问题。

        这里我们就靠口述问题发生的场景了:当连接kafka的方法(connect2Kafka)在连接zk(connect2Zk)之后,如果zk选主完成,kafka的连接还未成功,则会导致问题发生。因为zk选主完成之后,平台就会向对应的app发送“你是主”的消息,而此时该app还未连接到kafka,还不是kafka的消费者,当连接kafka成功之后,因为使用了seekToEnd方法,因此该app只会读取最新的消息,之前的都丢弃了,那么就永远也收不到“你是主”的消息了。

        既然发生问题的原因找到了,那改起来也就很方便了,将连接kafka的方法(connect2Kafka)放在连接zk(connect2Zk)之前就可以了。伪代码如下:

    public static void main(String[] args) {try {initLogger();// 连接kafkaconnect2Kafka();// 连接zkconnect2Zk();} catch (Exception e) {e.printStackTrace();log.error("failed to start server", e);}}

        回头想想,一般我们遇到的偶现问题,就会觉得很头疼,但当哪天心情好的时候,去慢慢梳理一下代码,也许你就会发现,好家伙,自己给自己挖了一个大坑!!!


http://www.ppmy.cn/news/1540021.html

相关文章

操作系统简介:作业管理

作业管理 一、作业管理1.1 作业控制1.2 作业的状态及其转换1.3 作业控制块和作业后备队列 二、作业调度2.1 调度算法的选择2.2 作业调度算法2.3 作业调度算法性能的衡量指标 三、人机界面 作业&#xff1a;系统为完成一个用户的计算任务&#xff08;或一次事务处理&#xff09;…

Python打包之嵌入式打包神器PyStand

在使用Python开发项目时&#xff0c;如果项目依赖了如torch这样的大型第三方库&#xff0c;打包后的体积可能会变得非常庞大&#xff08;超过1GB&#xff09;。传统的打包工具&#xff0c;如Nuitka或PyInstaller&#xff0c;可能会面临打包成功率低、耗时长、打包后体积巨大的问…

4.stable-diffusion-webui1.10.0--图像修复(adetailer)插件

ADetailer是Stable Diffusion WebUI的一个插件&#xff0c;它通过深度学习模型智能检测图像中的人脸、手部及身体等关键部位&#xff0c;并自动进行重绘修复&#xff0c;使得生成的图像更加自然、符合预期。 ADetailer插件主要应用于图像的细节增强、降噪和修复&#xff0c;特…

hive自定义函数缺包报错,以及运行时与hive冲突解决

一.问题描述 仅描述了从配置到打包上传的过程&#xff0c;想要看解决请直接跳到下文的对应模块。 在使用hive设置自定义函数的时候在pom.xml中配置如下依赖&#xff0c;使其打包的时候带依赖打包&#xff1a; <dependencies><dependency><groupId>org.apa…

递归——二叉树中的深搜

文章目录 计算布尔二叉树的值求根节点到叶节点数字之和二叉树剪枝验证二叉搜索树二叉搜索树中第 K 小的元素二叉树的所有路径 二叉树中的深搜有三种方法 前序遍历 根->左子树->右子树 中序遍历 左子树->根->右子树 前序遍历 左子树->右子树->根 计算布尔…

【人工智能】大模型的崛起为AI Agent注入了“聪明的大脑”,彻底改变了定义!

在人工智能的迅猛发展中&#xff0c;大模型的崛起为AI Agent注入了“聪明的大脑”&#xff0c;彻底改变了其定义。如今&#xff0c;基于大模型的AI Agent架构已成为企业应用大模型的首选方案。本文将深入探讨AI Agent的构建、框架选择及其在实际应用中的重要性&#xff0c;帮助…

PCL 点云配准-4PCS算法(粗配准)

目录 一、概述 1.1原理 1.2实现步骤 1.3应用场景 二、代码实现 2.1关键函数 2.1.1 加载点云数据 2.1.2 执行4PCS粗配准 2.1.3 可视化源点云、目标点云和配准结果 2.2完整代码 三、实现效果 3.1原始点云 3.2配准后点云 PCL点云算法汇总及实战案例汇总的目录地址链接…

前端开发学习(一)VUE框架概述

一、MVC模式与MVVM模式 1.1mvc模式 MVC模式是移动端应用广泛的软件架构之一&#xff0c;MVC模式将应用程序划分为3部分:Model(数据模型)、View(用户界面视图)和Controller(控制器)。MVC模式的执行过程是将View层展示给用户&#xff0c;也就是通过 HTML页面接受用户动作&#…