状态编程
状态编程
在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数 据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输 出结果的所有数据,就叫作这个任务的状态。
状态的分类
托管状态(Managed State)和原始状态(Raw State)
托管状态就是 由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们 只要调接口就可以; 而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管 理,实现状态的序列化和故障恢复。
具体来讲,托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状 态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重 组分配到所有的子任务实例上。对于具体的状态内容,Flink 也提供了值状态(ValueState)、 列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部 支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数 类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。
而对比之下,原始状态就全部需要自定义了。Flink 不会对状态进行任何自动操作,也不 知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大 量的精力来处理状态的管理和维护。 所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况 下不推荐使用。绝大多数应用场景,我们都可以用 Flink 提供的算子或者自定义托管状态来实 现需求。
算子状态(Operator State)和按键分区状态(Keyed State)
一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink 能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区 之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。 在这种情况下,状态的访问方式又会有所不同。
基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。
- 算子状态(Operator State) 状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意 味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态, 状态对于同一任务而言是共享的,算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本 地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接 口。
- 按键分区状态(Keyed State) 状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流 (KeyedStream)中,也就 keyBy 之后才可以使用,按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因 为聚合的结果是以Keyed State的形式保存的。另外,也可以通过富函数类(Rich Function) 来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。
按键分区状态(Keyed State)
值状态(ValueState)
顾名思义,状态中只保存一个“值”(value)。
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream.print("input");
// 统计每个用户的pv,隔一段时间(10s)输出一次结果
stream.keyBy(data -> data.user)
.process(new PeriodicPvResult())
.print();
public static class PeriodicPvResult extends KeyedProcessFunction<String ,Event, String>{
// 定义两个状态,保存当前pv值,以及定时器时间戳
ValueState<Long> countState;
ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
// 更新count值
Long count = countState.value();
if (count == null){
countState.update(1L);
} else {
countState.update(count + 1);
}
// 注册定时器
if (timerTsState.value() == null){
ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
timerTsState.update(value.timestamp + 10 * 1000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + " pv: " + countState.value());
// 清空状态
timerTsState.clear();
}
}
列表状态(ListState)
将需要保存的数据,以列表(List)的形式组织起来。在 ListState<T>
接口中同样有一个 类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式 与一般的 List 非常相似。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env
.fromElements(
Tuple3.of("a", "stream-1", 1000L),
Tuple3.of("b", "stream-1", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
return t.f2;
}
})
);
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env
.fromElements(
Tuple3.of("a", "stream-2", 3000L),
Tuple3.of("b", "stream-2", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
return t.f2;
}
})
);
stream1.keyBy(r -> r.f0)
.connect(stream2.keyBy(r -> r.f0))
.process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
private ListState<Tuple3<String, String, Long>> stream1ListState;
private ListState<Tuple3<String, String, Long>> stream2ListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stream1ListState = getRuntimeContext().getListState(
new ListStateDescriptor<Tuple3<String, String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING))
);
stream2ListState = getRuntimeContext().getListState(
new ListStateDescriptor<Tuple3<String, String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
stream1ListState.add(left);
for (Tuple3<String, String, Long> right : stream2ListState.get()) {
collector.collect(left + " => " + right);
}
}
@Override
public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
stream2ListState.add(right);
for (Tuple3<String, String, Long> left : stream1ListState.get()) {
collector.collect(left + " => " + right);
}
}
})
.print();
env.execute();
映射状态(MapState)
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的 列表。对应的 MapState<UK, UV>
接口中,就会有 UK、UV 两个泛型,分别表示保存的 key 和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 统计每10s窗口内,每个url的pv
stream.keyBy(data -> data.url)
.process(new FakeWindowResult(10000L))
.print();
env.execute();
}
public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
// 定义属性,窗口长度
private Long windowSize;
public FakeWindowResult(Long windowSize) {
this.windowSize = windowSize;
}
// 声明状态,用map保存pv值(窗口start,count)
MapState<Long, Long> windowPvMapState;
@Override
public void open(Configuration parameters) throws Exception {
windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
// 每来一条数据,就根据时间戳判断属于哪个窗口
Long windowStart = value.timestamp / windowSize * windowSize;
Long windowEnd = windowStart + windowSize;
// 注册 end -1 的定时器,窗口触发计算
ctx.timerService().registerEventTimeTimer(windowEnd - 1);
// 更新状态中的pv值
if (windowPvMapState.contains(windowStart)){
Long pv = windowPvMapState.get(windowStart);
windowPvMapState.put(windowStart, pv + 1);
} else {
windowPvMapState.put(windowStart, 1L);
}
}
// 定时器触发,直接输出统计的pv结果
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Long windowEnd = timestamp + 1;
Long windowStart = windowEnd - windowSize;
Long pv = windowPvMapState.get(windowStart);
out.collect( "url: " + ctx.getCurrentKey()
+ " 访问量: " + pv
+ " 窗口:" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd));
// 模拟窗口的销毁,清除map中的key
windowPvMapState.remove(windowStart);
}
}
归约状态(ReducingState)
类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值 作为状态保存下来。ReducintState<T>
这个接口调用的方法类似于 ListState,只不过它保存的 只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和 之前的状态进行归约,并用得到的结果更新状态。
聚合状态(AggregatingState)
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个 累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不 同,使用更加灵活。 同样地,AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素 时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
// 统计每个用户的点击频次,到达5次就输出统计结果
stream.keyBy(data -> data.user)
.flatMap(new AvgTsResult())
.print();
env.execute();
}
public static class AvgTsResult extends RichFlatMapFunction<Event, String>{
// 定义聚合状态,用来计算平均时间戳
AggregatingState<Event, Long> avgTsAggState;
// 定义一个值状态,用来保存当前用户访问频次
ValueState<Long> countState;
@Override
public void open(Configuration parameters) throws Exception {
avgTsAggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
"avg-ts",
new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);
}
@Override
public Long getResult(Tuple2<Long, Long> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return null;
}
},
Types.TUPLE(Types.LONG, Types.LONG)
));
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
}
@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
Long count = countState.value();
if (count == null){
count = 1L;
} else {
count ++;
}
countState.update(count);
avgTsAggState.add(value);
// 达到5次就输出结果,并清空状态
if (count == 5){
out.collect(value.user + " 平均时间戳:" + new Timestamp(avgTsAggState.get()));
countState.clear();
}
}
}
状态生存时间
配置状态的TTL,如下。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream.keyBy(data -> data.user)
.flatMap(new MyFlatMap())
.print();
env.execute();
}
// 实现自定义的FlatMapFunction,用于Keyed State测试
public static class MyFlatMap extends RichFlatMapFunction<Event, String>{
// 定义状态
ValueState<Event> myValueState;
ListState<Event> myListState;
MapState<String, Long> myMapState;
ReducingState<Event> myReducingState;
AggregatingState<Event, String> myAggregatingState;
// 增加一个本地变量进行对比
Long count = 0L;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Event> valueStateDescriptor = new ValueStateDescriptor<>("my-state", Event.class);
myValueState = getRuntimeContext().getState(valueStateDescriptor);
myListState = getRuntimeContext().getListState(new ListStateDescriptor<Event>("my-list", Event.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("my-map", String.class, Long.class));
myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Event>("my-reduce",
new ReduceFunction<Event>() {
@Override
public Event reduce(Event value1, Event value2) throws Exception {
return new Event(value1.user, value1.url, value2.timestamp);
}
}
, Event.class));
myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Long, String>("my-agg",
new AggregateFunction<Event, Long, String>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accmulator) {
return accmulator + 1;
}
@Override
public String getResult(Long accumulator) {
return "count: " + accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
, Long.class));
// 配置状态的TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
}
@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
// 访问和更新状态
System.out.println(myValueState.value());
myValueState.update(value);
System.out.println( "my value: " + myValueState.value() );
myListState.add(value);
myMapState.put(value.user, myMapState.get(value.user) == null? 1: myMapState.get(value.user) + 1);
System.out.println( "my map value: " + myMapState.get(value.user) );
myReducingState.add(value);
System.out.println( "my reducing value: " + myReducingState.get() );
myAggregatingState.add(value);
System.out.println( "my agg value: " + myAggregatingState.get() );
count ++;
System.out.println("count: " + count);
}
}
算子状态
除按键分区状态(Keyed State)之外,另一大类受控状态就是算子状态(Operator State)。 从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需 要考虑不同 key 的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法 也会有一些区别。
算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接 的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。 在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题 (topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次” (exactly-once)状态一致性时非常有用。关于状态一致性的内容,我们会在第十章详细展开。 当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根 据状态的类型不同,重组分配的方案也会不同。
列表状态(ListState)
与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处 理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所 有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。 当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当 于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均 匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似, 是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它 直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态 (ValueState)。
联合列表状态(UnionListState)
与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别 在于,算子并行度进行缩放调整时对于状态的分配方式不同。
UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分 配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并 行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态 项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源 和效率考虑一般不建议使用联合重组的方式。
广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。 这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种 特殊的算子状态,就叫作广播状态(BroadcastState)。
因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单, 只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行 子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于 一个“广播流”(BroadcastStream)来创建。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(10000L);
// env.setStateBackend(new EmbeddedRocksDBStateBackend());
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setMinPauseBetweenCheckpoints(500);
checkpointConfig.setCheckpointTimeout(60000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.enableUnalignedCheckpoints();
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream.print("input");
// 批量缓存输出
stream.addSink(new BufferingSink(10));
env.execute();
}
public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
private final int threshold;
private transient ListState<Event> checkpointedState;
private List<Event> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Event value, Context context) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Event element: bufferedElements) {
// 输出到外部系统,这里用控制台打印模拟
System.out.println(element);
}
System.out.println("==========输出完毕=========");
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
// 把当前局部变量中的所有元素写入到检查点中
for (Event element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
"buffered-elements",
Types.POJO(Event.class));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// 如果是从故障中恢复,就将ListState中的所有元素添加到局部变量中
if (context.isRestored()) {
for (Event element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
广播状态
算子状态中有一类很特殊,就是广播状态(Broadcast State)。从概念和原理上讲,广播状 态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接 复制就可以了。然而在应用上,广播状态却与其他算子状态大不相同。本节我们就专门来讨论 一下广播状态的使用。
由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务 需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就 是一个典型的广播状态。广播状态与其他算子状态的列表(list)结构不同,底层 是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取用户行为事件流
DataStreamSource<Action> actionStream = env.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "buy")
);
// 定义行为模式流,代表了要检测的标准
DataStreamSource<Pattern> patternStream = env
.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "buy")
);
// 定义广播状态的描述器,创建广播流
MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
"patterns", Types.VOID, Types.POJO(Pattern.class));
BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);
// 将事件流和广播流连接起来,进行处理
DataStream<Tuple2<String, Pattern>> matches = actionStream
.keyBy(data -> data.userId)
.connect(bcPatterns)
.process(new PatternEvaluator());
matches.print();
env.execute();
}
public static class PatternEvaluator
extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {
// 定义一个值状态,保存上一次用户行为
ValueState<String> prevActionState;
@Override
public void open(Configuration conf) {
prevActionState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastAction", Types.STRING));
}
@Override
public void processBroadcastElement(
Pattern pattern,
Context ctx,
Collector<Tuple2<String, Pattern>> out) throws Exception {
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
// 将广播状态更新为当前的pattern
bcState.put(null, pattern);
}
@Override
public void processElement(Action action, ReadOnlyContext ctx,
Collector<Tuple2<String, Pattern>> out) throws Exception {
Pattern pattern = ctx.getBroadcastState(
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// 如果前后两次行为都符合模式定义,输出一组匹配
if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// 更新状态
prevActionState.update(action.action);
}
}
// 定义用户行为事件POJO类
public static class Action {
public String userId;
public String action;
public Action() {
}
public Action(String userId, String action) {
this.userId = userId;
this.action = action;
}
@Override
public String toString() {
return "Action{" +
"userId=" + userId +
", action='" + action + '\'' +
'}';
}
}
// 定义行为模式POJO类,包含先后发生的两个行为
public static class Pattern {
public String action1;
public String action2;
public Pattern() {
}
public Pattern(String action1, String action2) {
this.action1 = action1;
this.action2 = action2;
}
@Override
public String toString() {
return "Pattern{" +
"action1='" + action1 + '\'' +
", action2='" + action2 + '\'' +
'}';
}
}
状态持久化
状态后端(State Backends)
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快 照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个 流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果 发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程, 就如同“读档”一样。
如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数 据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读 取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子 就是 Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。 这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once) 的一致性,还需要数据写入外部系统时的相关保证。
状态后端(State Backends)
检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进 行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令; TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中; 完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有 TaskManager 的返回信息后,就会确认当前检查点成功保存。而这一切工作的 协调,就需要一个“专职人员”来完成。
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就 叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查 点(checkpoint)写入远程的持久化存储。