public static void main(String[] args) throws Exception {
    // Get the stream execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 1. Read the data source
    SingleOutputStreamOperator<Event> eventStream = env
            .fromElements(
                    new Event("Alice", "./home", 1000L),
                    new Event("Bob", "./cart", 1000L),
                    new Event("Alice", "./prod?id=1", 5 * 1000L),
                    new Event("Cary", "./home", 60 * 1000L),
                    new Event("Bob", "./prod?id=3", 90 * 1000L),
                    new Event("Alice", "./prod?id=7", 105 * 1000L)
            );

    // 2. Create the table environment
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 3. Convert the DataStream into a Table
    Table eventTable = tableEnv.fromDataStream(eventStream);
    // 4. Extract data by executing SQL (`user` is a reserved keyword in Flink SQL, so it is escaped with backticks)
    Table resultTable1 = tableEnv.sqlQuery("select url, `user` from " + eventTable);
    // 5. Transform the Table directly with the Table API
    Table resultTable2 = eventTable.select($("user"), $("url"))
            .where($("user").isEqual("Alice"));

    // 6. Convert the result tables back into data streams and print them
    tableEnv.toDataStream(resultTable1).print("result1");
    tableEnv.toDataStream(resultTable2).print("result2");

    // Execute the job
    env.execute();
}
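// The snippets above and below assume an Event POJO roughly along these lines (a minimal sketch;
// the actual class in the project may differ). Public fields and a no-argument constructor are
// needed so that Flink treats it as a POJO type (required by Types.POJO(Event.class) further down).
public static class Event {
    public String user;      // user name
    public String url;       // visited URL
    public Long timestamp;   // event timestamp in milliseconds

    // Flink POJO types need a public no-argument constructor
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
    }
}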
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Enable checkpointing every 10 seconds
    env.enableCheckpointing(10000L);
    // env.setStateBackend(new EmbeddedRocksDBStateBackend());
    // env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));

    // Checkpoint configuration
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMinPauseBetweenCheckpoints(500);
    checkpointConfig.setCheckpointTimeout(60000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.enableUnalignedCheckpoints();
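    // For reference, the two commented-out lines above could be filled in roughly like this
    // (hypothetical path, assuming the flink-statebackend-rocksdb dependency is on the classpath):
    // env.setStateBackend(new EmbeddedRocksDBStateBackend());
    // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");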
    // Source with monotonously increasing event-time timestamps
    SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );

    stream.print("input");

    // Buffer elements and write them out in batches
    stream.addSink(new BufferingSink(10));

    env.execute();
}
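// ClickSource is referenced above but not shown here. A minimal sketch of what such a source could
// look like (user/url values are illustrative, not the project's actual class; requires
// org.apache.flink.streaming.api.functions.source.SourceFunction and java.util.Random):
public static class ClickSource implements SourceFunction<Event> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./prod?id=1"};
        while (running) {
            // Emit one random click event per second with the current processing time as timestamp
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    System.currentTimeMillis()));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}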
// Custom sink that buffers elements and flushes them in batches of `threshold`,
// keeping the buffer in operator state so no data is lost on failure
public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
    private final int threshold;
    private transient ListState<Event> checkpointedState;
    private List<Event> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Event value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Event element : bufferedElements) {
                // Write to the external system; simulated here by printing to the console
                System.out.println(element);
            }
            System.out.println("========== Batch flushed ==========");
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        // Copy all elements currently held in the local buffer into the checkpointed state
        for (Event element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
                "buffered-elements",
                Types.POJO(Event.class));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
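        // Note: getListState() redistributes the list entries round-robin ("even-split") on
        // rescaling. If every parallel instance should instead receive the complete list on
        // restore, the union variant could be used, e.g.:
        // checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);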
        // If this is a restore after a failure, copy all elements from the ListState
        // back into the local buffer
        if (context.isRestored()) {
            for (Event element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 1. Define the time attribute directly in the CREATE TABLE DDL
String createDDL = "CREATE TABLE clickTable (" +
        " user_name STRING, " +
        " url STRING, " +
        " ts BIGINT, " +
        " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
        " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
        ") WITH (" +
        " 'connector' = 'filesystem', " +
        " 'path' = 'input/clicks.csv', " +
        " 'format' = 'csv' " +
        ")";
tableEnv.executeSql(createDDL);
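// The filesystem connector above expects input/clicks.csv to contain rows matching the declared
// schema, i.e. lines of the form user_name,url,ts. Hypothetical sample content:
// Mary,./home,1000
// Bob,./cart,2000
// Alice,./prod?id=100,3000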
// 2. Define the time attribute when converting a DataStream into a Table
SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                }));
Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
        $("et").rowtime());
// clickTable.printSchema();

// Aggregating queries
// 1. Group aggregation
Table aggTable = tableEnv.sqlQuery("SELECT user_name, COUNT(1) FROM clickTable GROUP BY user_name");

// 2. Group window aggregation
Table groupWindowResultTable = tableEnv.sqlQuery("SELECT " +
        "user_name, " +
        "COUNT(1) AS cnt, " +
        "TUMBLE_END(et, INTERVAL '10' SECOND) as endT " +
        "FROM clickTable " +
        "GROUP BY " +                               // group by window and user name
        " user_name, " +
        " TUMBLE(et, INTERVAL '10' SECOND)"         // 10-second tumbling window
);

// 3. Windowing TVF aggregation
// 3.1 Tumbling window
Table tumbleWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
        " window_end AS endT " +
        "FROM TABLE( " +
        " TUMBLE( TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
        ") " +
        "GROUP BY user_name, window_start, window_end "
);

// 3.2 Sliding (hop) window
Table hopWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
        " window_end AS endT " +
        "FROM TABLE( " +
        " HOP( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
        ") " +
        "GROUP BY user_name, window_start, window_end "
);

// 3.3 Cumulative window
Table cumulateWindowResultTable = tableEnv.sqlQuery("SELECT user_name, COUNT(url) AS cnt, " +
        " window_end AS endT " +
        "FROM TABLE( " +
        " CUMULATE( TABLE clickTable, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
        ") " +
        "GROUP BY user_name, window_start, window_end "
);

// 4. Over window aggregation
Table overWindowResultTable = tableEnv.sqlQuery("SELECT user_name, " +
        " avg(ts) OVER (" +
        " PARTITION BY user_name " +
        " ORDER BY et " +
        " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
        ") AS avg_ts " +
        "FROM clickTable");