flink状态和容错(三)

news/2024/11/9 2:00:30/

目录

  • state
  • 容错语义
    • at most once
    • at least once
    • exactly-once
  • 容错续跑
    • 算子容错续跑
      • task异常作业逻辑不变CP/SP策略
      • bugfix升级续跑SP策略
    • source容错续跑
    • sink 容错续跑

state


flink 支持有状态的流,存储历史的状态信息。
状态

状态分类
keystate keyBy/groupBy/PartitonBy 后,每个key都有属于自己的一个state

  • liststate
  • valuestate
  • mapstate

operatorstate-flink source connector 的实现会用operatorState 来记录source 数据读取的offset

  • broadcaststate
    可以通过读取外部数据,进行广播变量,转化成广播流进行和一般流数据进行关联
  • liststate
  • unionliststate
    三种状态存储方式机使用场景
    状态后端-传入一条数据,有状态的算子任务都会读取和更新状态
    状态后端作用-管理本地状态,将状态写入远程存储。
    在这里插入图片描述
  • memoryStateBackend
    状态存储在TaskManager中,执行checkpoint 的时候会将状态保存在jobManager 中。
  • FsStateBackend
    状态会在taskManager内存中,自己维护一份。当checkpoint 时,转存到外部分布式存储系统。本地内存访问速度快,容易OMM
  • RocksDBStateBackend
    状态会在RocksDB中进行存储,解决内存限制问题,checkpoint 会状态存储在远端分布式存储。

容错语义


源头是多个数据流,同步的增加barrier ,同时在job处理的过程中,为了保证job失败的额时候可以从错误中恢复,flink 还对barrier进行align 对其操作
snapshots 全局一致性镜像,快照的内容是state,比如source offset 或者 算子累加的结果。

at most once

no replay resource

at least once

replay resource

exactly-once

checpointed replay source
flink 内部的exactly-onece
exactlyOnce 不是数据只处理一次,而是数据只影响数据结果一次

barrier对齐就是exactly once ,barrier不对齐就是at least once
barrier从source Task处生成,一直流到Sink Task,期间所有的Task只要碰到barrier,就会触发自身进行快照
flink 容错的核心时barrier alignment(组标记栏)
设置方式:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

容错续跑


算子容错续跑

  • 只设置重启策略,不设置checkpoint
    state不会保留,每次重启都会重新计算
  • 设置重启策略,设置checkpoint方式
    state 会保留在checkpoint中,计算结果包含以前的结果。默认state存储在jobmanager内存中。
  • 设置重启策略,设置checkpoint,设置存储后端是fs。
    state 存储在后端fs,作业停止后,默认会删除checkpoint。
  • 设置重启策略,设置checkpoint,设置存储后端是fs,开启保存cp.
    支持继续跑
senv.enableCheckpointing(1000);
senv.setStateBackend(new FsStateBackend("file:///D:\\DATA"));
senv.ssetRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1,TimeUnit.MILLISECONDS) ));
senv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

task异常作业逻辑不变CP/SP策略

作业恢复续跑命令

  • 从checkpoint 恢复
    bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\chk-10
    控制台页面提交submit job支持恢复
  • 从savepoint 恢复
    1.停止作业savepoint
    bin/flink cancle -s 作业id
    创建savepoint 并进行停止作业
    2.从savepoint 恢复
    bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd

bugfix升级续跑SP策略

作业拓扑变化的升级续跑

  • 作业不能启动
    bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd
  • 作业启动,但是state没恢复
    bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState
    添加参数allowNonRestoredState,作业能过成功,但是state的关系没有恢复。
  • 作业能启动,state也能恢复
    bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState
    添加uid逻辑,统一的算子和state文件映射关系

source容错续跑

source外部数据源支持提供数据重放,例如kafka,可以提供的postion进行数据拉取。
要实现CheckpointedFunction接口,实现保存kafka的offset ,实现数据源续跑
snapshotState,initializeState 方法

# taskid 和offset 续跑不对应
getListState list结构,只存offset
# taskid 和offset 续跑相对应
getUnionListState,是map结构,存储<taskid,offset>
#offset的时候要使用SourceContext.getCheckpointLock来进行同步操作,保证确定性一次语义。synchronized (ctx.getCheckpointLock()) {ctx.collect(new Tuple3<>("key", ++offset, String.valueOf(System.currentTimeMillis())));}

