《深入理解kafka-核心设计与实践原理》第三章:消费者

server/2024/9/19 10:02:57/ 标签: kafka, linq, 数据库

第三章:消费者

3.1 消费者与消费组

3.1.1 消费者(Consumer)

3.1.2 消费组(Consumer Group)

3.1.3 消息投递模式

3.2 客户端开发

3.2.1 必要的配置参数

3.2.2 订阅主题与分区

3.2.3 反序列化

3.2.4 消费消息

3.2.5 位移提交

3.2.5.1 offset

3.2.5.2 消费后的提交方式:自动提交/手动提交

3.2.6 控制/关闭消费

3.2.7 指定位移消费

3.2.8 再均衡(Rebalance)

3.2.9 消费者拦截器

3.2.10 多线程实现

3.3 重要的消费者参数

第三章:消费者

3.1 消费者与消费组

  • 概念上的区分
    • 消费者:概念上是实际的应用消费实例,它可以是一个钱程,也可以是一个进程
    • 消费组:逻辑上的概念

3.1.1 消费者(Consumer)

负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息

3.1.2 消费组(Consumer Group)

  • 与其他消息中间件不同的是在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组
    • 当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者,即每一个分区只能被一个消费组中的一个消费者所消费
    • 消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,可以通过增加consumer个数来提高消费能力(图3-2至图3-4)。但一昧增加消费者并不会让消费能力一直得到提升,最后瓶颈到了partition的个数(图3-5)
    • 增加消费者会让消费能力提升,但如果出现消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区(图3-5)

  • 可以通过消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略

3.1.3 消息投递模式

  • 点对点模式(P2P):点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息
  • 发布/订阅模式(Pub/Sub):生产者发送消息到主题,订阅者从主题订阅消息来消费。该模式可在一对多广播时采用
  • Kafka对于两种模式都支持
    • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用
    • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用

3.2 客户端开发

一个正常的消费逻辑具备了以下步骤,本章从以下五个步骤进行讲解

  • ①【配置消费者客户端参数】及【创建相应的消费者实例】
  • ②订阅主题
  • ③拉取消息并消费
  • ④提交消费位移
  • ⑤关闭消费者实例

3.2.1 必要的配置参数

  • bootstrap.servers:同2.1.1中生产者的参数
  • group.id:消费者隶属的消费组名称,默认值为""
  • key.deserializer和value.deserializer:消费者反序列化器,需与生产者序列化器配套

3.2.2 订阅主题与分区

  • ①subscribe()方法:一个消费者KafkaConsumer可以订阅一个或多个主题
    • public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)	//设置rebalance监昕器,3.2.8会讲到
      public void subscribe(Collection<String> topics)
      public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
      public void subscribe(Pattern pattern)
  • ②assign()方法:除了通过subscribe()方法来订阅topic,还可以通过assign()方法直接订阅特定的分区
    • consumer.assign(Arrays.asList(new TopicPartition (”topic-demo”, 0))) ; //该方法只订阅topic-demo主题中分区编号为0的分区
  • ③unsubscribe()方法:取消订阅
  • ④subscribe()与assign()的区别
    • subscribe()具有消费者自动再均衡(rebalance)的功能,即在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内 的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移
    • assign()不具备自动再均衡(rebalance)的功能,它是指定分区号的

3.2.3 反序列化

  • 同发送端的序列化,实现Deserializer接口。这里不再赘述

3.2.4 消费消息

  • 消息的消费一般有两种模式 : 推模式和拉模式。Kafka中的消费是基于拉模式的
    • 推模式(push):服务端主动将消息推送给消费者
    • 拉模式(pull):消费者主动向服务端发起请求来拉取消息
  • Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而 poll()方法返回的是所订阅的主题(分区)上的一组消息
    • //对于 poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空; 如果订阅的所有分区中都没有可供消费的消息,那么 poll()方法返回为空的消息集
      //ConsumerRecords表示一次拉取操作所获得的消息集
      public ConsumerRecords<K, V> poll(final Duration timeout)
  • 消费者消费到的每条消息的类型为ConsumerRecord,这个和生产者发送的类型ProducerRecord相对应,但ConsumerRecord中内容更丰富
  • 到目前为止,可以简单地认为poll()方法只是拉取一下消息而己,但就其内部逻辑而言并不简单:它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容

3.2.5 位移提交

3.2.5.1 offset
  • 分区中的offset:表示消息在分区中对应的位置(本书中称偏移量),分区中每条消息都有唯一的offset
  • 消费者的offset:表示消费者消费到的位置(本书中称位移),也称"消费位移"。消费者在消费完消息后需要执行消费位移的提交
    • 在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题 __consumer_offsets 中

