Kafka分区策略实现

ops/2025/2/6 16:26:01/

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用随机分区器的生产者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定义分区器的生产者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }


http://www.ppmy.cn/ops/156189.html

相关文章

2021Java面试-基础篇

文章目录 前言一&#xff1a; Java概述 1、何为编程2、JDK1.5之后的三大版本3、JVM,JRE和JDK的关系4、什么是跨平台&#xff1f;原理是什么5、Java语言有哪些特点6、什么是字节码&#xff1f;采用字节码的最大好处是什么7、什么是Java程序的主类&#xff1f;应用程序和小程序的…

深度学习中,文本分类任务怎么做

一、处理流程 前置步骤&#xff1a; 标注数据得到数据集数据清理&#xff1a;将特殊字符、特殊格式、无效字符去除 正式步骤&#xff1a; 1、分词或分字&#xff1a;英文一般都分词&#xff0c;中文有分词也有分字。分词还是分字取决于你模型的embedding。 2、将字或词编辑ID…

LLM推理--vLLM解读

主要参考&#xff1a; vLLM核心技术PagedAttention原理 总结一下 vLLM 的要点&#xff1a; Transformer decoder 结构推理时需要一个token一个token生成&#xff0c;且每个token需要跟前序所有内容做注意力计算&#xff08;包括输入的prompt和该token之前生成的token&#xf…

浅谈量化感知训练(QAT)

1. 为什么要量化&#xff1f; 假设你训练了一个神经网络模型&#xff08;比如人脸识别&#xff09;&#xff0c;效果很好&#xff0c;但模型太大&#xff08;比如500MB&#xff09;&#xff0c;手机根本跑不动。于是你想压缩模型&#xff0c;让它变小、变快。 最直接的压缩方法…

git基础使用--4---git分支和使用

文章目录 git基础使用--4---git分支和使用1. 按顺序看2. 什么是分支3. 分支的基本操作4. 分支的基本操作4.1 查看分支4.2 创建分支4.3 切换分支4.4 合并冲突 git基础使用–4—git分支和使用 1. 按顺序看 -git基础使用–1–版本控制的基本概念 -git基础使用–2–gti的基本概念…

Linux概述

Linux下开发项目 JavaEE 大数据 Python PHP C/C Go Linux运维工程师 服务器的规划、调试优化&#xff0c;系统的日程监控&#xff0c;故障的处理&#xff0c;对数据的备份和恢复。运维工程师往往管理服务器集群 Linux嵌入式工程师 驱动的开发&#xff0c;嵌入式的系统…

Maven 项目的基本结构

Maven 项目采用了标准的目录结构&#xff0c;旨在统一项目组织方式&#xff0c;提高可维护性&#xff0c;并且让不同的开发人员更容易理解和使用项目。通过遵循约定的目录结构&#xff0c;Maven 可以自动化管理项目的构建过程&#xff0c;并简化构建、测试、部署等任务。 1. M…

【LeetCode 刷题】贪心算法(2)-进阶

此博客为《代码随想录》二叉树章节的学习笔记&#xff0c;主要内容为贪心算法进阶的相关题目解析。 文章目录 135. 分发糖果406. 根据身高重建队列134. 加油站968. 监控二叉树 135. 分发糖果 题目链接 class Solution:def candy(self, ratings: List[int]) -> int:n len…