前言
学习总结Flink SQL Checkpoint的使用,主要目的是为了验证Flink SQL流式任务挂掉后,重启时还可以继续从上次的运行状态恢复。
验证方式
Flink SQL流式增量读取Hudi表然后sink MySQL表,任务启动后处于running状态,先查看sink表有数据,然后将对应的yarn kill掉,再通过设置的checkpoint重启任务,任务重启后验证sink表的数据量。Flink SQL流式增量读取Hudi表可以参考:Flink SQL增量查询Hudi表
版本
- Flink 1.14.3
- Hudi 0.13.0
Checkpoint 参数
一般需要设置的常用参数1
2
3
4
5
6
7
8
9-- checkpoint间隔时间,单位毫秒,没有默认值,如果想开启checkpoint,需要将该参数设置一个大于0的数值
-- 如果想提升sink性能,比如写hudi,需要将该值设置大一点,因为间隔时间决定了批次大小
-- checkpoint间隔时间不能设置太短也不能设置太长,太短影响写入性能,太长影响数据及时性。
set execution.checkpointing.interval=1000;
-- 保存checkpoint文件的目录
set state.checkpoints.dir=hdfs:///flink/checkpoints/hudi2mysql;
-- 任务取消后保留checkpoint,默认值NO_EXTERNALIZED_CHECKPOINTS,
-- 可选值NO_EXTERNALIZED_CHECKPOINTS、DELETE_ON_CANCELLATION、RETAIN_ON_CANCELLATION
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
从checkpoint恢复
1 | set execution.savepoint.path=hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314; |
其他参数
1 | -- checkpoint模式,默认值EXACTLY_ONCE,可选值:EXACTLY_ONCE、AT_LEAST_ONCE |