自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例

devtools/2024/11/26 3:03:58/

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、脚本功能概述

二、生产者性能测试

三、消费者性能测试

四、查看可用主题列表

五、脚本使用示例

六、脚本源码

七、总结


        在大数据处理的领域中,Kafka 扮演着极为重要的角色,它作为一个分布式流处理平台,能够高效地处理大规模的实时数据。而 Kafka 提供了一系列的脚本工具,帮助我们更好地管理和测试 Kafka 集群。今天,我们就来深入探讨一个自定义的 Kafka 脚本 kf-use.sh,了解其功能与使用场景。

一、脚本功能概述

        kf-use.sh 脚本主要提供了两个核心功能:生产者性能测试和消费者性能测试,同时还具备查看可用主题列表以及退出脚本等功能。通过这些功能,我们可以对 Kafka 集群的生产和消费能力进行评估,以便优化集群配置和数据处理流程。

二、生产者性能测试

  1. 参数输入
    • 当选择生产者性能测试功能时,脚本首先会提示用户输入主题名称。如果输入的主题不存在,脚本会要求用户重新输入,确保测试的主题是有效的。
    • 接着,用户需要输入要生产的记录数量,并且脚本会验证输入是否为整数,如果不是则提示重新输入。
    • 最后,用户要输入每条记录的大小(单位为字节),同样会进行整数验证。
  2. 性能测试执行
    • 一旦用户输入了正确的参数,脚本会调用 kafka-producer-perf-test.sh 工具,并传入相应的参数,如主题名称、记录数量、记录大小等,同时设置吞吐量为 -1(表示不限制吞吐量),并指定 Kafka 集群的地址 bigdata01:9092。这样就可以开始对生产者的性能进行测试,测试结果将反映出在给定条件下生产者向指定主题发送数据的效率。

三、消费者性能测试

  1. 主题选择
    • 在消费者性能测试功能中,首先会调用 kafka-topics.sh 工具列出当前可用的主题列表,然后提示用户输入要测试的主题名称。如果输入的主题不存在,脚本会要求重新输入。
  2. 性能测试执行
    • 当用户选择了存在的主题后,脚本会调用 kafka-consumer-perf-test.sh 工具,传入 Kafka 集群地址 bigdata01:9092、主题名称以及要消费的消息数量(这里固定为 100000 条),从而对消费者从指定主题消费数据的性能进行测试,测试结果可以帮助我们了解消费者处理数据的速度和效率。

四、查看可用主题列表

        无论是在生产者还是消费者性能测试功能中,都提供了查看可用主题列表的选项。通过调用 kafka-topics.sh 工具并传入集群地址 bigdata01:9092,可以获取当前 Kafka 集群中所有的主题名称,方便用户了解集群中的数据主题情况,以便做出正确的测试选择。

五、脚本使用示例

假设我们要对一个名为 test_topic 的主题进行生产者性能测试,我们可以按照以下步骤操作:

  1. 运行 kf-use.sh 脚本。
  2. 选择生产者性能测试功能(可能是对应的数字选项,如 1)。
  3. 输入主题名称 test_topic
  4. 输入要生产的记录数量,例如 10000。
  5. 输入每条记录的大小,比如 100 字节。
  6. 脚本会自动执行生产者性能测试,并输出测试结果,包括发送的总字节数、每秒发送的记录数、每秒发送的字节数等信息。

同样,如果要进行消费者性能测试,例如对 test_topic 主题:

  1. 运行 kf-use.sh 脚本并选择消费者性能测试功能(可能是数字 7 后再选择 1)。
  2. 查看可用主题列表,确认 test_topic 存在后输入该主题名称。
  3. 脚本会执行消费者性能测试,并给出消费 100000 条消息的相关性能数据,如每秒消费的消息数等。

六、脚本源码

