从kafka和zookeeper中获取生产和消费偏移量

embedded/2024/10/20 16:06:39/

从kafka和zookeeper中获取生产和消费偏移量

  • 特殊说明

    • 该命令是使用python进行编译,需要使用centos7系统上进行使用。
  • 命令详细

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST][-m INTERVAL_MINUTES]Usage of argparseoptional arguments:-h, --help            show this help message and exit-k KAFKA_HOST, --kafka_host KAFKA_HOST需要输入kafka:端口-z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST需要输入zookeeper:端口-m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES间隔分钟
  • 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
  • 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartitiondef get_zoo_consumer_info(Topology):Topology_num = 0zk_cli.start()path = "/stormOffset/" + Topology + "/partition_0"if zk_cli.exists(path):str_data, stat = zk_cli.get(path)str_data = json.loads(str_data)Topology_num =  str_data.get("offset")#print("zookeeper now " + path + " offsets: " + str(Topology_num) )else:   print("Path " + path  + " does not exist.")return Topology_numdef get_kafka_consumer_info(server, topic):partition = 0tp = TopicPartition(topic, partition)end_offset = server.end_offsets([tp])[tp]#print("kafka topic " + topic + " partition " + str(partition) + " offsets: " + str(end_offset))return end_offsetif  __name__ == '__main__':parser = argparse.ArgumentParser(description='Usage of argparse')parser.add_argument('-k','--kafka_host', type=str, default="10.130.25.77:9092",help='需要输入kafka:端口')parser.add_argument('-z','--zookeeper_host', type=str, default="10.130.25.79:2181",help='需要输入zookeeper:端口')parser.add_argument('-m','--Interval_minutes', type=int, default="1",help='间隔分钟')args = parser.parse_args()kafka_host= args.kafka_hostzookeer_host= args.zookeeper_hostKafka_production_topics = "agent,record"Zoo_consumption_topics= "agentTopology,recordTopology"Interval_minutes = args.Interval_minutestry:zk_cli = KazooClient(hosts=zookeer_host)#print("init zookeeper " + zookeer_host + " conn ok")except Exception as e:print("init zookeeper conn error: "+ str(e))try:#kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)#print("init kafka " + kafka_host + "  conn ok")except Exception as e:print("init kafka conn error: "+ str(e))zoo_offset = {}kafka_offset = {}Kafka_production_topics_list = Kafka_production_topics.split(",")Kafka_production_topics_list_2  =  Kafka_production_topics.split(",")Zoo_consumption_topics_list = Zoo_consumption_topics.split(",")Zoo_consumption_topics_list_2 =   Zoo_consumption_topics.split(",")for i in range(0,len(Kafka_production_topics_list)):kafka_topics = Kafka_production_topics_list.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)kafka_offset[kafka_topics]=get_kafka_offset_numzoo_topics = Zoo_consumption_topics_list.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)zoo_offset[zoo_topics]= get_zoo_offset_numprint("Interval " + str(Interval_minutes) + " minutes sleep")print("=======================================================================================")time.sleep(int(Interval_minutes) * 60)for i in range(0,len(Kafka_production_topics_list_2)):kafka_topics = Kafka_production_topics_list_2.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)last_kafka_num = kafka_offset.get(kafka_topics)minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_numzoo_topics = Zoo_consumption_topics_list_2.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)last_zoo_num =  zoo_offset.get(zoo_topics)minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_numDifference = minutes_kafka_offset_num - minutes_zoo_offset_numprint("kafka offsets:",kafka_topics,get_kafka_offset_num,last_kafka_num)print("zookeeper offsets:",kafka_topics,get_zoo_offset_num,last_zoo_num)print(kafka_topics  + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))print("=======================================================================================")zk_cli.stop()# 关闭消费者连接kafka_server.close()

http://www.ppmy.cn/embedded/119977.html

相关文章

sql 时间交集

任务(取时间交集) 前端输入开始时间和结束时间,通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法: 前后一段时间内遇到了类似取交集的,从网上找到了两种写法,再结合GPT等…

无人化焦炉四大车系统 武汉正向科技 工业机车无人远程控制系统

焦炉四大车无人化系统介绍 采用格雷母线光编码尺双冗余定位技术,炉门视觉定位自学习技术,wifi5G无线通讯技术,激光雷达安全识别技术,焦化智慧调度,手机APP监控功能。 焦炉四大车无人化系统功能 该系统能自动生成生产…

27 Vue3之unocss原子化

前置知识 什么是原子化 CSS 原子化 CSS 是一种 CSS 的架构方式,它倾向于小巧且用途单一的 class,并且会以视觉效果进行命名。 为什么使用 原子化 CSS 传统方案 制作原子化 CSS 的传统方案其实就是提供所有你可能需要用到的 CSS 工具。例如&#xff0c…

uni-app - - - - -vue3使用i18n配置国际化语言

uni-app - - - - -使用i18n配置国际化语言 1. 安装vue-i18n2. 配置文件2.1 创建如下文件2.2 文件配置2.3 main文件导入i18n 3. 页面内使用3.1 template内直接使用3.2 变量接收使用 1. 安装vue-i18n npm install vue-i18n --save2. 配置文件 2.1 创建如下文件 locales文件夹里…

Vscode超好看的渐变主题插件

样式效果: 插件使用方法: 然后重启,之后会显示vccode损坏,不用理会,因为这个插件是更改了应用内部代码,直接不再显示即可。

构建高效房屋租赁系统:Spring Boot应用

1 绪论 1.1 研究背景 中国的科技的不断进步,计算机发展也慢慢的越来越成熟,人们对计算机也是越来越更加的依赖,科研、教育慢慢用于计算机进行管理。从第一台计算机的产生,到现在计算机已经发展到我们无法想象。给我们的生活改变很…

软考论文《论大数据处理架构及其应用》精选试读

论文真题 模型驱动架构设计是一种用于应用系统开发的软件设计方法,以模型构造、模型转换和精化为核心,提供了一套软件设计的指导规范。在模型驱动架构环境下,通过创建出机器可读和高度抽象的模型实现对不同问题域的描述,这些模型…

zy84_C#中文件的操作以及异常

文章目录 1.文件的操作2.文件的异常 1.文件的操作 File类的部分方法 静态方法通过类来调用,非静态方法通过类的对象来调用 string path "D:\test.txt"; if(File.Exists(path)) {string contents File.ReadAllText(path);//静态方法Console.WriteLine(&…