几个概念

  • lastConsumedOffset:消费者消费到此分区消息的最大偏移量,如377(说明377已经被消费,378还未被消费)
  • committed offset:消费者消费完后,提交的位移值,会多提交一位,如378
  • position:消费者下一次要拉取的消息的起始偏移量,如378
    • 一般而言:position = committed offset = lastConsumedOffset + 1(当然position和committed offset并不会一直相同)
3.2.5.2 消费后的提交方式:自动提交/手动提交

①自动提交

  • 默认自动提交,这个参数由enable.auto.commit控制,默认为true
  • 注意:这里的自动提交不是指每消费一条消息就提交一次,而是定期提交。这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。即消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交
  • 自动位移提交的动作是在 poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移

自动提交方式带来的问题:不建议设置成自动提交

  • 重复消费:假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象
    • 优化:减小位移提交的时间间隔(auto.commit.interval.ms)来减小重复消息窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁
  • 消息丢失:线程A拉取消息,丢给自定义的异步业务线程B处理消息。A已经commit了,B在处理的过程中宕机,造成消息丢失(A拉取了消息,还没处理完,消息被自动commit了,然A宕机了业务未完成)
  • 无法做到精确的位移管理

②手动提交:如果遇到复杂的业务场景,很多时候并不是说拉取到消息就算消费完成,需要一系列复杂业务操作(如消息写入数据库、写入本地缓存),这时候可以改成手动提交

  • enable.auto.commit参数改成false
  • 手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer中的commitSync()和commitAsync()两种类型的方法
    • 同步提交:拉取的消息全部消费完了,再重新拉取,每次拉取批次默认最大200。如果消费崩溃了,第二次会重新拉取,造成重复消费
      • 在实际应用中,很少会有这种每消费一条消息就提交一次消费位移的必要场景。 commitSync()方法本身是同步执行的,会耗费一定的性能
      • commitSync()方法会根据 poll()方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成
    • 异步提交:在执行的时候消费者线程不会被阻塞 , 可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作,异步提交可以便消费者的性能得到一定的增强
      • public void commitAsync {) 
        //当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete()方法
        public void commitAsync{OffsetCommitCallback callback ) 
        public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets , OffsetCommitCallback callback)

3.2.6 控制/关闭消费

  • 在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费
  • KafkaConsumer中提供 pause()和 resume()方法来分别实现
    • public void pause(Collection<TopicPartition> partitions)
      public void resume(Collection<TopicPartition> partitions)

3.2.7 指定位移消费

  • 在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费
    • latest:默认参数,表示从分区末尾开始消费消息
    • earliest:那么消费者会从起始处,也就是 0 开始消费
    • none:出现查不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常
  • 如何追前或回溯消息:seek()方法,可以从特定的位移处开始拉取消息
    • //partition表示分区,而offset参数用来指定从分区的哪个位置开始消费
      public void seek(TopicPartition partition, long offset)
    • seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。 也就是说,在执行seek()方法之前需要先执行一次 poll()方法,等到分配到分区之后才可以重置消费位置

3.2.8 再均衡(Rebalance)

  • 再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者
  • 通俗的讲:Rebalance作用在消费组层面(新增/下线消费者),Rebalance的目的是确保分区在consumer group成员之间平均分配,以便每个consumer都有分区去消费;通常是Kafka协调器自动触发的
  • 再均衡的几个问题
    • 期间不可用:在再均衡发生期间的这一小段时间内,消费组会变得不可用
    • 重复消费:当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。可能会发生重复消费
    • 一般情况下,应尽量避免不必要的再均衡的发生
  • 再均衡监听器ConsumerRebalanceListener在subscribe()方法中会被用到,见3.2.2
  • 相关api:ConsumerRebalanceListener接口,包含两个方法
    • void onPartitionsRevoked(Collection<TopicPartition> partitions):这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交, 以此来避免一些不必要的重复消费现象的发生。参数partitions表示再均衡前所分配到的分区
    • void onPartitionsAssigned(Collection<TopicPartition> partitions):这个方法会在重新分配分区之后和消费者开始读取消费之前被调用 。参数 partitions 表示再均衡后所分配到的分区

3.2.9 消费者拦截器

与生产者拦截器对应,通过实现org.apache.kafka.clients.consumer.Consumerlnterceptor接口

//KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); 
//KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();

3.2.10 多线程实现

KatkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的

  • KafkaConsumer 中的每个公用方法在执行所要执行的动作之前都会调用内部的acquire()方法和release()方法。我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作

