CheckPoint
CheckPoint
程序中配置CheckPoint
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp1:8020/flink/flink-cdc/ck");
// 单位毫秒 (头与头之间的间隔时间)
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10000L);
// 同时允许存在ck的个数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 两次ck的间隔时间(上一次结束到这一次开始)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
// 碰到异常挂掉,自动重启。
// env.setRestartStrategy(RestartStrategies.failureRateRestart());
从savepoint中恢复
先保存savepoint
bin/flink savepoint task_id hdfs://hdp1:8020/flink/sv
从savepoint恢复
bin/flink run -m hdp1:8081 -c com.bihell.flink ./dice-flink.jar -s hdfs://hdp1:8020/flink/flink-cdc/sv/savepoint-xxxxxx