#!/bin/bashwhile true; do# 命令大全系统界面echo "Kafka 命令大全系统:"echo "1. 主题操作(topics)"echo "2. 生产者操作(producer)"echo "3. 消费者操作(consumer)"echo "4. 配置操作(configs)"echo "5. 消费者组操作(consumer groups)"echo "6. 生产者性能测试(producer perf test)"echo "7. 消费者性能测试(consumer perf test)"echo "0. 退出"read -p "请输入功能选项数字:" choicecase $choice in1)# 主题操作菜单while true; doecho "主题操作功能:"echo "1. 查看所有主题"echo "2. 创建主题"echo "3. 查看某主题详细信息"echo "4. 修改某主题分区数"echo "5. 删除主题"echo "0.返回命令大全系统界面"read -p "请输入主题操作选项数字:" topic_choicecase $topic_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;2)# 创建主题read -p "请输入要创建的主题名称:" topic_namewhile true; doread -p "请输入分区数(整数):" partitionsif [[ $partitions =~ ^[0-9]+$ ]]; thenbreakelseecho "分区数必须是整数,请重新输入。"fidonewhile true; doread -p "请输入副本数(整数,且不超过可用 broker 数量):" replication_factorif [[ $replication_factor =~ ^[0-9]+$ ]]; then# 这里可添加检查副本数不超过可用 broker 数量的逻辑,暂时省略breakelseecho "副本数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions $partitions --replication-factor $replication_factor --topic $topic_name;;3)# 查看某主题详细信息kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看详细信息的主题名称:" topic_to_describekafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic $topic_to_describe;;4)# 修改某主题分区数kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改分区数的主题名称:" topic_to_alterwhile true; doread -p "请输入新的分区数(整数且大于当前分区数):" new_partitionsif [[ $new_partitions =~ ^[0-9]+$ ]]; then# 这里可添加检查新分区数是否大于当前分区数的逻辑,暂时省略breakelseecho "新分区数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic $topic_to_alter --partitions $new_partitions;;5)# 删除主题kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要删除的主题名称:" topic_to_deletekafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic $topic_to_delete;;0)break;;*)echo "无效的主题操作选择。";;esacdone;;2)# 生产者操作菜单while true; doecho "生产者操作功能:"echo "1. 发送消息到指定主题"echo "0.返回命令大全系统界面"read -p "请输入生产者操作选项数字:" producer_choicecase $producer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要发送消息的主题名称:" topic_nameecho "开始发送消息到主题 $topic_name。输入'EXIT'退出发送。"while true; doread -p "请输入消息内容:" messageif [ "$message" = "EXIT" ]; thenbreakfikafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic $topic_name <<< "$message"done;;0)break;;*)echo "无效的生产者操作选择。";;esacdone;;3)# 消费者操作菜单while true; doecho "消费者操作功能:"echo "1. 消费指定主题的消息"echo "2. 从主题开头消费所有消息"echo "0.返回命令大全系统界面"read -p "请输入消费者操作选项数字:" consumer_choicecase $consumer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要从开头消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic $topic_name;;0)break;;*)echo "无效的消费者操作选择。";;esacdone;;4)# 配置操作菜单while true; doecho "配置操作功能:"echo "1. 查看主题配置"echo "2. 修改主题配置"echo "0.返回命令大全系统界面"read -p "请输入配置操作选项数字:" config_choicecase $config_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看配置的主题名称:" topic_namekafka-configs.sh --bootstrap-server bigdata01:9092 --describe --entity-type topics --entity-name $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改配置的主题名称:" topic_nameread -p "请输入配置项名称:" config_nameread -p "请输入配置项值:" config_valuekafka-configs.sh --bootstrap-server bigdata01:9092 --alter --entity-type topics --entity-name $topic_name --add-config $config_name=$config_value;;0)break;;*)echo "无效的配置操作选择。";;esacdone;;5)# 消费者组操作菜单while true; doecho "消费者组操作功能:"echo "1. 查看消费者组列表"echo "2. 查看消费者组详情"echo "3. 重置消费者组偏移量"echo "0.返回命令大全系统界面"read -p "请输入消费者组操作选项数字:" consumer_group_choicecase $consumer_group_choice in1)kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --list;;2)read -p "请输入要查看详情的消费者组名称:" group_namekafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --describe --group $group_name;;3)read -p "请输入要重置偏移量的消费者组名称:" group_nameread -p "请输入要重置偏移量的主题名称:" topic_namekafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --reset-offsets --group $group_name --topic $topic_name --to-earliest;;0)break;;*)echo "无效的消费者组操作选择。";;esacdone;;6)# 生产者性能测试菜单while true; doecho "生产者性能测试功能:"echo "1. 执行生产者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入生产者性能测试选项数字:" producer_perf_choicecase $producer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenwhile true; doread -p "请输入要发送的记录数量(整数):" num_recordsif [[ $num_records =~ ^[0-9]+$ ]]; thenbreakelseecho "记录数量必须是整数,请重新输入。"fidonewhile true; doread -p "请输入每条记录的大小(整数,单位字节):" record_sizeif [[ $record_size =~ ^[0-9]+$ ]]; thenbreakelseecho "记录大小必须是整数,请重新输入。"fidonekafka-producer-perf-test.sh --topic $topic_name --num-records $num_records --record-size $record_size --throughput -1 --producer-props bootstrap.servers=bigdata01:9092elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的生产者性能测试选择。";;esacdone;;7)# 消费者性能测试菜单while true; doecho "消费者性能测试功能:"echo "1. 执行消费者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入消费者性能测试选项数字:" consumer_perf_choicecase $consumer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenkafka-consumer-perf-test.sh --broker-list bigdata01:9092 --topic $topic_name --messages 100000elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的消费者性能测试选择。";;esacdone;;0)echo "退出脚本。"break;;*)echo "无效的选择。";;esac
done

