容错机制
容错机制
一、检查点的保存
- 周期性的触发保存 查点的保存是周期性触发的,间隔时间可以进行设置。
- 保存的时间点 当所有任务都恰好处理完一个相同的输入数据的时候,将它们的 状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。 其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状 态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之 前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source) 任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子 状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子。
- 具体的保存过程 检查点的保存,最关键的就是要等所有任务将
同一个数据
处理完毕。
二、从检查点恢复状态
- 重启应用 遇到故障之后,第一步当然就是重启。应用重新启动后,所有任务的状态会清空。
- 读取检查点,重置状态 找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态 中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻。
- 重放数据 为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向 外部数据源重新提交偏移量(offset)来实现,这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。
- 继续处理数据 接下来,我们就可以正常处理数据了,追上了发生故障时的系统状态。之后继续处理,就好像没 有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。 在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。 这里我们也可以发现,想要正确地从检查点中读取并恢复状态,必须知道每个算子任务状 态的类型和它们的先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,我们在 改动程序、修复 bug 时要保证状态的拓扑顺序和类型不变。状态的拓扑结构在 JobManager 上 可以由 JobGraph 分析得到,而检查点保存的定期触发也是由 JobManager 控制的;所以故障恢 复的过程需要 JobManager 的参与。
三、检查点算法
- 检查点分界线(Barrier)
我们现在的目标是,在不暂停流处理的前提下,让每个任务认出
触发检查点保存的那个数据。 我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构, 专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前 数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数 据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个 检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一 个检查点。 这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的 “分界线”(Checkpoint Barrier)。 与水位线很类似,检查点分界线也是一条特殊的数据,由 Source 算子注入到常规的数据 流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中 带有一个检查点 ID,这是当前要保存的检查点的唯一标识 这样,分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改, 都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会 被包含在之后的检查点中
- 分布式快照算法
具体实现上,Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照” (asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行 下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时, 需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区 的 barrier 都到齐,才可以开始状态的保存。
四、检查点配置
- 启用检查
默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
- 检查点存储(Checkpoint Storage)
检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默 认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink 也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。 具 体 可以通过调用检查点配置的 .setCheckpointStorage() 来 配 置 , 需 要 传 入 一 个 CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管
- 其他高级配置
(1)检查点模式(CheckpointingMode) (2)超时时间(checkpointTimeout) (3)最小间隔时间(minPauseBetweenCheckpoints) (4)最大并发检查点数量(maxConcurrentCheckpoints) (5)开启外部持久化存储(enableExternalizedCheckpoints) (6)检查点异常时是否让整个任务失败(failOnCheckpointingErrors) (7)不对齐检查点(enableUnalignedCheckpoints)
五、状态一致性
5.1 一致性的概念和级别
在分布式系统中,一致性(consistency)是一个非常重要的概念;在事务(transaction) 中,一致性也是重要的一个特性。Flink 中一致性的概念,主要用在故障恢复的描述中,所以 更加类似于事务中的表述。那到底什么是一致性呢?
简单来讲,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相 同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务 而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不 同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理 同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但 在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保 证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。
一般说来,状态一致性有三种级别:
最多一次(AT-MOST-ONCE)
当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态, 也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是 “最多处理一次”。
我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所 以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”, 而对近似正确的结果也能接受,那这也不失为一种很好的解决方案。
至少一次(AT-LEAST-ONCE)
在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次” (at-least-once)
,就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。
在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有“幂等性”。比如, 如果我们统计电商网站的 UV,需要对每个用户的访问数据进行去重处理,所以即使同一个数 据被处理多次,也不会影响最终的结果,这时使用 at-least-once 语义是完全没问题的。当然, 如果重复数据对结果有影响,比如统计的是 PV,或者之前的统计词频 word count,使用 at-least-once 语义就可能会导致结果的不一致了。
为了保证达到 at-least-once 的状态一致性,我们需要在发生故障时能够重放数据。最常见 的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录 一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka 就是这种架构的一个典型实现。
精确一次(EXACTLY-ONCE)
最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。 这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处 理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一 次统计。
exactly-once 可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生 过故障一样。
很明显,要做的 exactly-once,首先必须能达到 at-least-once 的要求,就是数据不丢。所以 同样需要有数据重放机制来保证这一点。另外,还需要有专门的设计保证每个数据只被处理一 次。Flink 中使用的是一种轻量级快照机制——检查点(checkpoint)来保证 exactly-once 语义。
端到端的状态一致性
我们已经知道检查点可以保证 Flink 内部状态的一致性,而且可以做到精确一次 (exactly-once)。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问 题呢?
没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。 而用户或者外部应用不会直接从 Flink 内部的状态读取数据,往往需要我们将处理结果写入外 部存储中。这就要求我们不仅要考虑 Flink 内部数据的处理转换,还涉及从外部数据源读取, 以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完 整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的 那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否 达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。在下一节, 我们就将详细讨论端到端的 exactly-once 一致性语义如何保证。
5.2 端到端精确一次(end-to-end exactly-once)
输入端保证
输入端主要指的就是 Flink 读取的外部数据源。对于一些数据源来说,并不提供数据的缓 冲或是持久化保存,数据被消费之后就彻底不存在了。例如 socket 文本流就是这样, socket 服务器是不负责存储数据的,发送一条数据之后,我们只能消费一次,是“一锤子买卖”。对 于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障 期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证 at-most-once 的一致性语 义,相当于没有保证。
想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对 数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是 Kafka。在 Flink 的 Source 任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取 出来,对数据源重置偏移量,重新获取数据。
数据源可重放数据,或者说可重置读取数据偏移量,加上 Flink 的 Source 算子将偏移量作 为状态保存进检查点,就可以保证数据不丢。这是达到 at-least-once 一致性语义的基本要求, 当然也是实现端到端 exactly-once 的基本要求。
输出端保证
有了 Flink 的检查点机制,以及可重放数据的外部数据源,我们已经能做到 at-least-once 了。但是想要实现 exactly-once 却有更大的困难:数据有可能重复写入外部系统。
因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过 Sink 任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果 出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对 Flink 内部状态来说,重复 计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说, 已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。
所以这时,我们只保证了端到端的 at-least-once 语义。
为了实现端到端 exactly-once,我们还需要对外部存储系统、以及 Sink 连接器有额外的要 求。能够保证 exactly-once 一致性的写入方式有两种:
- 幂等(idempotent)写入
所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就 是说,后面再重复执行就不会对结果起作用了。
数学中一个典型的例子是,ex 的求导下操作,无论做多少次,得到的都是自身。
而在数据处理领域,最典型的就是对 HashMap 的插入操作:如果是相同的键值对,后面 的重复插入就都没什么作用了。
- 事务(transactional)写入
如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方 式。
之前我们提到,输出端最大的问题就是“覆水难收”,写入到外部系统的数据难以撤回。 自然想到,那怎样可以收回一条已写入的数据呢?利用事务就可以做到。
我们都知道,事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完 成,否则在每个操作中所做的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、 一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的 ACID。
在 Flink 流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检 查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是: 用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当 Sink 任务 遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务 中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程 出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存 完),所以也会回滚,写入到外部的数据就被撤销了。