flink-1.16 table sql 消费 kafka 数据,指定时间戳位置消费数据报错:Invalid negative offset 问题解决

embedded/2024/12/27 10:54:23/

1 背景

1.使用 flink-1.16 的 table sql 消费 kafka数据,并使用 sql 计算指标,然后写入 doris;

2.指标计算时,需要统计当日数据条数,考虑到作业异常退出被重新拉起时,需要从零点开始消费,所以指定 'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳' 方式创建 kafka table:

s"""|CREATE TABLE qysfxjKafkaTable (|xlid STRING,|available_status STRING,|sendtime STRING,|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'|) WITH (|'connector' = 'kafka',|'topic' = '${param.getProperty("qysfxjTopic")}',|'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳'|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',|'properties.auto.offset.reset' = 'earliest',|'json.ignore-parse-errors' = 'true',|'json.fail-on-missing-field' = 'false',|'format' = 'json')|""".stripMargin

3.启动时报 kakfa 的错误,Invalid negative offset,即 flink 使用了一个不正确的 offset 到 kafka 消费数据,经过排查 topic 中最新一条数据的时间,在今日零点之前,也就是说,kafka table sql 中指定今日零点的时间戳,落后于 kafka 最新数据的时间;

2 解决方案

1.两种解决方案,① 从检查点启动作业;② 根据 kafka 数据时间,调整消费的时间;考虑到第一次启动可能 topic 也没有数据,且如果检查点失败会导致作业无法从检查点恢复的情况,决定采用 ② 方案解决;

2.方案步骤

1.使用 kafka java api,获取 topic 中最后一条数据,根据数据的时间戳初始化创建 kafka table sql 的启动时间;

2.获取到 kafka 最后一条数据的场景有两种:① kafka 中最新一条数据时间早于零点(报错的场景);② kafka 中最新一条数据时间晚于零点;

3.根据以上步骤,实现代码,代码会返回一个时间戳,0或者最后一条数据时间戳:

