Kafka使用脚本根据时间重置消费位移,格式你写对了么?

news/2024/10/18 11:05:12/

前言

kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置策略如下(引用自官方文档):

--reset-offsets also has following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : Reset offsets to earliest offset.
  • --to-latest : Reset offsets to latest offset.
  • --shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file : Reset offsets to values defined in CSV file.
  • --to-current : Resets offsets to current offset.
  • --by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset : Reset offsets to a specific offset.

就是可以根据时间重置、重置到最小位移、最大位移...等场景。

本文主要聊一下根据时间重置消费位点时候,这个时间格式的问题。

根据时间重置消费位移

示例命令:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000 \--execute  

为了看起来直观点,我加了"\"换行展示。

示例命令重置消费组为test_topic_consumer订阅test_topic的消费偏移为2021年11月29号中午12点整的时候消息位点。

实际上,结果不一定是这样,和时区有关。

协调世界时(UTC)

关于utc,查看维基百科

kafka脚本采用的是utc时间标准,与北京时间换算如下:

点击查看:图片来源

北京为东8时区,采用的是utc+08:00,这一段可以看维基百科,下面我复制出一部分,可以了解一下:

UTC+08:00是比世界协调时间快8小时的时区,理论上的位置在东经112度30分127度30分之间,是东盟标准时间的候选时区之一,居住在本时区的人数约有17亿人,占全世界人口的24%,是全世界人口最多的时区。

该时区亦为包括台湾、新加坡、马来西亚、中国、文莱、印尼中部及澳大利亚西部在内的绝大多数汉语使用者所居住的时区。所以互联网上的不少中文网站会使用该时区标记时间,而不论该网站所在地的官方时区为何。

所以,如果你现在使用的是北京时间,如果按示例重置位点,则实际上不是重置到12:00,而且是重置到了20:00的消息位点。

北京时间重置写法

如果是北京时间,则命令应该如下:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 \--execute  

即时间格式为:2021-11-29T12:00:00.000+08:00 ,表示重置到11月29号的12点。当然这里还有其它写法,下面是源码注释中支持的写法:

(1) yyyy-MM-dd'T'HH:mm:ss.SSS, ex: 2020-11-10T16:51:38.198
(2) yyyy-MM-dd'T'HH:mm:ss.SSSZ, ex: 2020-11-10T16:51:38.198+0800
(3) yyyy-MM-dd'T'HH:mm:ss.SSSX, ex: 2020-11-10T16:51:38.198+08
(4) yyyy-MM-dd'T'HH:mm:ss.SSSXX, ex: 2020-11-10T16:51:38.198+0800
(5) yyyy-MM-dd'T'HH:mm:ss.SSSXXX, ex: 2020-11-10T16:51:38.198+08:00

重置流程

kafka根据时间重置消费位点这一块逻辑也是相当简单:

  1. 获取指定topic的分区(也可以是所有topic)
  2. 将时间转换为对应的时间戳,此时转换的时候就是上面提到的时区问题
  3. 根据时间戳获取对应的消息位点
  4. 修改消费位点为对应的消息位点

下面这段代码是根据时间戳查询位点的逻辑:

    private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {val timestampOffsets = topicPartitions.map { topicPartition =>// 指定根据时间戳类型查询位点,除此之外还有最小和最大日志位点等topicPartition -> OffsetSpec.forTimestamp(timestamp)}.toMap// 查询消息位点val offsets = adminClient.listOffsets(timestampOffsets.asJava,withTimeoutMs(new ListOffsetsOptions)).all.get//如果时间戳超过当前最新的消息时间了,就是查不到了,就是未知,下面会把未知这种转换为最新的消费位点val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)}.toMapunsuccessfulOffsetsForTimes.foreach { entry =>println(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +" is empty. Falling back to latest known offset.")}// 将查询到的和未知这种转换为最新的日志位点一起返回,准备重置successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)}

可视化重置

如果觉得重置命令太麻烦,推荐一款可视化控制台:kafka-console-ui,github地址:https://github.com/xxd763795151/kafka-console-ui

新手用这个学习还是比较友好的:


http://www.ppmy.cn/news/689736.html

相关文章

全球股市指数缩写及开盘时间

名称 简写 开盘时间1 开盘时间2 开盘时间3 开盘时间4 恒生指数 HSI.HI 9:30 12:00 13:30 16:00 台湾加权指数 TWII.TW 9:00 13:30 东京日经225指数 N225.GI 9:00 15:15 道琼斯工业平均指数 DJI.GI 9:30 16:15 纳斯达克综合指数 IXI…

北京工业大学2019年第八届暑期科技夏令营全记录 (计算机专业)

结束了为期三天的夏令营&#xff0c;想写个总结纪念一下这三天的经历。 0、本次夏令营总日程安排 5月30日至6月15日 —— 网上报名 6月28日 —— 网上查询入营结果&#xff0c;并确认 7月14日 —— 9:00-20:00 报道 7月15日 —— 上午&#xff1a;校开营仪式&#xff1b;下…

常用NTP网络时间服务器整理

以下所有的域名都是通过host -t A domain 114.114.114.114解析出来的结果。 ntp1.aliyun.com has address 182.92.12.11 北京市大兴区 阿里云ntp2.aliyun.com has address 120.25.115.19 广东省深圳市 阿里云ntp3.aliyun.com is an alias for ntp.aliyun.com.ntp4.aliyun.com i…

JS时间戳和时间格式转换的几种方法

JS时间戳和时间格式转换的几种方法 时间戳转化为日期的方式 var timestamp 1527521052;var newDate new Date();newDate.setTime(timestamp * 1000);// Mon May 28 2018 console.log(newDate.toDateString());// Mon, 28 May 2018 15:24:12 GMT console.log(newDate.toGMTS…

世界城市时间计算

1.城市对应的时区表 地拉那Europe/Tirane 图古尔特Africa/Algiers 都柏林Europe/Dublin 塔林Europe/Tallinn 库雷萨雷Europe/Tallinn 安道尔Europe/Andorra 萨尔茨堡Europe/Vienna 维也纳Europe/Paris 因斯布鲁克Europe/Vienna 布雷根茨Europe/Vienna 格蒙登Europe/Vienna 米尔…

python 、mysql 时间戳 时间 字符串之间的转化

python from datetime import datetime,timedelta from dateutil import tz #时间戳 改时间# Kolkata 印度 # Jakarta 印尼 tz_nation tz.gettz(Asia/Jakarta) def timestamp_tz(time_stamp,tz_nationtz_nation):return datetime.fromtimestamp(time_stamp,tztz_nation) if …

TikTok(国际版抖音)时间线

2012年3月 字节跳动在北京成立 2015年8月 开始全球化布局 2016年9月 抖音短视频上线 2017年5月 TikTok上线 2018年12月 苹果公司“2018年度精选榜单&#xff08;Best of 2018&#xff09;”出炉&#xff0c;TikTok超过LINE、Google Maps等成为日本App Store…

Hive时间函数from_utc_timestamp 把UTC标准时间切换到北京时间

首先简单地解释下几个含义&#xff1a; 1. 时间戳(unix timestamp) 表示以(1970-01-01 00:00:00)为起点&#xff0c;到现在的秒数。 2. GMT和UTC 先说结论&#xff0c;UTC与GMT基本上等同&#xff0c;误差不超过0.9秒。 GMT&#xff0c;即格林尼治标准时间&#xff0c;也就…