目录
- state
- 容错语义
- at most once
- at least once
- exactly-once
- 容错续跑
- 算子容错续跑
- task异常作业逻辑不变CP/SP策略
- bugfix升级续跑SP策略
- source容错续跑
- sink 容错续跑
state
flink 支持有状态的流,存储历史的状态信息。
状态
状态分类
keystate keyBy/groupBy/PartitonBy 后,每个key都有属于自己的一个state
- liststate
- valuestate
- mapstate
operatorstate-flink source connector 的实现会用operatorState 来记录source 数据读取的offset
- broadcaststate
可以通过读取外部数据,进行广播变量,转化成广播流进行和一般流数据进行关联 - liststate
- unionliststate
三种状态存储方式机使用场景
状态后端-传入一条数据,有状态的算子任务都会读取和更新状态
状态后端作用-管理本地状态,将状态写入远程存储。
- memoryStateBackend
状态存储在TaskManager中,执行checkpoint 的时候会将状态保存在jobManager 中。 - FsStateBackend
状态会在taskManager内存中,自己维护一份。当checkpoint 时,转存到外部分布式存储系统。本地内存访问速度快,容易OMM - RocksDBStateBackend
状态会在RocksDB中进行存储,解决内存限制问题,checkpoint 会状态存储在远端分布式存储。
容错语义
源头是多个数据流,同步的增加barrier ,同时在job处理的过程中,为了保证job失败的额时候可以从错误中恢复,flink 还对barrier进行align 对其操作
snapshots 全局一致性镜像,快照的内容是state,比如source offset 或者 算子累加的结果。
at most once
no replay resource
at least once
replay resource
exactly-once
checpointed replay source
flink 内部的exactly-onece
exactlyOnce 不是数据只处理一次,而是数据只影响数据结果一次
barrier对齐就是exactly once ,barrier不对齐就是at least once
barrier从source Task处生成,一直流到Sink Task,期间所有的Task只要碰到barrier,就会触发自身进行快照
flink 容错的核心时barrier alignment(组标记栏)
设置方式:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
容错续跑
算子容错续跑
- 只设置重启策略,不设置checkpoint
state不会保留,每次重启都会重新计算 - 设置重启策略,设置checkpoint方式
state 会保留在checkpoint中,计算结果包含以前的结果。默认state存储在jobmanager内存中。 - 设置重启策略,设置checkpoint,设置存储后端是fs。
state 存储在后端fs,作业停止后,默认会删除checkpoint。 - 设置重启策略,设置checkpoint,设置存储后端是fs,开启保存cp.
支持继续跑
senv.enableCheckpointing(1000);
senv.setStateBackend(new FsStateBackend("file:///D:\\DATA"));
senv.ssetRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1,TimeUnit.MILLISECONDS) ));
senv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
task异常作业逻辑不变CP/SP策略
作业恢复续跑命令
- 从checkpoint 恢复
bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\chk-10
控制台页面提交submit job支持恢复 - 从savepoint 恢复
1.停止作业savepoint
bin/flink cancle -s 作业id
创建savepoint 并进行停止作业
2.从savepoint 恢复
bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd
bugfix升级续跑SP策略
作业拓扑变化的升级续跑
- 作业不能启动
bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd - 作业启动,但是state没恢复
bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState
添加参数allowNonRestoredState,作业能过成功,但是state的关系没有恢复。 - 作业能启动,state也能恢复
bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState
添加uid逻辑,统一的算子和state文件映射关系
source容错续跑
source外部数据源支持提供数据重放,例如kafka,可以提供的postion进行数据拉取。
要实现CheckpointedFunction接口,实现保存kafka的offset ,实现数据源续跑
snapshotState,initializeState 方法
# taskid 和offset 续跑不对应
getListState list结构,只存offset
# taskid 和offset 续跑相对应
getUnionListState,是map结构,存储<taskid,offset>
#offset的时候要使用SourceContext.getCheckpointLock来进行同步操作,保证确定性一次语义。synchronized (ctx.getCheckpointLock()) {ctx.collect(new Tuple3<>("key", ++offset, String.valueOf(System.currentTimeMillis())));}
sink 容错续跑
两阶段提交,sink operator 需要感知整个checkpoint 的完成,并在完成后才将数据存储。
数据满足两个要求
- 下游组件对事物的支持
- 通过flink两阶段提交协议和预提交阶段,确保提交或回滚一致性的问题
步骤
# cp 完成之前结果写入临时存储
#上一个cp之后生成当前cp的事务ID
#cp将当前事务进行持久化
#cp完成后,将当前事务对应的结果数据写入正式外部系统