Kafka相关API开发

news/2024/12/29 21:04:44/

(一)引入依赖

        用API直接去操作kafka(读写数据)在实际开发中用的并不多,学习它主要还是为了加深对Kafka功能的理解。kafka的读写操作,实际开发中,是通过各类更上层的组件去实现。而这些组件在读写kafka数据时,用的当然是kafka的java api,比如flink、spark streaming和flume等。

<properties> <kafka.version>2.4.1</kafka.version>
</properties>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>       
</dependency>

(二)API 开发——producer 生产者

1.构造一个生产者,可以持续发送大量数据

2.构造一个生产者,有必须设置的参数:

bootstrap.server

key.seralizer

value.seralizer

其他的,可选

3.使用特定接口

kafka的生产者发送用户的业务数据时,必须使用org.apache.kafka.common.serialization.Serializer接口的实现类这一序列化框架来序列化用户的数据。

4.发往指定topic

构造一个Kafka生产者后,并没有固定数据要发往的topic,因此,可以将不同的数据发往不同的topic

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** kafka生产者API代码示例*/
public class ProducerDemo {public static void main(String[] args) throws InterruptedException {// 泛型K:要发送的数据中的key// 泛型V:要发送的数据中的value// 隐含之意:kafka中的message,是Key-Value结果的(可以没有Key)Properties props = new Properties();props.setProperty("bootstrap.servers", "node141:9092,node142:9092");// 因为,kafka底层存储没有类型维护机制,用户所发的所有数据类型,都必须变成 序列化后的byte[]// 所以,kafka的producer需要一个针对用户要发送的数据类型的序列化工具类// 且这个序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializerprops.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*** 代码中进行客户端参数配置的另一种写法*/props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.ACKS_CONFIG, "all");// 消息发送应答级别// 构造一个生产者客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {// 将业务数据封装成客户端所能发送的封装格式// 0->abc0// 1->abc1// TODO:奇数发往abcx,偶数发往abcyProducerRecord<String, String> message = null;if (i % 2 == 0) {message = new ProducerRecord<>("abcy", "user_id" + i, "doit_edu" + i);} else {message = new ProducerRecord<>("abcx", "user_id" + i, "doit_edu" + i);}// 消费时只会打印value的值,key并没有读到// 调用客户端去发送// 数据的发送动作在producer的底层是异步线程去异步发送的,即调用send方法立即执行完毕,直接走之后的代码,不代表数据发送成功producer.send(message);Thread.sleep(100);}// 关闭客户端
//        producer.flush();producer.close();}
}

5.消费消息

(三)API开发——consumer消费者

kafka消费者的起始消费位置有两种决定机制:

1.手动指定了起始位置,它肯定从指定的位置开始

2.如果没有手动指定起始位置,它去找消费者组之前所记录的偏移量开始

3.如果之前的位置也获取不到,就看参数:auto.offset.reset所指定的重置策略

4.如果指定的offset>原有分区内的最大offset,就自动重置到最大的offset

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** kafka消费者API代码示例*/
public class ConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// kafka的消费者,默认是从所属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略来确定消费起始偏移量:// 1.earliest:自动重置到每个分区的最前一条消息// 2.latest:自动重置到每个分区的最新一条消息// 3.none:如果没有为使用者的组找到以前的偏移,则向使用者抛出异常// 如果输入除了上述三种之外的,会向使用者抛出异常props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 如果latest消息找不到,consumer.seek就起作用了// 设置消费者所属的组idprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "d30-1");// 设置消费者自动提交最新的的消费位移——默认是开启的props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 设置自动提交位移的时间间隔——默认是5000msprops.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");// 构造一个消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题(可以是多个)
//        consumer.subscribe(Collections.singletonList("abcx"));consumer.subscribe(Arrays.asList("abcx","abcy"));// 正则订阅主题
//        consumer.subscribe(Pattern.compile ("abc.*" ));// 显式指定消费起始偏移量/*TopicPartition abcxP0 = new TopicPartition("abcx", 0);TopicPartition abcxP1 = new TopicPartition("abcx", 1);consumer.seek(abcxP0,10);consumer.seek(abcxP1,15);*/// 循环往复拉取数据boolean condition = true;while (condition) {// 客户端去拉取数据的时候,如果服务端没有数据响应,会保持连接等待服务端响应// poll中传入的超时时长参数,是指等待的最大时长ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// Iterator:迭代器// Iterable:可迭代的,是迭代器的再封装// 实现了Iterable的对象,可以用增强for循环去遍历迭代,也可以从对象上取到iterator,来用iterator.hasNext来迭代// Iterator<ConsumerRecord<String, String>> iterator = records.iterator();// 直接用for循环来迭代本次取到的一批数据for (ConsumerRecord<String, String> record : records) {// ConsumerRecord中,不光有用户的业务数据,还有Kafka注入的元数据String key = record.key();String value = record.value();// 本条消息所属的topic:拉取的时候可能不止一个topic,所以会有这个方法String topic = record.topic();// 本条数据所属的分区int partition = record.partition();// 本条数据的偏移量long offset = record.offset();//key的长度int keySize = record.serializedKeySize();//value的长度int valueSize = record.serializedValueSize();// 当前这条数据所在分区的leader的朝代纪年Optional<Integer> leaderEpoch = record.leaderEpoch();// kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据// timestamp就是其中之一:记录本条数据的时间戳// 时间戳有两种类型:一个是CreateTime(这条数据创建的时间——生产者), LogAppendTime(broker往log里面追加的时间)TimestampType timestampType = record.timestampType();long timestamp = record.timestamp();// 数据头:是生产者在写入数据时附加进去的,相当于用户自定义的元数据// 在生产者写入消息时,可以自定义元数据,所以record.headers()方法就能够消费到// public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)// 如果生产者写入消息时,没有定义元数据,record.headers()方法就不会消费到Headers headers = record.headers();//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",key, value, topic, partition, offset,leaderEpoch.get(), timestampType.name, timestamp,keySize, valueSize));}}// 对数据进行业务逻辑处理// 关闭客户端// consumer.close();}
}

