2. kafka 生产者

news/2024/11/19 13:01:15/
一. 生产者消息发送流程

在这里插入图片描述

在消息发送的过程中,涉及到了两个线程:main线程和Sender线程。Producer发送的消息会分别经过Interceptors(拦截器),Serializer(序列化器),Partitioner(分区器)最终到达RecordAccumulator,RecordAccumulator是一个双端队列,主要起缓冲区的作用。Sender线程不断从RecordAccumulator中拉取消息发送到 Kafka集群。

二. 异步发送
1. 普通异步发送

普通异步发送指生产者在完成消息发送后不会等待Kafka集群的响应,而是继续去发送下一条消息。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();// 设置bootstrap server地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 设置消息的key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "message: " + i));}producer.close();}
}

maven依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

启动kafka-console-consumer消费者

[root@hadoop1 kafka-3.6.0]# ./bin/kafka-console-consumer.sh  --bootstrap-server 192.168.205.154:9092 --topic first

运行结果:
在这里插入图片描述

2. 带回调的异步发送

带回调的异步发送是指异步发送后,生产者收到Kafka集群返回的Ack时会执行回调函数。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", String.valueOf(i), "message: " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + ", 分区: " + recordMetadata.partition());} else {e.printStackTrace();}}});}producer.close();}
}

运行结果:
在这里插入图片描述

主题:first, 分区: 1
主题:first, 分区: 1
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 0
主题:first, 分区: 2
主题:first, 分区: 2
主题:first, 分区: 2
主题:first, 分区: 2
三. 同步发送

在同步发送模式下,生产者发送完消息后会阻塞等待Kafka集群的响应,生产者收到Kafka集群的Ack才会进行下一步操作,同步发送的方式大大提高了消息的可靠性,但是也会因此损失性能。同步发送只需要执行完send方法后再调用一下 get()方法即可。

代码实现:

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaSenderDemo  {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "sync message: " + i)).get();}producer.close();}
}

运行结果:
在这里插入图片描述

四. 自定义分区器

Kafka中的Topic是可以分区的,使用分区的好处是显而易见的,它可以合理的使用存储资源,提高并行度,一个Topic的多个分区分散在不同的主机上,可以充分利用集群资源。

生产者发送的每一条消息最终只会进入某一个分区,决定消息和分区映射关系的就是Partitioner。Kafka默认分区器是DefaultPartitioner。

以下是DefaultPartitioner的部分源代码:

public class DefaultPartitioner implements Partitioner {
...public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);}
...
}

根据源码可以得出DefaultPartitioner的映射规则如下:

  1. 指明partition的情况下,直接将指明的值作为partition的值。
  2. 没有指明partition但有key的情况下,将key的hash值与topic的分区数取余得到partition的值。
  3. 既没有指明key又没有partition值的情况下,kafka采用Sticky Partition(粘性分区器),会随机选择一个分区,并一致尽可能使用该分区,待该分区的batch已满或者已完成,kafka再随机选择一个分区进行使用(和上一次的分区不同)

如果DefaultPartitioner不能满足实际业务的分区要求,那么可以自定义分区器,要自定义分区器只需要实现Partitioner类即可。下面以一个需求来说明如何实现自定义分区器。
需求:

将包含hello字符串的消息发送到分区0
将包含world字符串的消息发送到分区1

代码实现:

KafkaOwnPartitionerDemo.java

package kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class KafkaOwnPartitionerDemo {private final static String BOOTSTRAP_SERVERS = "192.168.205.154:9092,192.168.205.155:9092,192.168.205.156:9092";public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {String[] msgPrefix = {"hello", "world"};String msg = msgPrefix[i % 2] + i;producer.send(new ProducerRecord<String, String>("first", String.valueOf(i), msg), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("消息:" + msg + ", 主题: " + recordMetadata.topic() + ", 分区: " + recordMetadata.partition());} else {e.printStackTrace();}}});}producer.close();}
}

MyPartitioner.java

package kafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msg = new String(valueBytes);if (msg.contains("hello")) {return 0;} else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

