介绍
Apache Flink提供了一种容错机制,可以一致地恢复数据流应用程序的状态。该机制确保即使在出现故障的情况下,程序的状态最终也将exactly once反映出数据流中的每个记录。请注意,有一个开关可以至少_将担保_降级一次
容错机制连续绘制分布式流数据流的快照。对于状态小的流应用程序,这些快照非常轻巧,可以在不影响性能的情况下频繁绘制。流应用程序的状态存储在可配置的位置(例如主节点或HDFS)。
如果发生程序故障(由于机器,网络或软件故障),Flink将停止分布式流数据流。然后,系统重新启动operators,并将其重置为最新的成功检查点。输入流将重置为状态快照的点。确保作为重新启动的并行数据流的一部分处理的任何记录都不属于先前的检查点状态。
_注意:_默认情况下,检查点是禁用的。有关如何启用和配置检查点的详细信息,请参见检查点。
_注意:_为了使该机制实现其全部保证,数据源(例如消息队列或代理)必须能够将流后退到定义的最近点。Apache Kafka具有此功能(kafka 0.11版本后支持),Flink与Kafka的连接器利用了此功能。有关Flink连接器提供的保证的更多信息,请参见数据源和接收器的容错保证。
_注意:_由于Flink的checkpoints是通过分布式快照实现的,因此我们可以交替使用_snapshot_和checkpoint一词。
Checkpoint
Flink容错机制的核心部分是绘制分布式数据流和operator state的一致快照。这些快照充当一致的检查点,如果发生故障,系统可以回退到这些检查点。Flink绘制这些快照的机制在“ 分布式数据流的轻量级异步快照 ”中进行了介绍。它受用于分布式快照的标准Chandy-Lamport算法的启发,并且专门针对Flink的执行模型进行了量身定制。
Barriers
流屏障(barriers)是Flink分布式快照中的核心元素。这些barriers将注入到数据流中,并与记录一起作为数据流的一部分流动。壁垒从不超越记录,它们严格按照顺序排列。屏障将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录集。每个屏障都带有快照的ID,快照的记录已推送到快照的前面。屏障不会中断流的流动,因此非常轻便。来自不同快照的多个障碍可以同时出现在流中,这意味着各种快照可能同时发生。
_
流屏障(barriers)在流源处注入并行数据流中。快照n的屏障被注入的点(我们称其为_S _)是快照中覆盖数据的源流中的位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。该位置_S _被报告给_检查点协调器_(Flink的JobManager)。
然后,屏障向下游流动。当中间operator从其所有输入流中收到快照n的屏障时,它会将快照n的屏障发射到其所有输出流中。接收器运算符(流式DAG的末尾)从其所有输入流接收到屏障n后,便将快照n确认给检查点协调器。所有接收器都确认快照后,就认为快照已完成。
一旦快照n完成,该作业将再也不会向源询问_S _之前的记录,因为此时这些记录(及其后代记录)将通过整个数据流拓扑。
接收多个输入流的operator需要在快照barriers上_对齐_输入流。上图说明了这一点:
- operator一旦从传入流接收到快照屏障n,就无法处理该流中的任何其他记录,直到它也从其他输入接收到屏障n为止。否则,它将混合属于快照_n的_记录和属于快照_n + 1的记录_。
- 报告屏障_n的_流被暂时搁置。从这些流接收到的记录将不进行处理,而是放入输入缓冲区中。
- 一旦最后一个流接收到屏障n,操作员将发出所有未决的传出记录,然后自身发出快照n屏障。
- 此后,它将恢复处理所有输入流中的记录,处理输入缓冲中的记录,然后再处理流中的记录。
State
当运算符包含任何形式的_状态时_,该状态也必须是快照的一部分。operator状态以不同的形式出现:
- _用户定义的状态_:这是由转换功能(如
map()
或filter()
)直接创建和修改的状态。有关详细信息,请参见流应用程序中的状态。 - _系统状态_:此状态是指作为操作员计算的一部分的数据缓冲区。这种状态的一个典型示例是_窗口缓冲区_,系统在其中收集(并汇总)窗口记录,直到评估并逐出窗口为止。
操作员在从输入流接收到所有快照barriers的时间点,以及在将barriers发送到其输出流之前,对其状态进行快照。届时,将对进行障碍之前的记录进行的所有状态更新,并且不依赖于应用障碍后的记录进行的任何更新。由于快照的状态可能很大,因此将其存储在可配置state _backend中_。默认情况下,这是JobManager的内存,但对于生产用途,应配置分布式可靠存储(例如HDFS)。存储状态后,操作员确认检查点,将快照屏障发送到输出流中,然后继续。
现在生成的快照包含:
- 对于每个并行流数据源,快照启动时流中的偏移量/位置
- 对于每个运算符,指向作为快照一部分存储的状态的指针
Exactly Once vs. At Least Once
对准步骤可以向流传输程序增加等待时间。通常,这种额外的延迟大约是几毫秒,但是我们看到一些异常值的延迟显着增加的情况。对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink可以进行切换以在检查点期间跳过流对齐。一旦操作员从每个输入看到检查点障碍,仍然会绘制检查点快照。
跳过对齐后,即使到达检查点n的_某些检查点障碍,操作员仍会继续处理所有输入。这样,操作员还可以在获取检查点n的状态快照之前处理属于检查点_n + 1的_元素。在还原时,这些记录将作为重复记录出现,因为它们都包含在检查点n的状态快照中,并将在检查点n之后作为数据的一部分重播。
_注意_:对齐仅适用于具有多个前任(联接)的运算符以及具有多个发件人的运算符(在流重新分区/混洗之后)。正因为如此,有数据流只有尴尬的并行流操作(map()
,flatMap()
,filter()
,…)实际上给_正好一次_甚至在保证_至少一次_的模式。
Asynchronous State Snapshots
注意,上述机制意味着操作员在将输入状态的快照存储在_状态后端时_停止处理输入记录。每次拍摄快照时,此_同步_状态快照都会带来延迟。
可以让operator在存储其状态快照时继续进行处理,从而有效地使状态快照在后台_异步_发生。为此,operator必须能够产生状态对象,该状态对象的存储方式应确保对操作员状态的进一步修改不会影响该状态对象。例如,在RocksDB中使用_的写时复制_数据结构具有此行为。
在其输入上收到检查点屏障后,操作员将开始对其状态进行异步快照复制。它立即对输出发出障碍,并继续进行常规流处理。后台复制过程完成后,它将向检查点协调器(JobManager)确认检查点。现在,只有在所有接收器都接收到障碍并且所有有状态操作员都已确认完成备份后(可能是在障碍到达接收器之后),检查点才完成。
有关状态快照的详细信息,请参见状态后端。
Recovery
在这种机制下的恢复非常简单:失败时,Flink选择最新完成的检查点k。然后,系统重新部署整个分布式数据流,并为每个操作员提供作为检查点_k的_一部分快照的状态。设置源以开始从位置_S _读取流。例如,在Apache Kafka中,这意味着告诉使用者开始从偏移量_S _获取。
如果状态是增量快照,则操作员将从最新完整快照的状态开始,然后对该状态应用一系列增量快照更新。
有关更多信息,请参见重新启动策略。
Operator Snapshot Implementation
拍摄操作员快照时,有两个部分:同步部分和异步部分。
运算符和状态后端以Java形式提供其快照FutureTask
。该任务包含_同步_部分已完成而_异步_部分未决的状态。然后,异步部分由该检查点的后台线程执行。
纯粹检查点的操作员将同步返回已完成的操作FutureTask
。如果需要执行异步操作,则以run()
that 的方法执行FutureTask
。
这些任务是可取消的,因此可以释放流和其他消耗资源的句柄。