Flink 故障恢复和重启策略

自动故障恢复是 Flink 提供的一个强大的功能,在实际运行环境中,我们会遇到各种各样的问题从而导致应用挂掉,比如我们经常遇到的非法数据、网络抖动等。Flink 提供了强大的可配置故障恢复和重启策略来进行自动恢复。

故障恢复

Flink 的配置文件,其中有一个参数 jobmanager.execution.failover-strategy: region。


Flink 支持了不同级别的故障恢复策略,jobmanager.execution.failover-strategy 的可配置项有两种:full 和 region。


当我们配置的故障恢复策略为 full 时,集群中的 Task 发生故障,那么该任务的所有 Task 都会发生重启。而在实际生产环境中,我们的大作业可能有几百个 Task,出现一次异常如果进行整个任务重启,那么经常会导致长时间任务不能正常工作,导致数据延迟。


根据图论知识,如果我们的ExecutionGraph是一个非连通图(即可以划分为多个独立的依赖pipeline),那么当某个Task失败时,就可以只回溯到该Task所在的连通分量的Source,并重启该连通分量涉及到的所有Task,而其他Task不受影响,如下图所示。此时一个连通分量就是一个Region。

这个思路很容易理解,但是对于ExecutionGraph本身就是连通图的情况就不高效了,因为还是要重启所有Task,如下图所示。

所以Flink对这种情况又做了一个优化:在发生一对多依赖的Task后面缓存计算出来的中间结果(intermediate result)。当下游的Task失败重启时,就可以不必回溯到Source,而是回溯到中间结果就行了,重启的Task数进一步减少。此时从中间结果缓存起计的所有下游Task形成一个Region。用语言描述可能有些不直观,一张图就能说明白了。

注意B1、B2后面的黑框框
当然,如果是靠近Source一端的Task出了问题,或者中间结果缓存失效,这种方法就行不通了,老老实实从Source重启吧。


所以集群中某一个或几个 Task 发生了故障,只需要重启有问题的一部分即可,这就是 Flink 基于 Region 的局部重启策略。在这个策略下,Flink 会把我们的任务分成不同的 Region,当某一个 Task 发生故障时,Flink 会计算需要故障恢复的最小 Region。


Flink 在判断需要重启的 Region 时,采用了以下的判断逻辑:

  • 发生错误的 Task 所在的 Region 需要重启;
  • 如果当前 Region 的依赖数据出现损坏或者部分丢失,那么生产数据的 Region 也需要重启;
  • 为了保证数据一致性,当前 Region 的下游 Region 也需要重启。


Job重启策略的相关源码
截屏2020-10-08 下午9.54.50.png截屏2020-10-08 下午9.55.00.png截屏2020-10-08 下午9.55.10.png
Task重启策略的相关源码
截屏2020-10-08 下午9.57.30.png

重启策略

Flink 提供了多种类型和级别的重启策略,常用的重启策略包括:

  • 固定延迟重启策略模式
  • 失败率重启策略模式
  • 无重启策略模式


Flink 在判断使用的哪种重启策略时做了默认约定,如果用户配置了 checkpoint,但没有设置重启策略,那么会按照固定延迟重启策略模式进行重启如果用户没有配置 checkpoint,那么默认不会重启。


下面我们分别对这三种模式进行详细讲解。
**

无重启策略模式

在这种情况下,如果我们的作业发生错误,任务会直接退出。
我们可以在 flink-conf.yaml 中配置:

1
restart-strategy: none


也可以在程序中使用代码指定:

1
2
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

**

固定延迟重启策略模式

固定延迟重启策略会通过在 flink-conf.yaml 中设置如下配置参数,来启用此策略:

1
restart-strategy: fixed-delay


固定延迟重启策略模式需要指定两个参数,首先 Flink 会根据用户配置的重试次数进行重试,每次重试之间根据配置的时间间隔进行重试,如下表所示:

举个例子,假如我们需要任务重试 3 次,每次重试间隔 5 秒,那么需要进行一下配置:

1
2
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 5 s


当前我们也可以在代码中进行设置:

1
2
3
4
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(5, TimeUnit.SECONDS) // 时间间隔
));

失败率重启策略模式

首先我们在 flink-conf.yaml 中指定如下配置:

1
restart-strategy: failure-rate


这种重启模式需要指定三个参数,如下表所示。失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

这种策略的配置理解较为困难,我们举个例子,假如 5 分钟内若失败了 3 次,则认为该任务失败,每次失败的重试间隔为 5 秒。


那么我们的配置应该是:

1
2
3
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 5 s


当然,也可以在代码中直接指定:

1
2
3
4
5
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(5, TimeUnit.SECONDS) // 每次任务失败时间间隔
));


最后,需要注意的是,在实际生产环境中由于每个任务的负载和资源消耗不一样,我们推荐在代码中指定每个任务的重试机制和重启策略