kafka学习-02

server/2024/11/30 5:44:38/

kafka分区的分配以及再平衡:

4个:

1、Range 以及再平衡

        1)Range 分区策略原理

2)Range 分区分配策略案例

(1)修改主题 first 为 7 个分区。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意:分区数可以增加,但是不能减少。

一个主题,假如副本数想修改,是否可以直接修改?答案是不可以。

如果想修改,如何修改?制定计划,执行计划。

(2)这样可以由三个消费者

CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

备注:只需要将以前的CustomProducerCallback,修改发送次数为500次即可。

2、RoundRobin(轮询) 以及再平衡

1)RoundRobin 分区策略原理

2RoundRobin 分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。

3、Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

4、CooperativeSticky 的解释【新的kafka中刚添加的策略】

在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。

好处是:负载均衡,不好的地方是:多次平衡浪费性能。

动态平衡,在消费过程中,实施再平衡,而不是定下来,等某个消费者退出再平衡。

offset 位移[偏移量](重要)

记录消费到了哪里的这个值,就是偏移量。

记录:哪个主题,哪个分区,哪个位置。

1) 消费 offset 案例

(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

如果不修改是无法查看offset的值的,因为这些都是加密数据。

创建一个新的主题:bigdata

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata --partitions 2 --replication-factor 2

kafka3下执行此命令

查看消费者消费主题__consumer_offsets。 -- from-beginning 表示查看历史所有的偏移量

2) 自动提交案例:

写java代码即可

和之前的基本相同,只是加入了几个参数

        //设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交// 提交 offset 的时间周期 1000ms,默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class Demo02_自动提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交// 提交 offset 的时间周期 1000ms,默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

3) 手动提交案例

package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class Demo02_手动提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);  // 可以设置为false 之后就需要手动提交KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);// 可以指定条件提交int i = 1;while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());i++;}// 当生产者发送了10条消息后,再提交偏移量(offset)if (i==10){kafkaConsumer.commitAsync();}}}}

4) 指定分区和偏移量消费 (重要)

package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;public class Demo02_指定提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);//执行计划Set<TopicPartition> assignment = kafkaConsumer.assignment();while (assignment.size() == 0){// 拉去数据的代码,此处可以帮助快速构建分区方案kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候由方案了就跳出循环assignment = kafkaConsumer.assignment();}// 获取所有分区的 偏移量(offset)等于 5 以后的数据for (TopicPartition topicPartition : assignment) {kafkaConsumer.seek(topicPartition,5);}// 获取指定分区 5 以后的数据
//        kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
//
//        // 还可以这样写
//        for (TopicPartition topicPartition : assignment) {
//            if (topicPartition.partition() == 0){
//                kafkaConsumer.seek(topicPartition,5);
//            }
//        }while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

5) 指定时间消费

package com.bigdata.day04;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class Demo02_指定时间提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test4");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);//执行计划Set<TopicPartition> assignment = kafkaConsumer.assignment();while (assignment.size() == 0){// 拉去数据的代码,此处可以帮助快速构建分区方案kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候由方案了就跳出循环assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> map = new HashMap<>();for (TopicPartition topicPartition : assignment) {map.put(topicPartition,System.currentTimeMillis()-60*60*1000*10);}Map<TopicPartition, OffsetAndTimestamp> timestampMap = kafkaConsumer.offsetsForTimes(map);Set<Map.Entry<TopicPartition, OffsetAndTimestamp>> entries = timestampMap.entrySet();for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : entries) {kafkaConsumer.seek(entry.getKey(),entry.getValue().offset());}while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

漏消费和重复消费

重复消费:

当我们在使用kafka消费者的时候,已经消费了数据,但是没有提交偏移量,这时就可能造成数据重复

假如我们往消费者中发送了10条数据,并且设置了手动提交,但是并没有提交,这时我们是可以看到消费者数据的,但是此时消费者中的数据没有偏移量等(如果有的话偏移量应该是10),再次消费的时候,消费者就可能接着从偏移量为0的地方开始消费,造成重复消费

漏消费:

先提交 offset 后消费,有可能会造成数据的漏消费。

在flume中使用kafka

生产者将数据发送到kafka中,再使用flume将数据从kafka中抽取到hdfs上

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = bigdata
a1.sources.r1.kafka.consumer.group.id = donghu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


http://www.ppmy.cn/server/146086.html

相关文章

视频流媒体服务解决方案之Liveweb视频汇聚平台

一&#xff0c;Liveweb视频汇聚平台简介: LiveWeb是深圳市好游科技有限公司开发的一套综合视频汇聚管理平台&#xff0c;可提供多协议&#xff08;RTSP/RTMP/GB28181/海康Ehome/大华&#xff0c;海康SDK等&#xff09;的视频设备接入&#xff0c;支持GB/T28181上下级联&#xf…

Linux查看网络基础命令

文章目录 Linux网络基础命令1. ifconfig 和 ip一、ifconfig命令二、ip命令 2. ss命令一、基本用法二、常用选项三、输出信息四、使用示例 3. sar 命令一、使用sar查看网络使用情况 4. ping 命令一、基本用法二、常用选项三、输出结果四、使用示例 Linux网络基础命令 1. ifconf…

传输控制协议(TCP)

传输控制协议是Internet一个重要的传输层协议。TCP提供面向连接、可靠、有序、字节流传输服务。 1、TCP报文段结构 注&#xff1a;TCP默认采用累积确认机制。 2、三次握手、四次挥手 &#xff08;1&#xff09;当客户向服务器发送完最后一个数据段后&#xff0c;发送一个FIN段…

国产FPGA+DSP 双FMC 6U VPX处理板

高性能国产化信号处理平台采用6U VPX架构&#xff0c;双FMC接口国产V7 FPGA 国产多核 DSP 的硬件架构&#xff0c;可以完成一体化电子系统、有源相控阵雷达、电子侦察、MIMO 通信、声呐等领域的高速实时信号处理。 信号处理平台的组成框图如图 1 所示&#xff0c; DSP处理器采…

【docker】8. 镜像仓库实战

综合实战一&#xff1a;搭建一个 nginx 服务 Web 服务器 Web 服务器&#xff0c;一般是指“网站服务器”&#xff0c;是指驻留于互联网上某种类型计算机的程序。Web 服务器可以向 Web 浏览器等客户端提供文档&#xff0c;也可以放置网站文件&#xff0c;让全世界浏览&#xf…

哪些行业对六西格玛管理方法的需求较大?

六西格玛作为一种追求极致质量和流程优化的管理哲学&#xff0c;自诞生以来&#xff0c;便在多个行业中展现出了巨大的应用价值。该方法通过定义、测量、分析、改进和控制&#xff08;DMAIC&#xff09;五个阶段&#xff0c;帮助企业实现流程的持续改进&#xff0c;提高产品质量…

Hive 索引 和 Hive Metastore 的三种配置方式

Hive 索引 和 Hive Metastore 的三种配置方式 Hive 索引&#xff08;Index&#xff09; Hive 索引是一种提高查询性能的技术&#xff0c;通过创建索引来加速对特定列的查询。类似于传统关系数据库的索引&#xff0c;Hive 索引能够在查询中快速定位数据&#xff0c;而不必扫描…

【Spring源码核心篇-04】spring中refresh刷新机制的流程和实现

Spring源码核心篇整体栏目 内容链接地址【一】Spring的bean的生命周期https://zhenghuisheng.blog.csdn.net/article/details/143441012【二】深入理解spring的依赖注入和属性填充https://zhenghuisheng.blog.csdn.net/article/details/143854482【三】精通spring的aop的底层原…