KafkaConsumer非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行,仍可以用多线程的方式运行,提高消费能力。实现方式有多种

  • 常见方式一:线程封闭,即为一个线程实例化一个KafkaConsumer对象
  • 常见方式二:多个消费线程同时消费同一个分区,不过这种方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,不推荐
  • 常见方式三:线程池处理:相比第一种,除了横向扩展的能力,还可以减少 TCP 连接对 系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难
    • 需注意的是,第三种方式需要考虑位移提交

3.3 重要的消费者参数

  • fetch.min.bytes,fetch.max.bytes,fetch.max.wait.ms
    • fetch.min.bytes:Consumer 在一次拉取请求(调用 poll()方法)中能从 Kafka 中拉取的最小数据量,默认值为1(B),如果当前数据量小于这个值,会进行等待,直到数据量满足这个参数的配置大小。这个值太大可能会造成额外的延迟
    • fetch.max.bytes:Consumer在一次拉取请求中从 Kafka 中拉取的最大数据量,默认值为50MB。如果一条消息比这个值大,会造成无法拉取和消费
    • fetch.max.wait.ms:默认值500ms,fetch.min.bytes会引起消费者等待,如果超过等待时间,则会拉取
  • max.partition.fetch.bytes:从每个分区里返回给 Consum町的最大数据量默认1MB
  • max.poll.records:消费者在一次拉取请求中拉取的最大消息数,默认值为500(条)

http://www.ppmy.cn/server/36669.html

相关文章

《Redis使用手册之集合》

《Redis使用手册之集合》 目录 **《Redis使用手册之集合》****SADD&#xff1a;将元素添加到集合****SREM&#xff1a;从集合中移除元素****SMOVE&#xff1a;将元素从一个集合移动到另一个集合****SMEMBERS&#xff1a;获取集合包含的所有元素****SCARD&#xff1a;获取集合包…

yolov5网络结构图要点和难点实际案例和代码解析

YOLOv5网络结构图主要可以分为四个部分:输入端(Input)、Backbone(主干网络)、Neck(颈部)和Prediction(输出端)。以下是对这四个部分的简要说明: 输入端(Input): 数据增强:YOLOv5在输入端使用了Mosaic数据增强技术,这是一种将四张训练图像混合成一张的方式,可以…

Python 贪吃蛇

文章目录 效果图&#xff1a;项目目录结构main.pygame/apple.pygame/base.pygame/snake.pyconstant.py 效果图&#xff1a; 项目目录结构 main.py from snake.game.apple import Apple # 导入苹果类 from snake.game.base import * # 导入游戏基类 from snake.game.snake im…

Linux MQTT智能家居(Linux下运行MQTT)

文章目录 前言一、下载源码编译1.编译出64位的库文件2.编译出ARM平台下的库文件 二、将lib库文件和include文件加入自己的工程1.ubuntu下测试2.ARM平台测试 总结 前言 本篇文章将带大家在Linux下运行MQTT库&#xff0c;我们首先会将MQTT库下载下来&#xff0c;然后进行编译&am…

如何用Kimi,5秒1步生成流程图

引言 在当前快节奏的工作环境中&#xff0c;拥有快速、专业且高效的工具不可或缺。 Kimi不仅能在5秒内生成专业的流程图&#xff08;kimi&#xff09;&#xff0c;还允许实时编辑和预览&#xff0c;大幅简化了传统流程图的制作过程。 这种迅速的生成能力和高度的可定制性使得…

Python:实现b站登录并保存登录信息(baidu Comate插件帮助我逐行分析代码)

&#x1f4da;博客主页&#xff1a;knighthood2001 ✨公众号&#xff1a;认知up吧 &#xff08;目前正在带领大家一起提升认知&#xff0c;感兴趣可以来围观一下&#xff09; &#x1f383;知识星球&#xff1a;【认知up吧|成长|副业】介绍 ❤️感谢大家点赞&#x1f44d;&…

PWN入门--栈溢出

PWN入门–栈溢出 栈概要 介于网上各种wp在栈溢出利用脚本方面浅入浅出&#xff0c;深入讲一下栈溢出利用时&#xff0c;地址如何计算&#xff0c;栈中垃圾数据如何填充&#xff0c;函数调用时 参数 在栈中的分布&#xff0c; 栈帧的生成&#xff0c;函数返回值ip在栈中的摆放…

什么是SSL?SSL安全证书一定要有吗?

什么是SSL证书&#xff1f; SSL证书是数字证书的一种&#xff0c;类似于驾驶证、护照和营业执照的电子副本。因为配置在服务器上&#xff0c;也称为SSL服务器证书。SSL 证书就是遵守 SSL协议&#xff0c;由受信任的数字证书颁发机构CA&#xff0c;在验证服务器身份后颁发&…

Internet接入技术

internet接入技术 发展 1960s-1980s 早期互联网雏形ARPANET发展&#xff0c;主要限于研究机构和大学之间&#xff0c;通过专用线路连接。 1990s初期 拨号接入开始普及&#xff0c;用户通过电话线和调制解调器以低速&#xff08;起初为14.4Kbps&#xff0c;后来提升到56Kbps&a…

java一个接口完成三种文件类型上传(exe,文档,图片)

try {// 获取文件后缀名String originalFilename file.getOriginalFilename();String fileExtension originalFilename.substring(originalFilename.lastIndexOf(".")).toLowerCase();// 判断文件类型并设置对应的字段和保存路径String filePath;String fieldType;…

Xshell不能使用(版权原因不能使用),通过ip连接虚拟机CentOS7系统拷贝文件

一、使用SSH服务 1.确保 CentOS 7 虚拟机安装了 SSH 服务。 systemctl status sshd 如果没有安装&#xff0c;您可以使用以下命令来安装&#xff1a; sudo yum install openssh-server 2.启动 SSH 服务&#xff08;如果尚未启动&#xff09;&#xff1a; sudo systemctl …

[Rust] 打印变量类型

一、简介 本文介绍了如何在Rust中打印变量的类型。 二、代码 代码&#xff1a; // 使用 print_type_of 函数打印变量类型 fn print_type_of<T>(_: &T) {println!("{}", std::any::type_name::<T>()) }fn main() {let s "Hello";let i…

【一看就懂】UART、IIC、SPI、CAN四种通讯协议对比介绍

UART、IIC、SPI、CAN四种通信协议对比 通信方式传输线通讯方式标准传输速度使用场景UARTTX(发送数据线)、RX(接收数据线)串行、异步、全双工115.2 kbit/s(常用)计算机和外部设备通信&#xff08;打印机&#xff09;IICSCL(时钟线)、SDA(数据线)串行、同步、半双工100 kbit/s(标…

10页面结构分析

我们打开一个网页&#xff0c;都会有一个清晰的结构和布局上图中的标签就是用来划分各个部分区域用的。其中比较常用重要的是header、footer和nav&#xff0c;需要重点掌握。 下面是部分代码及效果演示 <header> <h2>网页头部</h2> </header><sec…

【AIGC调研系列】VILA-1.5版本的视频理解功能如何

VILA-1.5版本的视频理解功能表现出色&#xff0c;具有显著的突破。这一版本不仅增强了视频理解能力&#xff0c;还提供了四种不同规模的模型供用户选择&#xff0c;以适应不同的应用需求和计算资源限制[1][2][3]。此外&#xff0c;VILA-1.5支持在笔记本等边缘设备上部署&#x…

MyBatis-Plus 数据库表字段名为关键字

数据库表字段名为关键字时&#xff0c;执行SQL是会报错的&#xff0c;一般的处理是在字段左右加解决&#xff0c;MyBatis-Plus也支持这种处理方式。 找到数据库表对应的实体类。TableField注解中的值左右添加即可。Schema(description "说明") TableField("de…

JavaEE初阶-多线程易忘点总结

文章目录 1.PCBPID文件描述符表内存指针状态上下文优先级记账信息tgid 2.线程与进程的区别3.sleep和interrupt方法的关系变量终止线程interrupt方法终止线程 4.线程状态5.出现线程不安全的原因线程在系统中是随即调度&#xff0c;抢占式执行的。多个线程修改同一个变量线程针对…

从互联网医院源码到搭建:开发视频问诊小程序的技术解析

如今&#xff0c;视频问诊小程序作为医疗服务的一种新形式&#xff0c;正逐渐受到人们的关注和青睐。今天&#xff0c;小编将为您详解视频问诊小程序的开发流程。 一、背景介绍 互联网医院源码是视频问诊小程序开发的基础&#xff0c;它提供了一套完整的医疗服务系统框架&…

Rust:用 Warp 库实现 Restful API 的简单示例

直接上代码&#xff1a; 1、源文件 Cargo.toml [package] name "xcalc" version "0.1.0" edition "2021"# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies] warp "…

(40)4.30数据结构(队列)

1.队列的基本概念 2.队列的顺序 #define MaxSize 10 #define ElemType int typedef struct { ElemType data[MaxSize]; int front, rear; }SqQueue;//1.初始化操作 void InitQueue(SqQueue& Q) { //初始化 队头&#xff0c;队尾指针指向0 Q.rear Q.fron…