有了上面两个API,先开启消费者,然后开启生产者,消费者控制就会输出消息。

 // 当前这条数据所在分区的leader的朝代纪年
Optional<Integer> leaderEpoch = record.leaderEpoch();

当leader有变化,leaderEpoch.get()的值就会+1,初始值为0

(四)API开发——指定偏移量订阅消息

1.subscribe与assign订阅

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;/*** 指定偏移量*/
public class ConsumerDemo2 {public static void main(String[] args) throws IOException {Properties props = new Properties();// 从配置文件中加载props.load(ConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "doit30-5");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*  // subscribe订阅,会参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区的consumer.subscribe(Collections.singletonList("ddd"));// 这里无意义地去拉一次数据,主要就是为了确保:分区分配动作已完成consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 然后再定义到指定的偏移量,开始正式消费consumer.seek(new TopicPartition("ddd",0),2);*/// 既然要自己指定一个确定的起始消费位置,那通常隐含之意是不需要去参与消费者组自动再均衡机制,该方法比较常用// 那么,就不要使用subscribe来订阅主题consumer.assign(Arrays.asList(new TopicPartition("ddd", 0)));consumer.seek(new TopicPartition("ddd", 0), 4);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));for (ConsumerRecord<String, String> record : records) {int keySize = record.serializedKeySize();int valueSize = record.serializedValueSize();System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",record.key(), record.value(), record.topic(), record.partition(), record.offset(),record.leaderEpoch().get(), record.timestampType().name, record.timestamp(),keySize, valueSize));}}}
}

2.subscribe与assign订阅具体区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能:

        在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign()方法订阅分区时,是不具备消费者自动均衡的功能的:

        其实这一点从assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.取消订阅

        如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

组协调器就是x组写消费位移的leader副本所在的broker。


http://www.ppmy.cn/news/1543278.html

相关文章

如何配置,npm install 是从本地安装依赖

在 Node.js 中&#xff0c;要使npm install从本地安装依赖&#xff0c;可以按照以下步骤进行配置&#xff1a; 一、准备本地依赖包 确保你有本地的依赖包。这个依赖包可以是一个包含package.json文件的文件夹&#xff0c;或者是一个已经打包好的.tgz文件。 二、使用相对路径…

四、k8s快速入门之Kubernetes资源清单

kubernetes中的资源 ⭐️ k8s中所有的内容都抽象为资源&#xff0c;资源实列化之后&#xff0c;叫做对象 1️⃣名称空间级别 ⭐️ kubeadm在执行k8s的pod的时候会在kube-system这个名称空间下执行&#xff0c;所以说当你kubectl get pod 的时候是查看不到的查看的是默认的po…

频率限制:WAF保护网站免受恶意攻击的关键功能

频率限制&#xff08;Rate Limiting&#xff09;是一项有效的安全措施&#xff0c;用于控制每个 IP 地址的访问速率&#xff0c;以防止恶意用户利用大量请求对网站进行攻击&#xff0c;例如防止 CC 攻击等。频率限制不仅能保护网站资源&#xff0c;还能提升服务的稳定性。 下面…

CentOS9 Stream 支持输入中文

CentOS9 Stream 支持输入中文 方法一&#xff1a;确保 gnome-control-center 和相关组件已更新方法二&#xff1a;手动添加输入法源配置方法三&#xff1a;配置 .xinputrc 文件方法四&#xff1a;检查语言包 进入centos9 stream后&#xff0c;点击右上角电源键&#xff0c;点击…

pgsql数据量大之后可能遇到的问题

当 PostgreSQL 数据量增大时&#xff0c;可能会遇到以下问题&#xff1a; 查询性能下降&#xff1a;随着数据量的增加&#xff0c;查询可能会变得缓慢&#xff0c;尤其是在没有适当索引的情况下。大量的数据意味着更多的行需要被扫描和过滤&#xff0c;这会显著增加查询执行时间…

‌MySQL中‌between and的基本用法‌

文章目录 一、between and语法二、使用示例2.1、between and数值查询2.2、between and时间范围查询2.3、not between and示例 BETWEEN AND操作符可以用于数值、日期等类型的字段&#xff0c;包括边界值。 一、between and语法 MySQL中的BETWEEN AND操作符用于在两个值之间选择…

LangGPT结构化提示词编写实践(L1G3000 浦语提示词工程实践)

任务0&#xff1a;使用浦语 InternLM 大模型一键写书 配dotenv环境时&#xff0c;用的指令是 pip install python-dotenv pip install phidata 主题是AI研究者获诺奖的可行性分析&#xff0c;最后生成的书也挺靠谱的 甚至成功预测了今年的诺奖 基础任务&#xff1a;统计单词…

快速上手 Rust——实用示例

Rust 跨平台应用开发第一章&#xff1a;快速上手 Rust——实用示例 1.3 实用示例 在这一节中&#xff0c;我们将通过一系列实用的示例来帮助您更好地理解 Rust 的特性&#xff0c;并展示如何在实际项目中使用这些特性。示例将涵盖文件操作、网络请求、并发编程、命令行工具以…