kafka常用命令大全

news/2025/1/15 21:52:35/

目录

启动kafka服务

停止kafka服务

创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)

列出指定主题(topic)的详细信息

查看所有的主题

查看所有主题的详细信息

删除一个主题

向kafka指定topic写入数据

命令行消费某个topic消息

查看某个topic对应的消息数量

kafka重置分组已经消费的偏移量offest

topic增加分区

指定topic创建消费者分组

查看消费组组所属topic的消费情况

显示所有消费者

获取正在消费的topic的group的offset

重设 consumer group的offset

指定offset与partition导出消息

修改topic的参数

测试生产者性能脚本

测试消费者性能脚本


启动kafka服务

bin/kafka-server-start.sh config/server.properties &

停止kafka服务

./kafka-server-stop.sh

创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)

./kafka-topics.sh --zookeeper(host:port) --create --topic demo-topic --replication-factor 3 --partitions 2 --config retention.ms=259200000

列出指定主题(topic)的详细信息

./kafka-topics.sh  --zookeeper(host:port) --describe  --topic demo-topic 

查看所有的主题

./kafka-topics.sh --list --zookeeper(host:port) kafka-host(host:port)

查看所有主题的详细信息

./kafka-topics.sh --zookeeper(host:port) --describe

删除一个主题

./kafka-topics.sh --zookeeper(host:port) --topic demo-topic --delete 

向kafka指定topic写入数据

./kafka-console-producer.sh --broker-list kafka-host(host:port)--topic demo-topic

命令行消费某个topic消息

#加了--from-beginning 从头消费所有消息
./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic --from-beginning   
#不加--from-beginning 从最新的一条消息开始消费
./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port) --topic demo-topic 

查看某个topic对应的消息数量

# time为-1时表示最大值,time为-2时表示最小值 --partitions num 指定分区
./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list kafka-host(host:port) --topic demo-topic --time -1实例:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic app-aaaa

kafka重置分组已经消费的偏移量offest

./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port) --execute --reset-offsets --topic=demo-topic --group=testPlatform --to-earliest

topic增加分区

./kafka-topics.sh --alter --zookeeper(host:port) --topic demo-topic --partitions 12

指定topic创建消费者分组

./kafka-console-consumer.sh  --bootstrap-server=kafka-host(host:port) --topic demo-topic --consumer-property group.id=testPlatform

查看消费组组所属topic的消费情况

./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port)  --group=demo-group --describ

显示所有消费者

./kafka-consumer-groups.sh --bootstrap-serverkafka-host(host:port) --list

获取正在消费的topic的group的offset

./kafka-consumer-groups.sh --describe --group demo-group --bootstrap-serverkafka-host(host:port)

重设 consumer group的offset

确定topic作用域:--all-topics   为consumer group下所有topic的所有分区调整位移--topic t1 --topic t2  为指定的若干个topic的所有分区调整位移--topic t1:0,1,2     为指定的topic分区调整位移确定位移重设策略
--to-current                            把位移调整到分区当前位移.       
--to-datetime <String: datetime>       把位移调整到大于给定时间的最早位移处.datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'  2020-07-01T12:00:00.000
--to-earliest                           把位移调整到分区当前最小位移
--to-latest                             把位移调整到分区当前最新位移        
--to-offset <Long: offset>              把位移调整到指定位移处
--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
--from-file <file>:从CSV文件中读取调整策略
例:(--dry-run 不运行只查看结果,类似k8s里面;--execute执行)
按时间点重置(--to-datetime)
./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform  --execute --reset-offsets --to-datetime 2022-01-18T12:00:00.000 按offset重置 (--to-offset )
重置消费组下指定topic
./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-offset 359905139  重置消费组下面所以topic
./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --all-topics --group testPlatform --reset-offsets  --to-latest  --execute

指定offset与partition导出消息

./kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port) --topic --topic=demo-topic  --offset 825000 --partition 0 >> messages.log

