Kafka-代码示例

ops/2024/10/29 23:40:56/

一、构建开发环境

File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.1</version></dependency>
</dependencies>

 加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic

kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092

三、编写生产者

kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了

package org.example.kafkaStudy;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;public class KafkaProducerDemo {public static void main(String[] args) {try{//topic名称String topicName = "kafka-study";//broker列表String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";//向topic打多少数据int numRecords = 10000;//是否异步推送数据boolean isAsync = true;int key = 0;int sentRecords = 0;//创建生产者KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);//判断是否达到生产要求while (sentRecords < numRecords) {if (isAsync) {//异步推送asyncSend(producer,topicName, key, "test" + key,sentRecords);} else {//同步推送syncSend(producer,topicName, key, "test" + key,sentRecords);}key++;sentRecords++;}producer.close();} catch (Throwable e) {e.printStackTrace();}}private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)throws ExecutionException, InterruptedException {try {// 发送记录,然后调用get,这会阻止等待来自broker的ackRecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();Utils.maybePrintRecord(sentRecords, key, value, metadata);return metadata;} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException| OutOfOrderSequenceException | SerializationException e) {Utils.printErr(e.getMessage());} catch (KafkaException e) {Utils.printErr(e.getMessage());}return null;}private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {//异步发送记录,设置一个回调以通知结果。//请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));}private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {Properties props = new Properties();// 生产者连接到broker需要引导服务器配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 设置序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);if (transactionTimeoutMs > 0) {// 事务协调器主动中止正在进行的事务之前的最长时间props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);}if (transactionalId != null) {// 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);}// 在分区级别启用重复保护props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);return new KafkaProducer<>(props);}static class ProducerCallback implements Callback {private final int key;private final int sentRecords;private final String value;public ProducerCallback(int key, String value,int sentRecords) {this.key = key;this.sentRecords = sentRecords;this.value = value;}/*** 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,* 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。** @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。* @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。*/public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {Utils.printErr(exception.getMessage());if (!(exception instanceof RetriableException)) {// 我们无法从这些异常中恢复过来}} else {Utils.maybePrintRecord(sentRecords, key, value, metadata);}}}
}

四、编写消费者

package org.example.kafkaStudy;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;public class KafkaConsumerDemo {public static void main(String[] args) {//topic名称String topicName = "kafka-study";//组名称String groupName = "my-group-1";//broker列表String bootstrapServers =  "cdh1:9092,cdh2:9092,cdh3:9092";//向topci打多少数据int numRecords = 10000;int remainingRecords = 10000;// 消费来自 topic = kafka-study 的数据KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);//订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知consumer.subscribe(singleton(topicName));Utils.printOut("Subscribed to %s", topicName);while (remainingRecords > 0) {try {// 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。// 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<Integer, String> record : records) {Utils.maybePrintRecord(numRecords, record);}remainingRecords -= records.count();} catch (AuthorizationException | UnsupportedVersionExceptione) {// 我们无法从这些异常中恢复过来Utils.printErr(e.getMessage());} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {// 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效Utils.printOut("Invalid or no offset found, using latest");consumer.seekToEnd(e.partitions());consumer.commitSync();} catch (KafkaException e) {// 记录异常并尝试继续Utils.printErr(e.getMessage());}}consumer.close();Utils.printOut("Fetched %d records", numRecords - remainingRecords);}private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,String groupId , boolean readCommitted) {Properties props = new Properties();// 消费者连接到broker需要引导服务器配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/portprops.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());// 当我们使用订阅(topic)进行组管理时,需要消费者groupIdprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//设置静态成员资格以提高可用性(例如滚动重启)
//        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));//启用EOS时禁用自动提交,因为偏移量与事务一起提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");//读取数据用到的反序列化器,需要和生产者对应props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);if (readCommitted) {// 跳过正在进行和已中止的事务props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");}// 在偏移无效或没有偏移的情况下设置重置偏移策略props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return new KafkaConsumer<>(props);}
}

五、运行程序

生产者日志打印

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)

......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2)

.......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)

我们再次用命令看下每个分区的offset

消费者日志打印

main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records

六、问题说明

从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?

在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。

消费者也是从brokers一批一批的拉取数据来消费的

我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。


http://www.ppmy.cn/ops/129441.html

相关文章

Es可视化界面 ElasticHd

前言 在开发的过程中&#xff0c;有一个可视化界面工具&#xff0c;以及一个可执行的es相关语句的工具十分重要&#xff0c;主要有以下这些 1. Kibana‌&#xff1a;○ Kibana是Elastic官方提供的数据可视化工具&#xff0c;功能强大&#xff0c;支持多种图表类型&#xff0c…

初始JavaEE篇——多线程(4):wait、notify,饿汉模式,懒汉模式,指令重排序

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;JavaEE 目录 wait、notify 方法 多线程练习 单例模式 饿汉模式 懒汉模式 指令重排序 wait、notify 方法 wait 和 我们前面学习的sleep…

数据采集与数据分析:数据时代的双轮驱动

“在当今这个数据驱动的时代&#xff0c;信息已成为企业决策、市场洞察、科学研究等领域不可或缺的核心资源。而爬虫数据采集与数据分析&#xff0c;作为数据处理链条上的两大关键环节&#xff0c;它们之间相辅相成&#xff0c;共同构成了数据价值挖掘的强大引擎。” 爬虫数据采…

MySQL Workbench工作台汉化

一、下载汉化包 通过百度网盘分享的文件&#xff1a;MySQL汉化包.rar 链接&#xff1a;https://pan.baidu.com/s/1PaJSU9dvVnQQWEESHSue5Q 二、汉化过程 注意&#xff1a;替换之前一定要记得把两个文件复制出来存着&#xff0c;防止替换失败修改了文件 找到MySQL的工作台da…

武器检测与分割系统:全程教学跟进

武器检测与分割系统源码&#xff06;数据集分享 [yolov8-seg-SPPF-LSKA&#xff06;yolov8-seg-FocalModulation等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI Glob…

《Python游戏编程入门》注-第4章1

《Python游戏编程入门》的第4章是“用户输入&#xff1a;Bomb Cathcer游戏”&#xff0c;通过轮询键盘和鼠标设备状态实现Bomb Cathcer游戏。 1 Bomb Cathcer游戏介绍 “4.1 认识Bomb Cathcer游戏”内容介绍了Bomb Cathcer游戏的玩法&#xff0c;即通过鼠标来控制红色“挡板”…

Linux 重启命令全解析:深入理解与应用指南

Linux 重启命令全解析&#xff1a;深入理解与应用指南 在 Linux 系统中&#xff0c;掌握正确的重启命令是确保系统稳定运行和进行必要维护的关键技能。本文将深入解析 Linux 中常见的重启命令&#xff0c;包括功能、用法、适用场景及注意事项。 一、reboot 命令 功能简介 re…

leetcode 763.划分字母区间

思路&#xff1a;贪心 其实这个题目并不难&#xff0c;只需要分析出来每一个字母最后出现的坐标就行。 我们根据字母最后出现的坐标数来判断最后划分的字符串。 比如说&#xff0c;字符串前面有abc&#xff0c;这三个字母最后出现的地方就是这个位置&#xff0c;那么我们直接…