时间
时间
一、时间语意
处理时间(Processing Time)
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。
事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。 数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。 在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数 据本身。打个比方,这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就 问一下“现在几点了”;而数据本身也没有表,只有一个自带的“出厂时间”,于是任务就基于这 个时间来确定自己的时钟。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也 会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代 表时间的推进。 当然我们会发现,这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证 数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的 数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要 用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)
。
二、水位线
在 Flink 中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
如上图所示每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。 这里没有指定单位,可以理解为秒或者毫秒(方便起见,下面讲述统一认为是秒)。当产生于 2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线, 随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间 戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。
2.1 有序流中的水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处 理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间 戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
2.2 乱序流中的水位线
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产 生时间而言的。一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但 是经过数据缓存和传输之后,处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。
这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?
解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则 就不再生成新的水位线,下图所示。也就是说,只有数据的时间戳比当前时钟大,才能推 动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需 要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新 的水位线,如下图所示
这样做尽管可以定义出一个事件时钟,却也会带来一个非常大的问题:我们无法正确处理 “迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了 9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口 就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有 时间戳为7秒、8秒的数据在9秒的数据之后才到来,这就是“迟到数据”(late data)。它们 本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确。
回到上面的例子,为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是 用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳,如图下图所示。这样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到 11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗 口就可以正确计算结果了。
如果仔细观察就会看到,这种“等 2 秒”的策略其实并不能处理所有的乱序数据。比如 22 秒的数据到来之后,插入的水位线时间戳为 20,也就是当前时钟已经推进到了 20 秒;对于 10~20 秒的窗口,这时就该关闭了。但是之后又会有 17 秒的迟到数据到来,它本来应该属于 10~20 秒窗口,现在却被遗漏丢弃了。那又该怎么办呢? 既然现在等 2 秒还是等不到 17 秒产生的迟到数据,那自然我们可以试着多等几秒,也就 是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确 的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之 后不可能再有迟到数据来了。
三、水位线生成
WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的 需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了 我们的痛苦,提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程, 而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性 生成水位线的,分别对应着处理有序流和乱序流的场景。
有序流
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}))
乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的 结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
)
自定义
env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.print();
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomPeriodicGenerator();
}
}
public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; // 延迟时间
private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认200ms调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
水位线的传递
如上图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三 个并行子任务,所以会向三个分区发出水位线。具体过程如下:
- 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最 小的那个。
- 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当 前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区 水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任 务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给 下游所有子任务。
- 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区 时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
- 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间 戳为 4 的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题, 每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果 总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。
将迟到的数据放入侧输出流
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧 输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些错过了该上的车、本该被丢弃的数据。