def getTopicLatestRecordTimeStamp(brokers: String,topic: String): Long ={var retMillis = 0Lval props = new Properties();props.put("bootstrap.servers", brokers);props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");val consumer = new KafkaConsumer[String,String](props);try {// 订阅 topicconsumer.subscribe(Collections.singletonList(topic))// 获取并订阅全部分区var assigment = consumer.assignment()while (assigment.size() == 0) {consumer.poll(Duration.ofMillis(1000L))assigment = consumer.assignment()}println(assigment)val earliestOffset = consumer.beginningOffsets(assigment)val lastOffset = consumer.endOffsets(assigment)println("assigment: " + assigment + ",topic earliestOffset:" + earliestOffset + ",topic endOffsets:" + lastOffset)// 遍历所有分区,获取最新的 offsetval lastOffsetIter = lastOffset.entrySet().iterator()while (lastOffsetIter.hasNext){val ele = lastOffsetIter.next()if(ele.getValue != 0L){// 分区有数据consumer.seek(ele.getKey, ele.getValue - 1)val records = consumer.poll(1000).iterator()while (records.hasNext){val next = records.next()if(next.timestamp() > retMillis){retMillis = next.timestamp()}System.out.println("Timestamp of last record: " + next.timestamp())}}}retMillis} finally {consumer.close();}}

4.根据返回的时间戳,于当日零点判断,如果返回的时间戳早于零点,使用 latest-offset,返回的时间戳晚于当日零点,使用零点启动即可,以下代码返回使用的是时间戳启动,还是 latest-offset 启动:

if(parameterTool.get("qysfxjTopicStartFrom","latest").equals("latest")){val toAssignmentTime = TimeHandler.getMidNightMillions()val latestTime = KfkUtil.getTopicLatestRecordTimeStamp(pro.get("brokers").toString,pro.get("qysfxjTopic").toString)// 如果最后一条数据在 toAssignmentTime 之前,则使用 latest-offset 启动消费if(toAssignmentTime > latestTime){pro.put("qysfxjTopicStartFrom","latest-offset")}else {pro.put("qysfxjTopicStartFrom",(toAssignmentTime).toString)}}else {pro.put("qysfxjTopicStartFrom",parameterTool.get("qysfxjTopicStartFrom"))}

5.根据时间戳,还是 latest-offset,生成 sql 中的 scan 片段:

/*** 根据 startFrom,判断是从什么位置消费。** @param startFrom:earliest-offset,latest-offset,group-offsets,timestamp* @return*/def getKafkaSQLScanStr(startFrom: String): String = {var scanStartup = ""if(Character.isDigit(startFrom.trim()(0))){scanStartup ="'scan.startup.mode' = 'timestamp'," +s"'scan.startup.timestamp-millis' = '${startFrom.trim()}',"}else {scanStartup =s"'scan.startup.mode' = '${startFrom}',"}scanStartup}

6.完整table sql 拼接:

val qysfxjKafkaSource =s"""|CREATE TABLE qysfxjKafkaTable (|xlid STRING,|available_status STRING,|sendtime STRING,|`ts` TIMESTAMP(3) METADATA FROM 'timestamp'|) WITH (|'connector' = 'kafka',|'topic' = '${param.getProperty("qysfxjTopic")}',|${TXGJUtils.getKafkaSQLScanStr(qysfxjTopicStartFrom)}|'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',|'properties.bootstrap.servers' = '${param.getProperty("brokers")}',|'properties.auto.offset.reset' = 'earliest',|'json.ignore-parse-errors' = 'true',|'json.fail-on-missing-field' = 'false',|'format' = 'json')|""".stripMargin


http://www.ppmy.cn/embedded/149156.html

相关文章

虚幻引擎结构之UWorld

Uworld -> Ulevel ->Actors -> AActor 在虚幻引擎中,UWorld 类扮演着至关重要的角色,它就像是游戏世界的总指挥。作为游戏世界的核心容器,UWorld 包含了构成游戏体验的众多元素,从游戏实体到关卡设计,再到物…

ffmpeg源码分析(九)解协议

本文将聚焦于FFmpeg协议处理模块,以avformat_open_input函数为核心,详细剖析其在最新FFmpeg源码中的实现。 音视频处理流程简介 avformat_open_input概述 avformat_open_input是FFmpeg用于打开输入多媒体数据的关键函数。它通过统一的接口处理多种协议…

详细介绍Sd-WebUI提示词的语法规则

AI绘画中最大的门槛就是提示词,对英语水平、文学水平、想象力、灵感等要求较高。不能每次一输入正向提示词(positive prompt),就只会写a girl, big eyes, red hair。虽然sd-webui软件可以直接翻译,输入一个子母后会立刻…

重发布技术

重发布 在路由协议的边界设备上,将某一种路由协议的路由信息引入到另一种路由协议中,这个操作被称为路由引入或者路由重分发。----技术本质为重发布。 条件 必须存在ASBR设备(路由边界设备)---同时连接两种协议或两个进程&#…

Linux 下的 GPT 和 MBR 分区表详解

在Linux系统中,分区表是描述存储设备上分区布局和属性的一种结构化数据。常见的分区表格式有MBR(Master Boot Record)和GPT(GUID Partition Table)。以下是这两种分区表的详解: MBR(Master Boot…

二、创建第一个VUE项目

文章目录 一、先决条件二、创建项目1、创建项目2、选择VUE版本3、选择依赖管理器4、创建成功 三、启动并访问启动项目访问结果 一、先决条件 确保已经安装了 Node.js 和 npm(一般安装 Node.js 时会自动安装 npm),并且通过 npm install -g vue…

每天40分玩转Django:实操多语言博客

实操多语言博客 一、今日学习内容概述 学习模块重要程度主要内容国际化配置⭐⭐⭐⭐⭐基础设置、语言切换翻译模型⭐⭐⭐⭐⭐多语言字段、翻译管理视图处理⭐⭐⭐⭐多语言内容展示、URL处理前端实现⭐⭐⭐⭐语言切换、界面适配 二、模型设计 # models.py from django.db im…

Day 15:Spring 框架基础

目标 理解 Spring 核心概念,学习依赖注入(DI)和控制反转(IoC)的基本实现方式,并能够配置一个简单的 Spring 项目。 1. 什么是 Spring? Spring 是一个轻量级的开源框架,旨在解决企…