记一次kafka消息丢失问题排查

devtools/2024/9/23 10:52:51/

背景

我写了一个 自定义分区器 ,自测发送了一些简单的如Hello world 之类的消息成功了,并且日志现实确实调用了我自己的分区器,然后我自认为已经完美了。

后来我发现很多消息消费者没有消费,  且发送完成回调(CallBack)没有被调用。 消息就这么石沉大海,也没有错误日志。

排查过程

一开始我从消息丢失方向排查,我前面文章也提到 消息发送几种方式 ,其中异步发送确实可能丢失,但我发送代码如下,我处理了异常,如果发送失败应该有异常打印出来的。

我一时也陷入迷茫,于是我网上查阅资料,关于生产者几个重要配置参数

acks 分区中有多少副本收到消息,生产者才认为消息发送成功 ,默认值1 ,并且我本地使用单节点,因此排除该配置

有些网上说 batch.size 和 request.timeout.ms 不对,我半信半疑,改了参数试试,依旧有问题

producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace();} else {System.out.println(content + " send sucesss  " + recordMetadata.toString());}}
});

定位问题

肯定是我修改了什么导致的问题, 于是我先把所有配置修改为默认的,发现能正常发送消息,消息也被消费。最终发现一旦使用我自定义分区器,消息发送就有问题。代码如下,聪明的你看出问题在哪了吗?

public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//获取分区总个数int numPartitions = partitions.size();int partition = key != null ? key.hashCode() % numPartitions : value.hashCode() % numPartitions;return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

答案揭晓

问题在于 HashCode 返回的可能是负数 ,负数再对 分区数 取模,得到的分区序号 就为负数。分区序号一旦为负,分区序号有问题了当然消息就发不出去,只是没想到也没报错。定位问题修复也很简单,给HashCode 取绝对值就解决了。

 int partition = key != null ? Math.abs(key.hashCode()) % numPartitions : Math.abs(value.hashCode()) % numPartitions;

举一反三

如果返回的分区序号大于分区数 消息发送也同负数情况一样,石沉大海。

分区数配置为 10 ,分区器返回序号固定11 测试结果同预期一样,和负数情况一样,消息丢失。

    @Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 11;}

反思

  • 没意识到 默认HashCode 可能为负数(当时误任务默认的不会为负数,除非恶意重写了HashCod方法)

  • 没有参考默认的分区器 DefaultPartitioner使用了 Utils.toPositive 将HashCode 转成了正数

  • 排查过程也熟悉了不少 kafka重要参数
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

结语

限于篇幅一些代码没有给出,欢迎私下交流学习。


http://www.ppmy.cn/devtools/114634.html

相关文章

【蜡笔小新专享】安装虚拟机、PHP、DVWA

在 VMware 中安装 PHP 和 DVWA 需要几个步骤。这里将详细介绍如何在一个 Linux 虚拟机中安装 DVWA 和 PHP 环境&#xff0c;以便进行 Web 安全测试。假设你已经在 VMware 上安装好了一个 Linux 发行版&#xff08;如 Ubuntu&#xff09;。 步骤 1&#xff1a;安装 VMware 和创…

图像到图像的翻译

图像到图像的翻译&#xff08;Image-to-Image Translation&#xff09;是指将一种图像从一种表示转换为另一种表示的过程。该任务的目标是在保证图像语义信息的前提下&#xff0c;将图像风格、颜色或其他视觉特征进行转换。该技术在计算机视觉领域具有广泛应用&#xff0c;例如…

算法 | 基础排序算法:插入排序、选择排序、交换排序、归并排序

文章目录 排序算法一、排序概念及运用1. 概念2. 运用3. 常见排序算法 二、实现常见排序算法1. 插入排序1.1 基本思想1.2 直接插入排序1.3 希尔排序 2. 选择排序2.1 基本思想2.2 直接选择排序2.3 堆排序 3. 交换排序3.1 基本思想3.2 冒泡排序3.3 快速排序3.4 非递归版本 4. 归并…

智能BI项目第四期

开发图表管理功能 规划思路 首先需要做一个列表页。后端已经在星球提供了一个基础的万能项目模板&#xff0c;包含增删改查接口&#xff0c;我们只需要在此基础上进行定制化开发即可。所以本期后端的开发量不多&#xff0c;只需要复用即可&#xff0c;主要是前端。 规划功能…

Linux —— 多线程

一、本篇重点 1.了解线程概念&#xff0c;理解线程与进程区别与联系 2.理解和学会线程控制相关的接口和操作 3.了解线程分离与线程安全的概念 4.学会线程同步。 5.学会互斥量&#xff0c;条件变量&#xff0c;posix信号量&#xff0c;以及读写锁 6.理解基于读写锁的读者写…

Day15笔记-函数返回值函数的封装匿名函数

一、函数基础【重点掌握】 1.关键字参数和不定长参数 # 1.关键字参数 # 注意&#xff1a;默认参数主要体现在函数的定义中【声明】&#xff0c;关键字参数主要体现在函数的调用中 def func1(a,b,c):print(11111,a,b,c) # 必须参数的缺点&#xff1a;必须按照顺序传参&#xff…

关于 Goroutines 和并发控制的 Golang 难题

下面是一道关于 Goroutines 和并发控制的 Golang 难题&#xff0c;它涉及到 Go 的并发编程模型、Goroutines、通道&#xff08;Channels&#xff09;以及 sync.WaitGroup 的使用&#xff1a; 问题描述&#xff1a; 你有一个需要并发执行的任务&#xff0c;其中有 100 个 URL …

如何检测电脑有无恶意软件并处理掉?

检测并处理电脑上的恶意软件是维护计算机安全的重要步骤。以下是一些推荐的步骤&#xff1a; 检测恶意软件 使用杀毒软件&#xff1a; 安装一个可靠的杀毒软件&#xff0c;如360安全卫士、腾讯电脑管家、卡巴斯基、Bitdefender、Avast等。确保杀毒软件的病毒库是最新的。运行全…