Preface
Official documentation
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/savepoints/
Test job
First start a test job. I reuse cdc_mysql2mysql from an earlier post:
bin/sql-client.sh -f sql/cdc_mysql2mysql.sql
Triggering a savepoint on YARN
According to the official documentation:
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
so the command should be:
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -yid application_1750755047138_0076
It fails with:
java.lang.NoSuchMethodError: org.apache.commons.cli.CommandLine.hasOption(Lorg/apache/commons/cli/Option;)Z
at org.apache.flink.client.cli.SavepointOptions.<init>(SavepointOptions.java:45)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:738)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1118)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1198)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1198)
Find which jars contain the conflicting class:
grep -rl "org.apache.commons.cli.CommandLine" lib/*
lib/flink-dist-1.15.3.jar
lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar
lib/flink-sql-connector-hbase-2.2-1.15.3.jar
lib/hudi-flink1.15-bundle-0.13.0.jar
The conflicting jar turned out to be flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar, so move it out of the way:
mv lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar.bak
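The grep above matches the class name anywhere in a jar's bytes, so it may also flag relocated (shaded) copies of the class. As a rough alternative, the sketch below lists each jar's entries and matches the exact entry path. The function names class_to_entry and find_class_in_jars are my own, and the sketch assumes unzip is installed.

```shell
# Convert a fully qualified class name into the entry path used inside a jar,
# e.g. org.apache.commons.cli.CommandLine -> org/apache/commons/cli/CommandLine.class
class_to_entry() {
    printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Print every jar in directory $2 that contains class $1 at its original
# (non-relocated) path. Assumes `unzip` is available.
find_class_in_jars() {
    entry=$(class_to_entry "$1")
    for jar in "$2"/*.jar; do
        [ -f "$jar" ] || continue
        # The leading space keeps relocated paths such as
        # some/prefix/org/apache/... from matching.
        if unzip -l "$jar" 2>/dev/null | grep -qF -- " $entry"; then
            printf '%s\n' "$jar"
        fi
    done
}
```

Usage would look like: find_class_in_jars org.apache.commons.cli.CommandLine lib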
Trying again produces a different error:
org.apache.flink.util.FlinkException: No cluster id was specified. Please specify a cluster to which you would like to connect.
This is the same error you get when -yid is omitted entirely:
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql
My guess was that -yid was not taking effect, so I switched to -Dyarn.application.id:
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076
That produces another error:
Caused by: java.io.IOException: Failed to create savepoint directory at /savepoint/cdc_mysql2mysql
Check the savepoint-related log entries:
yarn logs -applicationId application_1750755047138_0076 | grep "Savepoint"
2025-06-27 10:45:31,471 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]
Creating the savepoint directory failed, but the savepoint itself had already been triggered, which is why savepoint-related log entries exist.
Add the hdfs:// prefix to the savepoint path:
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 hdfs:///savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076
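My understanding, which the log above does not prove, is that Flink resolves a path without a scheme against its default file system, and that this is the local file system unless fs.default-scheme is configured. A small guard like the following sketch can catch the mistake before the command runs. The function names has_scheme and qualify_savepoint_dir are hypothetical, and defaulting to hdfs:// is an assumption that fits this cluster only.

```shell
# Succeeds if $1 starts with a URI scheme such as hdfs:// or file://.
has_scheme() {
    printf '%s\n' "$1" | grep -Eq '^[A-Za-z][A-Za-z0-9+.-]*:/'
}

# Print $1 unchanged if it already has a scheme; otherwise prefix it with
# hdfs:// so that it refers to the cluster's default HDFS namespace.
# Assumes $1 is an absolute path when it has no scheme.
qualify_savepoint_dir() {
    if has_scheme "$1"; then
        printf '%s\n' "$1"
    else
        printf 'hdfs://%s\n' "$1"
    fi
}
```

For example, qualify_savepoint_dir /savepoint/cdc_mysql2mysql prints hdfs:///savepoint/cdc_mysql2mysql, while a path that already starts with hdfs:// passes through untouched.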
Success:
Triggering savepoint for job 24f61a106d31205a122b66e45b2984e7.
Waiting for response...
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
You can resume your program from this savepoint with the run command.
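When scripting this step, the savepoint path usually has to be captured for the later restore. A minimal sketch, assuming the CLI output keeps the "Savepoint completed. Path: ..." line shown above (the function name extract_savepoint_path is my own):

```shell
# Read `flink savepoint` or `flink stop` output on stdin and print the
# savepoint path taken from the "Savepoint completed. Path: ..." line.
extract_savepoint_path() {
    sed -n 's/^Savepoint completed\. Path: //p'
}
```

It would be used by piping the command's output into it, for example: bin/flink savepoint ... | extract_savepoint_path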
Check the savepoint on HDFS:
hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 1 items
drwxr-xr-x - hive hdfs 0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
Check the savepoint-related log entries again:
2025-06-27 10:20:02,159 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750990802141 for job
2025-06-27 10:45:31,471 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
One more savepoint-related log entry has appeared.
The latest savepoint is also visible in the web UI (the screenshot here was taken from a different job).
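-yid has no effect
The official 1.15 documentation still shows -yid, but in this environment (Flink 1.15.3) it had no effect and the YARN application ID had to be passed as -Dyarn.application.id. Flink reworked its command-line handling around version 1.10 (2020) so that YARN settings can be passed as generic -D configuration options. As far as I can tell, the goals were:
- Simpler options: no need to remember special prefixes such as -m, -yid, and -ytm.
- One configuration style: every setting can be passed with -D, consistent with flink-conf.yaml.
- Fewer compatibility problems between versions.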
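To avoid retyping the -D form, the command can be assembled by a small helper. The sketch below only prints the command instead of running it, and validates the two IDs first. The function name savepoint_cmd is hypothetical, and the ID patterns are assumptions based on the IDs shown in this post (a 32-character hexadecimal job ID, and application_<timestamp>_<sequence> for YARN).

```shell
# Print (not run) a savepoint command that passes the YARN application ID
# as a -D option. Arguments: job ID, target directory, YARN application ID.
savepoint_cmd() {
    # A Flink job ID is printed as 32 hexadecimal characters.
    printf '%s\n' "$1" | grep -Eq '^[0-9a-fA-F]{32}$' || {
        echo "invalid job id: $1" >&2
        return 1
    }
    # YARN application IDs look like application_<timestamp>_<sequence>.
    printf '%s\n' "$3" | grep -Eq '^application_[0-9]+_[0-9]+$' || {
        echo "invalid YARN application id: $3" >&2
        return 1
    }
    printf 'bin/flink savepoint %s %s -Dyarn.application.id=%s\n' "$1" "$2" "$3"
}
```

Printing rather than executing keeps the helper safe to try out; the output can be reviewed and then run by hand.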
Cancelling a job with a savepoint (cancel)
bin/flink cancel -s hdfs:///savepoint/cdc_mysql2mysql 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076
Cancelled job 24f61a106d31205a122b66e45b2984e7. Savepoint stored in hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a.
-s is short for --withSavepoint.
Check the savepoint-related log entries:
2025-06-27 10:45:31,471 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
Savepoint result:
hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 2 items
drwxr-xr-x - hive hdfs 0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x - hive hdfs 0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
In the web UI the job status is CANCELED.
Stopping a job with a savepoint (stop)
First try to stop the job that was just cancelled:
bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076
Because that job has already been cancelled, it fails:
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (24f61a106d31205a122b66e45b2984e7)
So start a new job and stop that one instead:
bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 37a54dad1f7c781ffc0001555b5dcee6 -Dyarn.application.id=application_1750755047138_0077
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d
-p is short for --savepointPath.
Check the log:
2025-06-27 10:59:34,479 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1750993174460 for job 37a54dad1f7c781ffc0001555b5dcee6.
Check the result:
hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 3 items
drwxr-xr-x - hive hdfs 0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x - hive hdfs 0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
drwxr-xr-x - hive hdfs 0 2025-06-27 10:59 /savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d
The listing shows that the middle part of each savepoint directory name is the first six characters of the job ID.
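That naming pattern makes it possible to pick out one job's savepoints from a shared directory. The following sketch relies only on the pattern observed above (savepoint-<first six characters of the job ID>-<suffix>); the function names savepoint_prefix and filter_savepoints_for_job are my own.

```shell
# Print the directory-name prefix observed above for a job's savepoints:
# "savepoint-" followed by the first six characters of the job ID.
savepoint_prefix() {
    printf 'savepoint-%.6s\n' "$1"
}

# Read `hadoop fs -ls <dir>` output on stdin and print only the paths
# (last column) that belong to the job whose ID is given as $1.
filter_savepoints_for_job() {
    awk -v p="/$(savepoint_prefix "$1")-" 'index($NF, p) { print $NF }'
}
```

It would be used as: hadoop fs -ls /savepoint/cdc_mysql2mysql | filter_savepoints_for_job 37a54dad1f7c781ffc0001555b5dcee6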
In the web UI the job status is FINISHED.
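SET 'table.dml-sync' = 'true';
For jobs submitted through sql-client with default settings, only the TaskManager goes away after a cancel or stop. The JobManager stays alive, so the YARN application remains RUNNING. Setting SET 'table.dml-sync' = 'true'; makes the corresponding YARN application end in FINISHED after a cancel or stop as well.
See the official documentation for details: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sqlclient/
With this setting, the behavior is the same as with flink run --detached (short form: -d).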
cancel and stop
Without specifying a savepoint:
bin/flink cancel 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092
bin/flink stop 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092
In this case cancel succeeds:
Cancelled job 15312da78d337525a36ca3cf40f04ff9.
but stop fails:
org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
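So stop needs a savepoint location: either pass one explicitly, or configure state.savepoints.dir for the job, in which case the savepoint is written there automatically.
Add this to the SQL: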
set state.savepoints.dir=hdfs:///flink/savepoints;
Then stop again:
Savepoint completed. Path: hdfs://cluster1/flink/savepoints/savepoint-9324cd-af49c550593d
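The rule above (stop needs either an explicit path or state.savepoints.dir) can be checked before calling the CLI. This is only a sketch: it reads the key from a flink-conf.yaml style file, which is an assumption on my part since this post sets the key in SQL, and a value set in the SQL session would not be visible to it. The function names configured_savepoint_dir and stop_cmd are hypothetical.

```shell
# Print the value of state.savepoints.dir from a flink-conf.yaml style file,
# or nothing if the key is not set. Trailing comments are not handled.
configured_savepoint_dir() {
    sed -n 's/^state\.savepoints\.dir:[[:space:]]*//p' "$1" | tail -n 1
}

# Print (not run) a stop command. Arguments: job ID, YARN application ID,
# config file, and an optional explicit savepoint directory.
stop_cmd() {
    if [ -n "${4:-}" ]; then
        printf 'bin/flink stop -p %s %s -Dyarn.application.id=%s\n' "$4" "$1" "$2"
    elif [ -n "$(configured_savepoint_dir "$3")" ]; then
        printf 'bin/flink stop %s -Dyarn.application.id=%s\n' "$1" "$2"
    else
        echo "no savepoint directory given and state.savepoints.dir is not set" >&2
        return 1
    fi
}
```

An explicit directory wins over the configured one, which mirrors how -p was used in the earlier stop example.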
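Feature | flink cancel | flink stop |
---|---|---|
Termination | Forced: job execution is interrupted immediately | Graceful: waits for data currently being processed to finish |
Savepoint | Not taken by default; request one with -s or --withSavepoint | Always taken; the target comes from -p or --savepointPath, or from state.savepoints.dir |
Use cases | The job is failing and must be terminated urgently; the job state does not need to be kept | The job should be taken offline smoothly; the job will be restored later (from the savepoint) |
Job state | Final state is CANCELED | Final state is FINISHED (if all operators shut down successfully) |
The official CLI documentation has more details: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/cli/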
Restoring from a savepoint
SQL
SET execution.savepoint.path = hdfs:///savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a;
Command line
-s,--fromSavepoint <savepointPath>
Example:
bin/flink run -m yarn-cluster \
-d \
--fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
-c com.dkl.flink.Test \
/opt/dkl/flink-demo-1.0.jar
Changing the job's operators
If the job has changed, for example because set pipeline.operator-chaining=false; was added or removed, restoring it throws an exception:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
The --allowNonRestoredState option (short form: -n) skips state that cannot be mapped to the new program.
-n,--allowNonRestoredState
Example:
bin/flink run -m yarn-cluster \
-d \
--fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
-c com.dkl.flink.Test \
--allowNonRestoredState \
/opt/dkl/flink-demo-1.0.jar
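Jobs without checkpointing enabled
Savepoints work whether or not checkpointing is enabled. The log filtered by the command below shows that when checkpointing is not enabled, the checkpoint IDs assigned to savepoints are consecutive.
yarn logs -applicationId application_1750755047138_0093 | grep "Savepoint"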
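Migration support
Feature | Checkpoint | Savepoint |
---|---|---|
Migration support | Limited; requires strict conditions | Designed to support migration across environments |
Format stability | Version-sensitive and easily broken | Version-independent and backward compatible |
Topology changes | Usually not supported | Supports adding and removing operators and changing parallelism |
Manual intervention | High; requires changing configuration and code | Low; supported directly by CLI commands |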