前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
主要总结 Flink 重启策略
官方文档
- https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/task_failure_recovery/
- https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/config/#fault-tolerance
版本
- Flink 1.15.4
Task 故障恢复
Task Failure Recovery
当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。
Flink 通过重启策略和故障恢复策略来控制 Task 重启:重启策略决定是否可以重启以及重启的间隔;故障恢复策略决定哪些 Task 需要重启。
重启策略
Restart Strategies
Flink 作业如果没有定义重启策略,则会遵循集群启动时加载的默认重启策略。 如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。
通过 Flink 的配置文件 flink-conf.yaml 来设置默认的重启策略。配置参数 restart-strategy 定义了采取何种策略。
- key : restart-strategy 从1.17版本开始之后改为 restart-strategy.type
- value: 没有默认值,默认情况下:如果没有启用 checkpoint,就采用“不重启”策略。如果启用了 checkpoint 且没有配置重启策略,那么就采用固定延时重启策略, 此时最大尝试重启次数由 Integer.MAX_VALUE 参数设置。1.19版本,默认策略改为指数延迟重启策略。 可选:
- none, off, disable: 不重启策略。
- fixeddelay, fixed-delay :固定延时重启策略。
- failurerate, failure-rate: 故障率重启策略。
- exponentialdelay, exponential-delay: 指数延迟重启策略。
需要注意的是,如果配置了重启策略,无论是否启用 checkpoint 都会按照配置的策略来。所以在 flink-conf.yaml 配置全局的重启策略会影响没有启用 checkpoint 的任务,这可能并不是我们想要的,所以最好是在单个任务中单独配置重启策略。
另外默认情况下,启用 checkpoint 的任务会采用固定延时重启策略,且重启次数为 Integer.MAX_VALUE ,也就是无限重启,这也并不是我们想要的效果。所以最好的配置方法是:对于没有开启 checkpoint 的任务,不配置重启策略, 开启 checkpoint 的任务,配置重启策略,并配置默认的重启次数。
为作业单独设置
下例展示了如何给我们的作业设置固定延时重启策略。 如果发生故障,系统会重启作业 3 次,每两次连续的重启尝试之间等待 10 秒钟。
程序:1
2
3
4
5Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3); // 尝试重启的次数
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10)); // 延时
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
SQL:
1 |
|
固定延时重启策略
Fixed Delay Restart Strategy
固定延时重启策略按照给定的次数尝试重启作业。如果尝试超过了给定的最大次数,作业将最终失败。在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。
通过在 flink-conf.yaml
中设置如下配置参数,默认启用此策略。
1 | restart-strategy: fixed-delay |
- restart-strategy.fixed-delay.attempts 默认值 1 ,也就是默认重启1一次。请注意这个和没有配置重启策略的默认值不一样,没有配置策略,默认值为 Integer.MAX_VALUE。
- restart-strategy.fixed-delay.delay 默认值 1 s ,两次连续重新启动尝试之间的延迟时间。如: 1 min 、 20 s
例如:
1 | restart-strategy: fixed-delay |
程序中设置:1
2
3
4
5StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
不重启策略
No Restart Strategy
作业直接失败,不尝试重启。1
restart-strategy: none
程序中设置:1
2StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
故障率重启策略
Failure Rate Restart Strategy
故障率重启策略在故障发生之后重启作业,但是当故障率(每个时间间隔发生故障的次数)超过设定的限制时,作业会最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。
1 | restart-strategy: failure-rate |
- restart-strategy.failure-rate.delay 默认值 1s ,如果重新启动策略已设置为失败率,则两次连续重新启动尝试之间的延迟。可以使用符号指定:”1 min”, “20 s”
- restart-strategy.failure-rate.failure-rate-interval 默认值 1 min ,如果重新启动策略已设置为故障率,则测量故障率的时间间隔。可以使用符号指定:”1 min”, “20 s”
- restart-strategy.failure-rate.max-failures-per-interval 默认值 1 ,如果重新启动策略已设置为失败率,则作业失败前给定时间间隔内的最大重新启动次数。
例如:
1 | restart-strategy: failure-rate |
程序中设置:1
2
3
4
5
6StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
));
指数延迟重启策略
Exponential Delay Restart Strategy
指数延迟重启策略在两次连续的重新启动尝试之间,重新启动的延迟时间不断呈指数增长,直到达到最大延迟时间。 然后,延迟时间将保持在最大延迟时间。
当作业正确地执行后,指数延迟时间会在一些时间后被重置为初始值,这些阈值可以被配置。
1 | restart-strategy: exponential-delay |
- restart-strategy.exponential-delay.attempts-before-reset-backoff 默认值 infinite ,如果重新启动策略已设置为指数延迟,则Flink在作业失败之前重试执行的次数。一旦 backoff 被重置为其初始值,该数字将被重置。
- restart-strategy.exponential-delay.backoff-multiplier 默认值 1.5 ,每次失败后,backoff 值乘以该值,直到达到最大回退。
- restart-strategy.exponential-delay.initial-backoff 默认值 1 s ,如果重新启动策略已设置为指数延迟,则重新启动之间的启动持续时间。可以使用符号指定:”1 min”, “20 s”
- restart-strategy.exponential-delay.jitter-factor 默认值 0.1 ,抖动指定为 backoff 的一部分。它表示将向 backoff 添加或减去多大的随机值。当您希望避免同时重新启动多个作业时非常有用。
- restart-strategy.exponential-delay.max-backof 默认值 1 min,重新启动之间的最大持续时间。可以使用符号指定:”1 min”, “20 s”
- restart-strategy.exponential-delay.reset-backoff-threshold 默认值 1h , backoff 重置为其初始值时的阈值。它指定作业必须运行多长时间才能将指数增加的 backoff 重置为其初始值。可以使用符号指定:”1 min”, “20 s”
例如:
1 | restart-strategy: exponential-delay |
程序中设置:1
2
3
4
5
6
7
8StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1),
Time.milliseconds(1000),
1.1, // exponential multiplier
Time.milliseconds(2000), // 重置延迟时间到初始值的阈值
0.1 // jitter
));
示例
以下是一个示例,用于解释指数延迟重启策略的工作原理。
1 | restart-strategy.exponential-delay.initial-backoff: 1 s |
- initial-backoff = 1s 表示当作业第一次发生异常时会延迟 1 秒后进行重试。
- backoff-multiplier = 2 表示当作业连续异常时,每次的延迟时间翻倍。
- max-backoff = 10 s 表示重试的延迟时间最多为 10 秒。
基于这些参数:
- 当作业发生异常需要进行第 1 次重试时,作业会延迟 1 秒后重试。
- 当作业发生异常需要进行第 2 次重试时,作业会延迟 2 秒后重试(翻倍)。
- 当作业发生异常需要进行第 3 次重试时,作业会延迟 4 秒后重试(翻倍)。
- 当作业发生异常需要进行第 4 次重试时,作业会延迟 8 秒后重试(翻倍)。
- 当作业发生异常需要进行第 5 次重试时,作业会延迟 10 秒后重试(翻倍后超过上限,所以使用上限 10 秒做为延迟时间)。
- 在第 5 次重试时,延迟时间已经达到了 max-backoff(上限),所以第 5 次重试以后,作业延迟时间会保持在 10 秒不变,每次失败后都会延迟 10 秒后重试。
1 | restart-strategy.exponential-delay.jitter-factor: 0.1 |
- jitter-factor = 0.1 表示每次的延迟时间会加减一个随机值,随机值的范围在 0.1 的比例内。
- 例如第 3 次重试时,作业延迟时间在 3.6 秒到 4.4 秒之间( 3.6 = 4 0.9, 4.4 = 4 1.1)。
- 例如第 4 次重试时,作业延迟时间在 7.2 秒到 8.8 秒之间 (7.2 = 8 0.9, 8.8 = 8 1.1)。
- 随机值可以避免多个作业在同一时间重启,所以在生产环境不建议将 jitter-factor 设置为 0。
- attempts-before-reset-backoff = 8 表示如果作业连续重试了 8 次后仍然有异常,则会失败(不再重试)。
- reset-backoff-threshold = 6 min 表示当作业已经持续 6 分钟没发生异常时,则会重置延迟时间和重试计数。 也就是当作业发生异常时,如果上一次异常发生在 6 分钟之前,则重试的延迟时间重置为 1 秒,当前的重试计数重置为 1。
Fallback Restart Strategy
官方有提到 Fallback Restart Strategy
,这不是一个具体的策略。他是默认配置时(没有配置策略),对应到源码中的 FallbackRestartStrategyConfiguration
。也就是Fallback Restart Strategy
对应默认重启策略,具体看下面的默认重启策略。
默认重启策略
默认情况下,没有开启 checkpoint ,不重启。 开启 checkpoint ,默认采用固定延时重启策略。但是默认 1s 重启一次,且会一直尝试重启,没有上限次数。这种频繁无限的默认重启策略是不合理的,可能会导致外部组件雪崩,也会一直占用计算资源。
所以在 1.19版本,默认策略改为指数延迟重启策略。我们强烈推荐 Flink 用户使用指数延迟重启策略,因为使用这个策略时, 作业偶尔异常可以快速重试,作业频繁异常可以避免外部组件发生雪崩。原因如下所示:
- 所有的重启策略在重启作业时都会延迟一定的时间来避免频繁重试对外部组件的产生较大压力。
- 除了指数延迟重启策略以外的所有重启策略延迟时间都是固定的。
- 如果延迟时间设置的过短,当作业短时间内频繁异常时,会频繁重启访问外部组件的主节点,可能导致外部组件发生雪崩。 例如:大量的 Flink 作业都在消费 Kafka,当 Kafka 集群出现故障时大量的 Flink 作业都在同一时间频繁重试,很可能导致雪崩。
- 如果延迟时间设置的过长,当作业偶尔失败时需要等待很久才会重试,从而导致作业可用率降低。
- 指数延迟重启策略每次重试的延迟时间会指数递增,直到达到最大延迟时间。
- 延迟时间的初始值较短,所以当作业偶尔失败时,可以快速重试,提升作业可用率。
- 当作业短时间内频繁失败时,指数延迟重启策略会降低重试的频率,从而避免外部组件雪崩。
- 除此以外,指数延迟重启策略的延迟时间支持抖动因子 (jitter-factor) 的配置项。
- 抖动因子会为每次的延迟时间加减一个随机值。
- 即使多个作业使用指数延迟重启策略且所有的配置参数完全相同,抖动因子也会让这些作业分散在不同的时间重启。
故障恢复策略
Failover Strategies
Flink 支持多种不同的故障恢复策略,该策略需要通过 Flink 配置文件 flink-conf.yaml 中的 jobmanager.execution.failover-strategy 配置项进行配置。
故障恢复策略 | jobmanager.execution.failover-strategy 配置值 |
---|---|
全图重启 | full |
基于 Region 的局部重启(默认值) | region |
全图重启策略
Restart All Failover Strategy
在全图重启故障恢复策略下,Task 发生故障时会重启作业中的所有 Task 进行故障恢复。
基于 Region 的局部重启策略
Restart Pipelined Region Failover Strategy
该策略会将作业中的所有 Task 划分为数个 Region。当有 Task 发生故障时,它会尝试找出进行故障恢复需要重启的最小 Region 集合。 相比于全局重启故障恢复策略,这种策略在一些场景下的故障恢复需要重启的 Task 会更少。
此处 Region 指以 Pipelined 形式进行数据交换的 Task 集合。
- DataStream/Table/SQL 作业中的数据交换形式会根据 ExecutionConfig 中配置的 ExecutionMode 决定。
- 处于 STREAM 模式时,所有数据交换都是 Pipelined 形式。
- 处于 BATCH 模式时,所有数据交换默认都是 Batch 形式。
需要重启的 Region 的判断逻辑如下:
- 出错 Task 所在 Region 需要重启。
- 如果要重启的 Region 需要消费的数据有部分无法访问(丢失或损坏),产出该部分数据的 Region 也需要重启。
- 需要重启的 Region 的下游 Region 也需要重启。这是出于保障数据一致性的考虑,因为一些非确定性的计算或者分发会导致同一个 Result Partition 每次产生时包含的数据都不相同。
验证
验证固定延时重启策略和不重启策略
固定延时重启策略
重启3次,每次间隔10s
验证默认重启次数:1
验证默认重启间隔:1 s
不重启策略
相关源码
记录部分相关源码
默认策略
入口:DefaultSchedulerFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy(),
jobMasterConfiguration,
jobGraph.isCheckpointingEnabled())
.create();
private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
new RestartStrategies.FallbackRestartStrategyConfiguration();
private int numberOfExecutionRetries = -1;
private long executionRetryDelay = DEFAULT_RESTART_DELAY;
private static final long DEFAULT_RESTART_DELAY = 10000L;
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
if (restartStrategyConfiguration
instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
// support the old API calls by creating a restart strategy from them
if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
return RestartStrategies.fixedDelayRestart(
getNumberOfExecutionRetries(), getExecutionRetryDelay());
} else if (getNumberOfExecutionRetries() == 0) {
return RestartStrategies.noRestart();
} else {
return restartStrategyConfiguration;
}
} else {
return restartStrategyConfiguration;
}
}
public static RestartBackoffTimeStrategy.Factory createRestartBackoffTimeStrategyFactory(
final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
final Configuration clusterConfiguration,
final boolean isCheckpointingEnabled) {
checkNotNull(jobRestartStrategyConfiguration);
checkNotNull(clusterConfiguration);
return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)
.orElse(
getClusterRestartStrategyFactory(clusterConfiguration)
.orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
}
private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory(
final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
if (restartStrategyConfiguration instanceof NoRestartStrategyConfiguration) {
return Optional.of(
NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.INSTANCE);
} else if (restartStrategyConfiguration instanceof FixedDelayRestartStrategyConfiguration) {
final FixedDelayRestartStrategyConfiguration fixedDelayConfig =
(FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration;
return Optional.of(
new FixedDelayRestartBackoffTimeStrategy
.FixedDelayRestartBackoffTimeStrategyFactory(
fixedDelayConfig.getRestartAttempts(),
fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
} else if (restartStrategyConfiguration
instanceof FailureRateRestartStrategyConfiguration) {
final FailureRateRestartStrategyConfiguration failureRateConfig =
(FailureRateRestartStrategyConfiguration) restartStrategyConfiguration;
return Optional.of(
new FailureRateRestartBackoffTimeStrategy
.FailureRateRestartBackoffTimeStrategyFactory(
failureRateConfig.getMaxFailureRate(),
failureRateConfig.getFailureInterval().toMilliseconds(),
failureRateConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
} else if (restartStrategyConfiguration instanceof FallbackRestartStrategyConfiguration) {
return Optional.empty();
} else if (restartStrategyConfiguration
instanceof ExponentialDelayRestartStrategyConfiguration) {
final ExponentialDelayRestartStrategyConfiguration exponentialDelayConfig =
(ExponentialDelayRestartStrategyConfiguration) restartStrategyConfiguration;
return Optional.of(
new ExponentialDelayRestartBackoffTimeStrategy
.ExponentialDelayRestartBackoffTimeStrategyFactory(
exponentialDelayConfig.getInitialBackoff().toMilliseconds(),
exponentialDelayConfig.getMaxBackoff().toMilliseconds(),
exponentialDelayConfig.getBackoffMultiplier(),
exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(),
exponentialDelayConfig.getJitterFactor()));
} else {
throw new IllegalArgumentException(
"Unknown restart strategy configuration " + restartStrategyConfiguration + ".");
}
}
默认情况下,没有配置重启策略,对应 FallbackRestartStrategyConfiguration ,返回 Optional.empty(),最终调用 getDefaultRestartStrategyFactory
1 | static final int DEFAULT_RESTART_ATTEMPTS = Integer.MAX_VALUE; |
从上面的 getDefaultRestartStrategyFactory ,就可以知道为啥默认配置时, 不开启 checkpoint 不重启,开启 checkpoint 时,采用固定延时重启策略了。且默认 1s 重启一次,没有上限次数(Integer.MAX_VALUE)。
1 |
|
从上面的源码也可以看出,当配置了重启策略时,都有对应的 RestartStrategyConfiguration
,从而调用 getClusterRestartStrategyFactory
, 而 getClusterRestartStrategyFactory
没有区分是否开启 checkpoint 。所以配置了重启策略,不会区分是否开启 checkpoint,都会按照配置的重启策略,要么都重启要么都不重启。
固定延时策略默认值
对于重启次数:restart-strategy.fixed-delay.attempts。不配置重启策略的默认值(Integer.MAX_VALUE)和配置重启策略的默认值(1)不一样
1 | public static final ConfigOption<Integer> RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS = |