Apache Kafka - 重识消费者

news/2024/11/25 15:40:36/

文章目录

  • 概述
    • Kafka消费者的工作原理
    • Kafka消费者的配置
    • Kafka消费者的实现
      • 高级API
      • 低级API
  • 导图
    • 总结

在这里插入图片描述


概述

Kafka是一个分布式的消息队列系统,它的出现解决了传统消息队列系统的吞吐量瓶颈问题。

Kafka的高吞吐量、低延迟和可扩展性使得它成为了很多公司的首选消息队列系统。

在Kafka中,消息被分成了不同的主题(Topic),每个主题又被分成了不同的分区(Partition)。

生产者(Producer)将消息发送到指定的主题中,而消费者(Consumer)则从指定的主题中读取消息。

接下来我们将介绍Kafka消费者相关的知识。

Kafka消费者的工作原理

Kafka消费者从指定的主题中读取消息,消费者组(Consumer Group)则是一组消费者的集合,它们共同消费一个或多个主题。在一个消费者组中,每个消费者都会独立地读取主题中的消息。当一个主题有多个分区时,每个消费者会读取其中的一个或多个分区。消费者组中的消费者可以动态地加入或退出,这样就可以实现消费者的动态扩展。

Kafka消费者通过轮询(Polling)方式从Kafka Broker中读取消息。当一个消费者从Broker中读取到一条消息后,它会将该消息的偏移量(Offset)保存在Zookeeper或Kafka内部主题中。消费者组中的消费者会协调并平衡分区的分配,保证每个消费者读取的分区数量尽可能均衡。

Kafka消费者的配置

  1. bootstrap.servers

    该参数用于指定Kafka集群中的broker地址,多个地址以逗号分隔。消费者会从这些broker中获取到集群的元数据信息,以便进行后续的操作。

  2. group.id

    该参数用于指定消费者所属的消费组,同一消费组内的消费者共同消费一个主题的消息。如果不指定该参数,则会自动生成一个随机的group.id。

  3. enable.auto.commit

    该参数用于指定是否启用自动提交offset。如果设置为true,则消费者会在消费消息后自动提交offset;如果设置为false,则需要手动提交offset。

  4. auto.commit.interval.ms

    该参数用于指定自动提交offset的时间间隔,单位为毫秒。只有当enable.auto.commit设置为true时,该参数才会生效。

  5. session.timeout.ms

    该参数用于指定消费者与broker之间的会话超时时间,单位为毫秒。如果消费者在该时间内没有发送心跳包,则会被认为已经失效,broker会将其从消费组中移除。

  6. max.poll.records

    该参数用于指定每次拉取消息的最大条数。如果一次拉取的消息数量超过了该参数指定的值,则消费者需要等待下一次拉取消息。

  7. auto.offset.reset

    该参数用于指定当消费者第一次加入消费组或者offset失效时,从哪个位置开始消费。可选值为latest和earliest,分别表示从最新的消息和最早的消息开始消费。

  8. max.poll.interval.ms

    该参数用于指定两次poll操作之间的最大时间间隔,单位为毫秒。如果消费者在该时间内没有进行poll操作,则被认为已经失效,broker会将其从消费组中移除。

  9. fetch.min.bytes

    该参数用于指定每次拉取消息的最小字节数。如果一次拉取的消息数量不足该参数指定的字节数,则消费者需要等待下一次拉取消息。

  10. fetch.max.wait.ms

    该参数用于指定拉取消息的最大等待时间,单位为毫秒。如果在该时间内没有获取到足够的消息,则返回已经获取到的消息。


Kafka消费者的实现

Kafka消费者的实现可以使用Kafka提供的高级API或者低级API。高级API封装了低级API,提供了更加简洁、易用的接口。下面分别介绍一下这两种API的使用方法。

高级API

使用高级API可以更加方便地实现Kafka消费者。下面是一个使用高级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。

低级API

使用低级API可以更加灵活地实现Kafka消费者。下面是一个使用低级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "fal	VCC se");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());consumer.commitSync(Collections.singletonMap(record.topicPartition(), new OffsetAndMetadata(record.offset() + 1)));}
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。在处理完每条消息后,我们使用commitSync方法手动提交偏移量。


导图

在这里插入图片描述

总结

Kafka消费者是Kafka消息队列系统中的重要组成部分,它能够从指定的主题中读取消息,并进行相应的处理。在使用Kafka消费者时,需要注意消费者组ID、自动提交偏移量、偏移量重置策略以及消息处理方式等配置信息。

在这里插入图片描述


http://www.ppmy.cn/news/108608.html

相关文章

求个大数据开发的学习路线

随着数据开发工程师成为炙手可热的职位&#xff0c;与之相关各项条件水涨船高&#xff1a;录取标准、人才需求、以及&#xff0c;薪资待遇&#xff0c;因此想要学习大数据掌握相关技能才是自身最大的核心竞争力。 大数据工程师的技术要求如下&#xff1a; 1、掌握至少一种数据…

uni-app生命周期有哪些?怎么理解?

uni-app生命周期有哪些&#xff1f;怎么理解&#xff1f; uni-app生命周期有哪些&#xff1f;怎么理解&#xff1f; 文章目录 uni-app生命周期有哪些&#xff1f;怎么理解&#xff1f;前言一、什么是生命周期函数&#xff1f;二、uni-app生命周期分类总结 前言 UNI-APP学习系…

测牛学堂:2023最新自动化软件测试教程之python基础(字符串常用api总结)

python字符串常用API总结 1 count 查找某个字符在整个字符串中出现的次数 2 capitalize 将字符串的第一个字符转换为大写 3 center(width,fillchar) 返回一个指定宽度的字符串&#xff0c;fillchar为填充的字符&#xff0c;默认是空格&#xff0c;常用* str1 分隔线 print(st…

SeaFormer实战:使用SeaFormer实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整算法设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试热力图可视化展示…

Liskov替换原则:用了继承,子类就设计对了吗?

前言 上一篇&#xff0c;我们讲了开放封闭原则&#xff0c;想要让系统符合开放封闭原则&#xff0c;最重要的就是我们要构建起相应的扩展模型&#xff0c;所以&#xff0c;我们要面向接口编程。 而大部分的面向接口编程要依赖于继承实现&#xff0c;继承的重要性不如封装和多…

tensorrt yolov7 推理

参考 源码修改如下&#xff1a;如果将源代码cpp/norm/yolo.hpp修改为自己训练的数据时修改如下&#xff1a; class YOLO{ const char* INPUT_BLOB_NAME "images"; const char* OUTPUT_BLOB_NAME "output"; }根据自己转换onnx模型采用netron打开查看 输入…

【Java学习记录-5】接口

接口 说明&#xff1a;接口就是一种公共的规范标准&#xff0c;只要符合规范标准&#xff0c;大家都可以通用。Java中的接口更多的体现在对行为的抽象 特点 接口用关键字interface修饰&#xff1a;public interface 接口名{}类实现接口用implements表示&#xff1a;public c…

jmeter 性能测试工具的使用(Web性能测试)

1、下载 2023Jmeter性能测试项目实战教程&#xff0c;十年测试大佬手把手教你做性能&#xff01;_哔哩哔哩_bilibili2023Jmeter性能测试项目实战教程&#xff0c;十年测试大佬手把手教你做性能&#xff01;共计11条视频&#xff0c;包括&#xff1a;1.什么是性能测试以及性能测…