运行结果:
在这里插入图片描述

消息:hello0, 主题: first, 分区: 0
消息:hello2, 主题: first, 分区: 0
消息:hello4, 主题: first, 分区: 0
消息:hello6, 主题: first, 分区: 0
消息:hello8, 主题: first, 分区: 0
消息:world1, 主题: first, 分区: 1
消息:world3, 主题: first, 分区: 1
消息:world5, 主题: first, 分区: 1
消息:world7, 主题: first, 分区: 1
消息:world9, 主题: first, 分区: 1
五. 生产者重要参数列表
参数名称参数描述
bootstrap.servers生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单
key.serializer 和 value.serializer指定发送消息的key和value的序列化类型
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值, 默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。
acks- 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据, Leader 收到数据后应答。-1(all):生产者发送过来的数据, Leader+和 ISR队列里面的所有节点收齐数据后应答

http://www.ppmy.cn/news/1548239.html

相关文章

删除k8s 或者docker运行失败的脚本

vi delete_exited_containers.sh#!/bin/bash# 列出所有停止的容器并存储到数组 list_exited_containers() {echo -e "\nStopped containers:"containers()# 获取停止的容器信息并存入数组while IFS read -r line; docontainers("$line")done < <(do…

31-Shard Allocation Awareness(机架感知)

同一机器上&#xff0c;部署多个es节点&#xff0c;防止副本和主分片分配到同一机器上 例如&#xff1a;es节点a、b、c部署在01机器上&#xff0c;节点d、e、f部署在02机器上 es2.4版本配置 a、b、c节点yaml配置&#xff1a;node.rack: aaa d、e、f节点yaml配置&#xff1a…

FlutterCacheManager组件的用法

文章目录 1. 概念介绍2. 使用方法2.1 调用接口2.2 管理缓冲3. 示例代码4. 内容总结我们在上一章回中介绍了"CachedNetworkImage组件"相关的内容,本章回中将介绍FlutterCacheManager组件.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 我们在本章回中介绍的内容…

【数据结构】`unordered_map` 和 `unordered_set` 的底层原理

unordered_map 和 unordered_set 是 C 标准库中的两个容器&#xff0c;它们被广泛应用于需要快速查找的场景中。它们的查找、插入和删除的平均时间复杂度都是 O(1)&#xff0c;这也是它们的一个重要特性。本文将详细介绍 unordered_map 和 unordered_set 的底层原理&#xff0c…

QT QLabel双击事件

新建类&#xff1a; DoubleClickLabel .h #pragma once#include <QLabel>class DoubleClickLabel : public QLabel {Q_OBJECTpublic:DoubleClickLabel(QWidget *parent);~DoubleClickLabel(); signals:void doubleClicked();protected: //这里重写双击事件virtual v…

学习rust语言宏之macro_rules!

学习rust语言&#xff0c;必然不可避免要了解和熟悉宏。rust语言的宏功能非常强大&#xff0c;通过合理的编写利用宏&#xff0c;可以简化程序代码&#xff0c;也少写很多代码。今天我们先从宏的基本编写方法macro_rules开始&#xff1a; 1&#xff0c;格式 macro_rules! $na…

[C++]:C++11(一)

1. 统一列表初始化 1.1 C11 之前的初始化方式 在 C11 标准中&#xff0c;引入了一个非常实用且强大的特性——统一列表初始化&#xff08;Uniform Initialization&#xff09;&#xff0c;它为我们在初始化各种类型的对象时提供了一种统一且方便的语法形式&#xff0c;极大地…

基于 AI 智能名片 2 + 1 链动模式商城小程序的立体连接营销策略研究

摘要&#xff1a;本文围绕立体连接中线下连通社群这一环节展开&#xff0c;阐述其具体做法及意义&#xff0c;并深入探讨如何将 AI 智能名片 2 1 链动模式商城小程序融入其中&#xff0c;以优化立体连接营销策略&#xff0c;提升营销效果。 一、引言 在当今复杂多变的市场环境…