修改topic的参数

kafka-configs.sh --zookeeper(host:port) --entity-type topics --entity-name demo-topic --alter --add-config max.message.bytes=1048576

测试生产者性能脚本

./kafka-producer-perf-test.sh --topic demo-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host(host:port) acks=-1 linger.ms=2000 compression.type=lz4

测试消费者性能脚本

./kafka-consumer-perf-test.sh --broker-list kafka-host(host:port) --messages 10000000 --topic demo-topic

http://www.ppmy.cn/news/12877.html

相关文章

C进阶_内存库函数

目录 memcpy 模拟实现memcpy memmove 模拟实现memmove memcmp memcpy 它的函数原型为&#xff1a; void * memcpy ( void * destination, const void * source, size_t num ); 函数memcpy从source的位置开始向后复制num个字节的数据到destination的内存位置。 这个函数…

【树莓派4B】搭建HomeAssistant服务端(二)(systemd配置开机自启动,cpolar内网穿透)

设置开机自启动 创建home-assistanthomeassistant.service服务&#xff1a; sudo nano /etc/systemd/system/home-assistanthomeassistant.service复制以下内容&#xff0c;定义服务&#xff0c;其中After定义先行服务&#xff0c;ExecStart执行启动脚本&#xff1a; [Unit]…

HTB打靶(Active Directory 101 Resolute)

nmap扫描 nmap -A -T4 10.10.10.169 Starting Nmap 7.93 ( https://nmap.org ) at 2023-01-16 01:30 EST Stats: 0:00:04 elapsed; 0 hosts completed (1 up), 1 undergoing SYN Stealth Scan SYN Stealth Scan Timing: About 74.65% done; ETC: 01:30 (0:00:01 remaining) St…

Linux三剑客之Sed

目录 一、认识sed 二、使用sed 命令格式 常用选项options 地址定界 编辑命令command sed用法 常用选项&#xff1a; 地址界定演示 编辑命令command演示 sed高级编辑命令 一、认识sed sed 是一种流编辑器&#xff0c;它一次处理一行内容。处理时&#xff0c;把当前处理的行…

ConcurrentHashMap 的优化及其与HashTable, HashMap的区别

目录 1.优化一:减小锁粒度 2.优化二:只针对写操作加锁 3.优化三:CAS 4.优化四:扩容方式 HashMap是线程不安全的,HashTable是线程安全的,关键方法加锁了.我们更推荐的是ConcurrentHashMap ,更优化的线程安全哈希表 接下来我们总结一下ConcurrentHashMap 进行了哪些优化,比H…

应用层——Web和HTTP

目录 1. HTTP概况 1.1 Web页面简介 1.2 URL-统一资源定位器 1.3 HTTP协议 2. HTTP连接的两种类型 2.1 HTTP非持久性连接(Non-persistent HTTP) 2.2 HTTP持久性连接(Persistent HTTP) 2.2.1 无流水(pipelining)的持久性连接 2.2.2 带有流水机制的持久性连接 3. HT…

WebDAV之葫芦儿·派盘+BookxNote

BookxNote 支持WebDAV方式连接葫芦儿派盘。 BookxNote是全新设计的电子书阅读学习笔记软件,以不同的思维方式重塑我们的学习过程。一边阅读一边划重点,提供多种划重点笔记工具,包括直线、矩形、圆形、高亮文本,图片摘录。高亮的文本自动编辑为重点的批注内容,还可以对重点…

1月17日,30秒知全网,精选7个热点

///快手与央视达成合作&#xff0c;可通过直播、点播和短视频三种方式看春晚 据官方介绍&#xff0c;这已经是快手连续第五年与总台春晚进行深度合作。此外&#xff0c;今年快手也与17家省市电视台达成了区域春晚转播合作&#xff0c;为用户奉上春晚视听盛宴。 ///俄罗斯或单方…