title: Flink系列
八、Flink RestartStrategy 重启策略 和 FailoverStrategy 故障转移策略
官网链接:
重启策略链接:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/#restart-strategies
故障转移链接:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/#restart-all-failover-strategy
8.1 重启策略概述
Flink 支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。
配置参数 RestartStrategyOptions.restart-strategy 定义了哪个策略被使用。
常用的重启策略
-
固定间隔 (Fixed delay)
-
指数间隔 (Exponential delay)
-
失败率 (Failure rate)
-
无重启 (No restart)
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。 如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在 flink-conf.yaml 中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置。
8.2 重启策略详解
8.2.1 固定间隔 (Fixed delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二种:应用代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(// number of restart attempts3,Time.of(10, TimeUnit.SECONDS) // delay
));
8.2.2 指数间隔 (Exponential delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
第二种:应用代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(Time.milliseconds(1),Time.milliseconds(1000),// exponential multiplier1.1, // threshold duration to reset delay to its initial valueTime.milliseconds(2000), 0.1 // jitter
));
8.2.3 失败率 (Failure rate)
第一种:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二种:应用代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // max failures per intervalTime.of(5, TimeUnit.MINUTES), //time interval for measuring failure rateTime.of(10, TimeUnit.SECONDS) // delay
));
8.2.4 无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none
第二种:应用代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
8.3 FailoverStrategy 故障转移策略
当 Task 发生错误,TaskManager 会通过 RPC 通知 JobManager,后者将对应 Execution 的状态转为 failed 并触发 Failover 策略
如果符合 Failover 策略,JobManager 会重启 Execution,否则升级为 ExecutionGraph 的失败。ExecutionGraph 失败则进入 failing 的状态,由 Restart 策略决定其重启( restarting 状态)还是异常退出( failed 状态)。
FailoverStrategy 有两种实现:
-
Restart All Failover Strategy,代号 full,表示 Application 的 Task 出现异常,则直接全部 Task 重启
-
Restart Pipelined Region Failover Strategy,代号 region,这是默认实现,如果一个 Task 出现异常,则重启最小代价的 Region 集合
关于这种策略,跟源码中 Task 的部署策略有关系。
通过 JobManagerOptions.EXECUTION_FAILOVER_STRATEGY = jobmanager.execution.failover-strategy 来配置
Flink 在判断需要重启的 Region 时,采用了以下的判断逻辑:
官网原版:
The regions to restart are decided as below:
1.The region containing the failed task will be restarted.
2.If a result partition is not available while it is required by a region that will be restarted, the region producing the result partition will be restarted as well.
3.If a region is to be restarted, all of its consumer regions will also be restarted. This is to guarantee data consistency because nondeterministic processing or partitioning can result in different partitions.
近似的翻译出来更容易理解的内容如下:
1、发生错误的 Task 所在的 Region 需要重启;
2、如果当前 Region 的依赖数据出现损坏或者部分丢失,那么生产数据的 Region 也需要重启;
3、为了保证数据一致性,当前 Region 的下游 Region 也需要重启。
重启策略:Job 级别的容错
故障转移策略: Task 级别的容错
维度 | 粗粒度 | 细粒度 |
---|---|---|
容错 | Job 级别容错:Restart 重启策略 | Task 级别容错:Failover 策略 |
State状态 | Job Checkpoint | Task State |
声明:
文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。
By luoyepiaoxue2014
B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接