前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
本文为转载文章,原文地址:https://blog.jrwang.me/2019/flink-source-code-streamgraph/ 作者:jrthe42
前言
在研究学习hudi flink源码时发现实际上写Hudi的主要逻辑是在Hudi自定义的StreamOperator和SinkFunction中实现,它们是通过DataStream的transform和addSink调用实现,继续研究发现和Flink的Transformation和StreamOperator有关。那么就需要了解StreamOperator的调用执行逻辑,最后发现这需要了解Flink Task的的运行逻辑,知道Flink的Task或者Function是如何运行的。而这里的逻辑是比较复杂的,大概包含StreamGraph、JobGraph、ExecutionGraph、Physical Graph(虚拟结构)的生成或者构建,还有JobManager和TaskManager的启动,而JobManager又包含ResourceManager、Dispatcher和JobMaster, 这里涉及Java8异步编程如CompletableFuture和基于Akka的RPC通信,最后才是Task的的部署和启动,StreamOperator相关方法的调用最终是通过启动Task.run方法在StreamTask中实现的。
我现在只需要了解主要的调用逻辑,暂时没有精力研究具体的每个步骤的详细源码,正好查阅相关资料时发现了几篇不错的文章,所以转载一下,先从StreamGraph开始。
注意:本篇文章对应的Flink版本比较老了(1.7或1.8),但主要的逻辑一样,可以参考文章和新版Flink源码进行学习,以下为原文
在编写 Flink 的程序的时候,核心的要点是构造出数据处理的拓扑结构,即任务执行逻辑的 DAG。我们先来看一下 Flink 任务的拓扑在逻辑上是怎么保存的。
StreamExecutionEnvironment
StreamExecutionEnvironment 是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment 有不同的具体实现类,如 LocalStreamEnvironment, RemoteStreamEnvironment 等。StreamExecutionEnvironment 也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfig 和 CheckpointConfig 中。我们现在先只关注拓扑结构的产生。
通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:
1 | env.addSource(XXX) |
添加数据源后获得 DataStream, 之后通过不同的算子不停地在 DataStream 上实现转换过滤等逻辑,最终将结果输出到 DataSink 中。
在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations 来保留生成 DataStream 的所有转换。
StreamTransformation
StreamTransformation 代表了生成 DataStream 的操作,每一个 DataStream 的底层都有对应的一个 StreamTransformation。在 DataStream上面通过 map 等算子不断进行转换,就得到了由 StreamTransformation 构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph。
StreamTransformation 在运行时并不一定对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。
每一个 StreamTransformation 都有一个关联的 Id,这个 Id 是全局递增的。除此以外,还有 uid, slotSharingGroup, parallelism 等信息。
StreamTransformation 有很多具体的子类,如SourceTransformation、 OneInputStreamTransformation、TwoInputTransformation、SideOutputTransformation、 SinkTransformation 等等,这些分别对应了DataStream 上的不同转换操作。
由于 StreamTransformation 中通常保留了其前向的 StreamTransformation,即其输入,因此可以据此还原出 DAG 的拓扑结构。
1 | // OneInputTransformation |
DataStream
一个 DataStream 就表征了由同一种类型元素构成的数据流。通过对 DataStream 应用 map/filter 等操作,可以将一个 DataStream 转换为另一个 DataStream,这个转换的过程就是根据不同的操作生成不同的 StreamTransformation,并将其加入 StreamExecutionEnvironment 的 transformations 列表中。
例如:1
2
3
4
5
6
7
8
9
10
11
12//构造 StreamTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//加入到 StreamExecutionEnvironment 的列表中
getExecutionEnvironment().addOperator(resultTransform);
DataStream 的子类包括 SingleOutputStreamOperator、 DataStreamSource KeyedStream 、IterativeStream, SplitStream(已弃用)。这里要吐槽一下 SingleOutputStreamOperator 的这个类的命名,太容易和 StreamOperator 混淆了。StreamOperator 的介绍见下一小节。
除了 DataStream 及其子类以外,其它的表征数据流的类还有 ConnectedStreams (两个流连接在一起)、 WindowedStream、AllWindowedStream 。这些数据流之间的转换可以参考 Flink 的官方文档。
StreamOperator
在操作 DataStream 的时候,比如 DataStream#map 等,会要求我们提供一个自定义的处理函数。那么这些信息时如何保存在 StreamTransformation 中的呢?这里就要引入一个新的接口 StreamOperator。
StreamOperator 定义了对一个具体的算子的生命周期的管理,包括:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 //生命周期
void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
void open() throws Exception;
void close() throws Exception;
void dispose() throws Exception;
//状态管理
OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory storageLocation) throws Exception;
void initializeState() throws Exception;
//其它方法暂时省略
StreamOperator 的两个子接口 OneInputStreamOperator 和 TwoInputStreamOperator 则提供了操作数据流中具体元素的方法,而 AbstractUdfStreamOperator 这个抽象子类则提供了自定义处理函数对应的算子的基本实现:
1 | //OneInputStreamOperator |
至于具体到诸如 map/fliter 等操作对应的 StreamOperator,基本都是在 AbstractUdfStreamOperator 的基础上实现的。以 StreamMap 为例:
1 | public class StreamMap<IN, OUT> |
由此,通过 DataStream –> StreamTransformation –> StreamOperator 这样的依赖关系,就可以完成 DataStream 的转换,并且保留数据流和应用在流上的算子之间的关系。
StreamGraph
StreamGraphGenerator 会基于 StreamExecutionEnvironment 的 transformations 列表来生成 StreamGraph。
在遍历 List<StreamTransformation> 生成 StreamGraph 的时候,会递归调用StreamGraphGenerator#transform方法。对于每一个 StreamTransformation, 确保当前其上游已经完成转换。StreamTransformations 被转换为 StreamGraph 中的节点 StreamNode,并为上下游节点添加边 StreamEdge。
1 | Collection<Integer> transformedIds; |
对于不同类型的 StreamTransformation,分别调用对应的转换方法,以最典型的 transformOneInputTransform 为例:
1 | private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) { |
接着看一看 StreamGraph 中对应的添加节点和边的方法:
1 | protected StreamNode addNode(Integer vertexID, |
在 StreamNode 中,保存了对应的 StreamOperator (从 StreamTransformation 得到),并且还引入了变量 jobVertexClass 来表示该节点在 TaskManager 中运行时的实际任务类型。
1 | private final Class<? extends AbstractInvokable> jobVertexClass; |
AbstractInvokable 是所有可以在 TaskManager 中运行的任务的抽象基础类,包括流式任务和批任务。StreamTask 是所有流式任务的基础类,其具体的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask 等。
对于一些不包含物理转换操作的 StreamTransformation,如 Partitioning, split/select, union,并不会生成 StreamNode,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。
以 PartitionTansformation 为例, PartitionTansformation 是 KeyedStream 对应的转换:
1 | //StreamGraphGenerator#transformPartition |
前面提到,在每一个物理节点的转换上,会调用 StreamGraph#addEdge 在输入节点和当前节点之间建立边的连接:
1 | private void addEdgeInternal(Integer upStreamVertexID, |
这样通过 StreamNode 和 SteamEdge,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。
小结
本文简单分析了从 DataStream API 到 StramGraph 的过程。 StreamGraph 是 Flink 任务最接近用户逻辑的 DAG 表示,后面到具体执行的时候还会进行一系列转换,我们在后续的文章中再逐一加以分析。