Kafa分区策略实现

embedded/2025/2/1 14:18:20/

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用随机分区器的生产者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定义分区器的生产者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }


http://www.ppmy.cn/embedded/158646.html

相关文章

【设计模式-行为型】迭代器模式

一、什么是迭代器模式 迭代器模式&#xff0c;顾名思义&#xff0c;同样的为了让大家更加了解啥是迭代器。我们通过电影情结来说明&#xff0c;不知道大家有没有看过一个剧烧脑的科幻大片--《盗梦空间》。影片讲述了由造梦师&#xff08;莱昂纳多迪卡普里奥扮演的&#xff09;带…

基于Python的药物相互作用预测模型AI构建与优化(下.代码部分)

四、特征工程 4.1 分子描述符计算 分子描述符作为量化分子性质的关键数值,能够从多维度反映药物分子的结构和化学特征,在药物相互作用预测中起着举足轻重的作用。RDKit 库凭借其强大的功能,为我们提供了丰富的分子描述符计算方法,涵盖了多个重要方面的分子性质。 分子量…

音视频多媒体编解码器基础-codec

如果要从事编解码多媒体的工作&#xff0c;需要准备哪些更为基础的内容&#xff0c;这里帮你总结完。 因为数据类型不同所以编解码算法不同&#xff0c;分为图像、视频和音频三大类&#xff1b;因为流程不同&#xff0c;可以分为编码和解码两部分&#xff1b;因为编码器实现不…

【2024年华为OD机试】(C卷,100分)- 检查是否存在满足条件的数字组合 (Java JS PythonC/C++)

一、问题描述 题目描述 给定一个正整数数组&#xff0c;检查数组中是否存在满足规则的数字组合。 规则&#xff1a;A B 2C 输入描述 第一行输出数组的元素个数。 接下来一行输出所有数组元素&#xff0c;用空格隔开。 输出描述 如果存在满足要求的数&#xff0c;在同…

1905电影网中国地区电影数据分析(二) - 数据分析与可视化

文章目录 前言一、数据分析1. 数据分析代码实现2. 分析后的数据截图2.1 描述性分析结果数据2.2 类别分布分析结果数据2.3 模式识别分析结果数据2.4 时间序列分析结果数据2.4.1 每年的电影发布数量2.4.2 按年份的评分趋势 2.5 相关性分析结果数据 二、数据可视化1. 描述性分析数…

007 JSON Web Token

文章目录 https://doc.hutool.cn/pages/jwt/#jwt%E4%BB%8B%E7%BB%8D JWT是一种用于双方之间安全传输信息的简洁的、URL安全的令牌标准。这个标准由互联网工程任务组(IETF)发表&#xff0c;定义了一种紧凑且自包含的方式&#xff0c;用于在各方之间作为JSON对象安全地传输信息。…

小程序的协同工作与发布

1.小程序API的三大分类 2.小程序管理的概念&#xff0c;以及成员管理两个方面 3.开发者权限说明以及如何维护项目成员 4.小程序版本

EtherCAT主站IGH-- 25 -- IGH之fsm_slave_scan.h/c文件解析

EtherCAT主站IGH-- 25 -- IGH之fsm_slave_scan.h/c文件解析 0 预览一 该文件功能`fsm_slave_scan.c` 文件功能函数预览二 函数功能介绍`fsm_slave_scan.c` 中主要函数的作用1. `ec_fsm_slave_scan_state_start`2. `ec_fsm_slave_scan_state_address`3. `ec_fsm_slave_scan_stat…