七、总结

        kf-use.sh 脚本为我们提供了一个便捷的方式来测试 Kafka 集群的生产者和消费者性能,同时方便地查看可用主题列表。通过合理使用这个脚本,我们可以更好地了解 Kafka 集群在数据生产和消费方面的能力,及时发现潜在的性能瓶颈并进行优化,从而提高整个大数据处理流程的效率。无论是对于 Kafka 初学者还是有一定经验的开发者,这个脚本都是一个非常实用的工具,可以帮助我们更好地管理和优化 Kafka 集群的运行。

        在实际应用中,我们可以根据不同的业务需求和数据处理场景,灵活调整测试参数,深入分析测试结果,以确保 Kafka 集群能够稳定、高效地运行,满足日益增长的大数据处理需求。


http://www.ppmy.cn/devtools/137010.html

相关文章

OpenAI震撼发布:桌面版ChatGPT,Windows macOS双平台AI编程体验!

【雪球导读】 「OpenAI推出ChatGPT桌面端」 OpenAI重磅推出ChatGPT桌面端&#xff0c;全面支持Windows和macOS系统&#xff01;这款新工具为用户在日常生活和工作中提供了前所未有的无缝交互体验。对于那些依赖桌面端进行开发工作的专业人士来说&#xff0c;这一更新带来了令人…

云原生基础-云计算概览

目录 云计算的基本概念 云计算的服务模型 云计算的部署模式 云计算的基本概念 云计算是一种通过互联网提供计算资源和服务的模式。允许用户按需访问和使用各种计算资源&#xff0c;如服务器、存储、数据库、网络等&#xff0c;而无需了解底层基础设施的具体细节。云计算的核心理…

美创科技入选2024数字政府解决方案提供商TOP100!

11月19日&#xff0c;国内专业咨询机构DBC德本咨询发布“2024数字政府解决方案提供商TOP100”榜单。美创科技凭借在政府数据安全领域多年的项目经验、技术优势与创新能力&#xff0c;入选收录。 作为专业数据安全产品与服务提供商&#xff0c;美创科技一直致力于为政府、金融、…

【人工智能】用Python和NLP工具构建文本摘要模型:使用NLTK和spaCy进行自然语言处理

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 文本摘要是自然语言处理(NLP)中的关键任务之一,广泛应用于新闻、博客、社交媒体和搜索引擎等场景。通过生成简洁而准确的文本摘要,我们可以大大提升信息处理效率。本文将探讨如何使用Python结合NLP工具…

H.265流媒体播放器EasyPlayer.js H5流媒体播放器关于如何查看手机端的日志信息并保存下来

现今流媒体播放器的发展趋势将更加多元化和个性化。人工智能的应用将深入内容创作、用户体验优化等多个方面&#xff0c;带来前所未有的个性化体验。 EasyPlayer.js H.265流媒体播放器属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#…

Unity开发抖音小游戏使用长音频和短音频

抖音小游戏使用长音频和短音频 介绍WebGL对Unity音频的限制优化建议Iphone静音不同策略Unity中播放长音频无法播放可以使用以下方法总结 介绍 最近好久没有更新文章了&#xff0c;最近在研究抖音小程序也在帮公司做抖音小游戏这块&#xff0c;正好之前遇到了一个比较困扰的问题…

优化 Solana 程序

可操作的见解 对于大型数据结构和高频操作&#xff0c;使用零拷贝反序列化使用 nostd_entrypoint 代替 solana_program 的臃肿入口点最小化动态分配&#xff0c;优先使用基于栈的数据结构实现自定义序列化/反序列化以避免 Borsh 的开销用 #[inline(always)] 标记关键函数以获得…

Java基础-组件及事件处理(中)

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 BorderLayout布局管理器 说明&#xff1a; 示例&#xff1a; FlowLayout布局管理器 说明&#xff1a; …