DataStream API 转换算子(Transform)
DataStream API 转换算子(Transform)
Map
// 传入匿名类,实现MapFunction
stream.map(new MapFunction<Event, String>() {
@Override
public String map(Event e) throws Exception {
return e.user;
}
});
// 传入Lambda表达式
SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
Filter
// 传入匿名类实现FilterFunction
stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event e) throws Exception {
return e.user.equals("Mary");
}
});
// 传入FilterFunction实现类
stream.filter(new UserFilter()).print();
// 传入Lambda表达式
stream.filter(data->data.user.equals("Alice"));
public static class UserFilter implements FilterFunction<Event> {
@Override
public boolean filter(Event e) throws Exception {
return e.user.equals("Mary");
}
}
FlatMap
stream.flatMap(new MyFlatMap()).print();
stream.flatMap((Event value, Collector<String> out) -> {
if (value.user.equals("Mary")) {
out.collect(value.url);
} else if (value.user.equals("Bob")) {
out.collect(value.user);
out.collect(value.url);
}
}).returns(new TypeHint<String>() {});
public static class MyFlatMap implements FlatMapFunction<Event, String> {
@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
if (value.user.equals("Mary")) {
out.collect(value.user);
} else if (value.user.equals("Bob")) {
out.collect(value.user);
out.collect(value.url);
}
}
}
KeyBy
KeyedStream<Tuple2<String, Integer>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
简单聚合
// 安键分组之后进行聚合,提取当前用户最近一次访问数据
stream.keyBy(new KeySelector<Event, String>() {
@Override
public String getKey(Event value) throws Exception {
return value.user;
}
}).max("timestamp");
// 安键分组之后进行聚合,提取当前用户最近一次访问数据
stream.keyBy(data -> data.user).maxBy("timestamp");
max 其他字段用的是第一条数据,maxby是当前最大值对应其他字段值。
Reduce
对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
计算最活跃用户:
// 将Event数据类型转换成元组类型
.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event e) throws Exception {
return Tuple2.of(e.user, 1L);
}
})
.keyBy(r -> r.f0) // 使用用户名来进行分流
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 每到一条数据,用户pv的统计值加1
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
.keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
})
富函数类
// 将点击事件转换成长整型的时间戳输出
clicks.map(new RichMapFunction<Event, Long>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务开始");
}
@Override
public Long map(Event value) throws Exception {
return value.timestamp;
}
@Override
public void close() throws Exception {
super.close();
System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务结束");
}
})
物理分区
// 1. 随机分区
stream.shuffle().print("shuffle").setParallelism(4);
// 2. 轮询分区
stream.rebalance().print("rebalance").setParallelism(4);
// 3. rescale重缩放分区
// 分成了几组,在自己的小组内轮询发送,可以认为是在某些场景下对rebalance的优化
env.addSource(new RichParallelSourceFunction<Integer>() { // 这里使用了并行数据源的富函数版本
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
for (int i = 1; i <= 8; i++) {
// 将奇数发送到索引为1的并行子任务
// 将偶数发送到索引为0的并行子任务
if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
sourceContext.collect(i);
}
}
}
@Override
public void cancel() {
}
})
.setParallelism(2)
.rescale()
.print().setParallelism(4);
// 4. 广播,把当前的数据分发到下游所有的并行子任务
stream.broadcast().print("broadcast").setParallelism(4);
// 5. 全局分区,所有数据分配到一个分区
stream.global().print("global").setParallelism(4);
// 6. 自定义重分区
// 将自然数按照奇偶分区
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
.partitionCustom(new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key % 2;
}
}, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
})
.print().setParallelism(2);