Kafka3.0.0版本——生产者如何提高吞吐量

news/2024/10/22 17:31:58/

目录

    • 一、生产者提高吞吐量参数设置
    • 二、产者提高吞吐量代码示例

一、生产者提高吞吐量参数设置

  • batch.size:设置批次大小,默认16k
  • linger.ms:设置等待时间,修改为5-100ms
  • buffer.memory:设置缓冲区大小, 默认 32M
  • compression.type:设置压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

二、产者提高吞吐量代码示例

  • 代码示例

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    /*** 生产者提高吞吐量* 1、设置批次大小,batch.size 默认 16K* 2、设置等待时间,linger.ms 默认 0* 3、设置缓冲区大小,buffer.memory 默认 32M* 4、设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd* */
    public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());/*** 4、提高吞吐量* *///设置缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//设置批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//设置等待时间 linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);//设置压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//5、创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//6、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}//7、关闭资源kafkaProducer.close();}
    }
  • 在三台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]#  bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.29:9092 --topic news
    
  • 在 IDEA 中执行代码,观察 三台服务器控制台中是否接收到消息,如下图:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述


http://www.ppmy.cn/news/1002038.html

相关文章

【雕爷学编程】Arduino动手做(181)---Maixduino AI开发板

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

Python 程序设计入门(004)—— 赋值运算符与常用函数

Python 程序设计入门&#xff08;004&#xff09;—— 赋值运算符与常用函数 目录 Python 程序设计入门&#xff08;004&#xff09;—— 赋值运算符与常用函数一、赋值运算符二、常用的数学函数1、round() 函数2、pow() 函数3、divmod() 函数 三、字符串与 str() 函数1、字符串…

基于Windows手动编译openssl和直接安装openssl

零、环境 win10-64位 VS2019 一、手动编译 1、安装perl https://platform.activestate.com/ActiveState-Projects/ActiveState-Perl-5.36.0 两种方法都没能成功。。第一种下载后会得到一个 state-remote-installer.exe&#xff0c;然后安装时会在命令行中执行&#xff0c;…

Host/ KVM/ Docker/ K8s/ OpenStack/ Mesos简单介绍和区别

Host/ KVM/ Docker/ Kubernetes/ OpenStack 和 Mesos 的简单介绍&#xff1a; - Host&#xff1a; Host 是指物理服务器或虚拟机主机&#xff0c;它们可以运行多个虚拟机或容器来提供计算和存储资源。Host 是云计算和容器化技术中的基本组成部分。 - KVM&#xff1a; KVM 是…

自然语言处理从入门到应用——LangChain:提示(Prompts)-[示例选择器(Example Selectors)]

分类目录&#xff1a;《自然语言处理从入门到应用》总目录 如果我们拥有大量的示例&#xff0c;我们可能需要选择在提示中包含哪些示例。ExampleSelector是负责执行此操作的类。 其基本接口定义如下所示&#xff1a; class BaseExampleSelector(ABC):"""Interf…

Elasticsearch 快照和恢复

文章目录 简介快照存储库说明创建或更新存储库接口说明路径参数查询参数请求正文 使用 fs 方式创建存储库验证储存库获取存储库信息删除存储库清理储存库 快照创建快照路径参数查询参数请求正文示例 获取快照查询参数示例 克隆快照查询参数示例 获取快照状态示例 恢复快照查询参…

Java on Azure Tooling 6月更新|标准消费和专用计划及本地存储账户(Azurite)支持

作者&#xff1a;Jialuo Gan - Program Manager, Developer Division at Microsoft 排版&#xff1a;Alan Wang 大家好&#xff0c;欢迎阅读 Java on Azure 工具的六月更新。在本次更新中&#xff0c;我们将介绍 Azure Spring Apps 标准消费和专用计划支持以及本地存储账户&…

Python爬虫教程篇+图形化整理数据(数学建模可用)

一、首先我们先看要求 1.写一个爬虫程序 2、爬取目标网站数据&#xff0c;关键项不能少于5项。 3、存储数据到数据库&#xff0c;可以进行增删改查操作。 4、扩展&#xff1a;将库中数据进行可视化展示。 二、操作步骤&#xff1a; 首先我们根据要求找到一个适合自己的网…