04 kafka 中一些常用的配置的使用

news/2024/11/17 18:35:19/

前言

 

呵呵 也是最近有一些 搭建 kafka 的环境的需求 

然后 从新看了一下 一部分的配置情况, 这里 大致理一下 一些我这里比较关心的配置  

那些配置关联了 kafka 服务器绑定服务

绑定 tcp 服务的配置来自于这里, 读取的是 config.dataPlaneListeners 

config.dataPlaneListeners  是取自 listeners, 然后过滤掉了 controller 相关的配置 

listeners 的配置来自于 "listeners" / "host.name + port" 的配置 

那些配置关联了 kafka 服务器暴露给客户端的地址信息

和如下问题关联, 暴露给客户端的 地址信息 只要是来自于 advertisedListeners 相关

当前机器的虚拟机[NAT网络]映射出 9200 到宿主机, 局域网的其他机器访问 宿主机+9200 访问不到 kafka 的服务

引用自上面的链接 

FindCoordinatorRequest 获取的数据来自于 metadataSnapshot
kafkaController.sendUpdateMetadataRequest 发生事件的时候 更新 metadataSnapshot 的请求
aliveNodes 来自于 broker 的相关信息, 这个 brokerInfo 来自于 KafkaController.initializeControllerContext
KafkaController.initializeControllerContext 的 aliveNodes 来自于 zkClient 向 "/brokers/ids/*" 获取 brokerId 列表, 然后在依次获取 "brokers/ids/$id" 的数据信息, 作为 Broker 的元数据信息
broker 注册到 zk 的信息来自于 KafkaServer 启动的时候向 zk 注册的, endpoints 的相关信息来自于 "advertised.listeners", "advertised.host.name:advertised.host.port", "listeners", "host.name:port"

具体的获取 advertisedListeners 相关的处理如下 

kafka 启动之后还会连接 broker ? 

可以发现一个问题, 就是 假设 我 advertised.listeners 配置了一个存在问题的配置, ip 不存在, 或者 服务不存在 

服务器启动 大致会报错如下 

java.net.SocketTimeoutException: Failed to connect within 30000 msat kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:297)at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:250)at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

这个错误是什么呢?, 影响到的又有哪些呢? 

这个请求的处理, 主要是来自于 controller 

kafkaServer - kafkaController - controllerChannelManager - requestSendThread, 再到具体的 requestSendThread 就是最原始导致上面报错的地方了,

在各种事件触发的时候, controller 需要连接 broker, 发送请求[由 requestSendThread 来发送请求], 比如 RegisterBrokerAndReelect 的时候, 需要 选举 controller, 之后需要发送 UpdateMetadataRequest 

然后 客户端尝试连接 kafka 服务, 会得到报错大致如下 

[10:44:05.239] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 29 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.365] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 30 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.484] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 31 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.604] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 32 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.721] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 33 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.840] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 34 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:05.953] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 35 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:06.065] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 36 : {test20220528=LEADER_NOT_AVAILABLE}
[10:44:06.187] WARN  org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater 1023 handleCompletedMetadataResponse - [Consumer clientId=consumer-1, groupId=test3] Error while fetching metadata with correlation id 37 : {test20220528=LEADER_NOT_AVAILABLE}

kakfa 事件相关处理 

kafkaServer - kafkaController - eventManager - controllerEventThread

controllerEventThread 中的事件处理, 主要又是委托给了 processor, 也就是 kafkaController 

这个 controller 的核心业务就是这各种事件的处理  

事件主要是来自于 kafka 本身, 以及监听 zk 的相关状态变化 产生 

controller 是如何选举的?

基于 zk 来选举 controller 

如果 创建节点 "/controller"  成功, 成功的 broker 即为 controller, 其他的节点不是 

broker 来尝试在 zk 上面创建 "/controller" 节点 

完 


http://www.ppmy.cn/news/8403.html

相关文章

实战四十六:基于LightGBM的广告点击预测 代码+数据

配库: 1. 读取原始数据, 将时间信息分解为天和分钟 2. 特征工程 3. 五折交叉验证训练模型 4. 特征重要性 5. 做出最终预测

使用异步ORM SQLAlchemy提升web服务性能

介绍 对于一个web服务,性能的瓶颈最终基本上都会出现在数据库读取的这一步上,如果能够在数据库读取数据的这一段时间自动切换去处理其他请求的话,服务的性能会得到非常显著的提升,因此需要选择一个合适的异步驱动和工具包 SQLAl…

诞生两年+,三翼鸟的“场景”思维有进化吗?

出品 | 何玺 排版 | 叶媛 2020年9月,三翼鸟品牌正式发布。截止2022年12月,三翼鸟已经走过了2年多的历程。诞生两年,三翼鸟有什么样的发展,它倡导的“场景”思维有进化吗?我们一起来看看。 01 从三翼鸟的“全球首个场…

java接口的静态方法

目前java接口中已经支持定义静态方法 但需要注意一个点 我们先把代码写出来 我们创建一个包 下面创建一个接口 subInterface 接口参考代码如下 public interface subInterface {static void show2() {System.out.println("来自接口的静态方法");} }这里 我们就将…

串口通信协议

同步通信和异步通信 同步通信:需要时钟信号的约束,在时钟信号的驱动下两方进行数据交换,一般会选择在上升沿或者下降沿进行数据的采样,以及时钟极性和时钟相位【eg.SPI,IIC】。 异步通信:不需要时钟信号的同步,通过(…

Go 并发

来自 《Go 语言从入门到实战》 的并发章节学习笔记,欢迎阅读斧正,感觉该专栏整体来说对有些后端编程经验的来说比无后端编程经验的人更友好。。 Thread VS Groutine 创建时默认 Stack 大小:前者默认 1M,Groutint 的 Stack 初始化…

uniCloud云开发----4、uniCloud云开发进阶使用方法

uniCloud云开发进阶使用方法前言1、云对象的importObject的创建和使用(1)创建云对象(2)编辑云对象(3)在.vue文件中调用云对象(4)在.vue文件中调用方法2、客户端直接连接数据库(1)直接在客户端引…

Threejs实现鼠标点击人物行走/镜头跟随人物移动/鼠标点击动画/游戏第三人称/行走动作

1,功能介绍 Threejs获取鼠标点击位置、实现鼠标点击人物行走、人物头顶显示名称标签、镜头跟随人物移动并且镜头围绕人物旋转,类似游戏中第三人称、鼠标点击位置有动画效果,如下效果图 2,功能实现 获取鼠标点击位置,…