kafka消费数据太慢了,给优化下

devtools/2024/11/16 10:43:17/

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}

http://www.ppmy.cn/devtools/134407.html

相关文章

Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks

摘要 大型预训练语言模型已经被证明可以在其参数中存储事实性知识,并在下游自然语言处理(NLP)任务中通过微调取得了最先进的效果。然而,它们访问和精准操控知识的能力仍然有限,因此在知识密集型任务上,它们的表现往往落后于专门为任务设计的架构。此外,这些模型在决策时…

SpringBoot(二十二)SpringBoot集成smart-doc自动生成文档

计划在项目中集成swagger日志框架,经过一顿折腾,发现一个小小的问题。我得springboot框架版本是2.6.13,swagger要匹配这个版本的springboot可能比较麻烦。 这可如何是好…… 经跟社区的大神得讨论,他在用的文档生成工具是smart-doc。我尝试一下集成这个工具。 官网:https:…

梧桐数据库中的循环函数统计提升数据库性能的详细指南

梧桐数据库中的循环函数统计提升数据库性能的详细指南 引言 在现代企业级应用中&#xff0c;数据库性能的优劣直接影响着用户体验和业务效率。梧桐数据库&#xff08;WutongDB&#xff09;作为一款高性能的分布式关系型数据库&#xff0c;提供了丰富的工具和功能来帮助开发者…

DFT下release的sdc讨论

DFT下release的sdc主要包括三部分&#xff1a; 1、shift_sdc&#xff1a; 主要是检查scan_chain上寄存器q到si的timing情况&#xff1b;同时还要注意edt_logic和scan_chian之间的时序关系&#xff1b;channel_in/out或者wrap_cell_input/output的接口处的timing&#xff1b;处…

logstash grok插件语法介绍

介绍 logstash拥有丰富的filter插件,它们扩展了进入过滤器的原始数据&#xff0c;进行复杂的逻辑处理&#xff0c;甚至可以无中生有的添加新的 logstash 事件到后续的流程中去&#xff01;Grok 是 Logstash 最重要的插件之一。也是迄今为止使蹩脚的、无结构的日志结构化和可查询…

连续 Hopfield 神经网络深入探讨

连续 Hopfield 神经网络深入探讨 一、引言 人工神经网络在众多领域展现出了卓越的性能&#xff0c;连续 Hopfield 神经网络&#xff08;Continuous Hopfield Neural Network&#xff0c;CHNN&#xff09;作为其中的重要分支&#xff0c;为解决复杂的优化问题、模式识别以及联…

TCP/IP--Socket套接字--JAVA

一、概念 Socket套接字&#xff0c;是由系统提供⽤于⽹络通信的技术&#xff0c;是基于TCP/IP协议的⽹络通信的基本操作单元。 基于Socket套接字的⽹络程序开发就是⽹络编程。 二、分类 1.流套接字 使用传输层TCP协议。TCP协议特点&#xff1a;有链接、可靠传输、面向字节流…

革命性AI搜索引擎!ChatGPT最新功能发布,无广告更智能!

零、前言 大人&#xff0c;时代变了。 最强 AI 助力下的无广告搜索引擎终于问世。我们期待已久的这一刻终于到来了&#xff0c;从今天起&#xff0c;ChatGPT 正式转型为一款 AI 搜索引擎&#xff01; 北京时间 11 月 1 日凌晨&#xff0c;恰逢 ChatGPT 两岁生日&#xff0c;O…