sink 容错续跑

两阶段提交,sink operator 需要感知整个checkpoint 的完成,并在完成后才将数据存储。
数据满足两个要求

  • 下游组件对事物的支持
  • 通过flink两阶段提交协议和预提交阶段,确保提交或回滚一致性的问题

在这里插入图片描述

步骤
# cp 完成之前结果写入临时存储
#上一个cp之后生成当前cp的事务ID
#cp将当前事务进行持久化
#cp完成后,将当前事务对应的结果数据写入正式外部系统

http://www.ppmy.cn/news/291288.html

相关文章

anyview+4.0java版_Anyview v4.0.12[带证书]

软件简介 Anyview可以阅读任意TXT文件,包括UNICODE、UTF-8、GB2312、HTML文件、PDB文件、UMD文件。 对于TXT文件,测试过8M的文件,处理速度可以接受。 Anyview支持滚屏,可以自己调整滚屏的速度;在阅读时,可以切换到全屏方式。 每5分钟,Anyview会为您保存一次阅读历史 有多…

老java手机有什么什么用,80后熟悉的这些手机功能,用过3种以上说明你已经老了...

原标题&#xff1a;80后熟悉的这些手机功能&#xff0c;用过3种以上说明你已经老了 又是一年元旦时&#xff0c;每到这时怀旧的心情便一发不可收拾。朋友圈等晒18岁照片成了热潮&#xff0c;可怜亓纪18岁的时候还没有手机。只有怀念一下那些年少轻狂时代用过的手机功能。 红外接…

获取指定文件夹下所有的同种类型文档的名字(Python)

今天教大家如何在一个非常混乱文件夹下用Python编写一个程序来寻找所有自己所需类型的文件&#xff08;这次举例选择json文件&#xff0c;如果有寻找其他类型文件需要可下面程序json改为自己所需的文件名字&#xff09;&#xff0c;并且输出所有所需文件的名字。 首先我们需要…

第七届湖湘杯网络安全大赛 - 初赛writeup

Web easywill 解题思路 变量覆盖 http://eci-2zej1goyn9jh8hty6ton.cloudeci1.ichunqiu.com/?namecfile&value/etc/passwd P神博客最近的文章利用pearcmd&#xff1a;https://tttang.com/archive/1312/ Pentest in Autumn 解题思路http://eci-2ze40jm526y24nv2lkl3.cloude…

Cookie(Secure和HttpOnly属性) JSESSIONID 刷新

目录 一&#xff1a;Cookie&#xff08;Secure和HttpOnly属性&#xff09; 二&#xff1a;JSESSIONID是什么 三&#xff1a; JSESSIONID 刷新 一&#xff1a;Cookie&#xff08;Secure和HttpOnly属性&#xff09; 基于安全的考虑&#xff0c;需要给cookie加上Secure和HttpOn…

【求助】 有个问题 c语言单词替换问题

replace oldfile newfile oldword newword<回车> 命令如上 把旧文件中的旧单词替换成新单词 然后输入到新文件中去。我的问题是程序会一直往新文件里面打印字符串不会停止。 代码如下&#xff0c;希望有大大可以帮我解答一下。谢谢啦&#xff01; 1 #include <stdio…

什么是JAVA软件?

一般的适用于S40的JAVA软件都可以装在诺基亚3110c上,包括游戏,电子书,应用软件 什么是JAVA软件呢&#xff1f;Java是由Sun微系统公司所发展出来的程序语言&#xff0c;它本身是一种对象导向&#xff08;Object-Oriented&#xff09;的程序语言。JAVA目前在手机上应用最多的就是…

[我可怜的3110c!nbsp;续]

今天早上&#xff0c;我带着我的手机来到裕华道修手机。 上午7:50&#xff0c;到裕华道口下车。 8:00&#xff0c;有点饿&#xff0c;就上小吃店叫了4块钱的包子和一瓶水。 8:15&#xff0c;裕华道75-77的那家售后服务中心说上午九点开始修&#xff0c;无奈&#xff0c;遛&…