Preface
With the goal of studying the hudi-flink source code, I used the code from my earlier article "Hudi Flink SQL代码示例及本地调试" for debugging, and recorded the main steps of the debugging process along with the corresponding source code snippets.
Versions
- Flink 1.15.4
- Hudi 0.13.0
Goal
In the article "Hudi Flink SQL代码示例及本地调试" I noted that the Table API entry point looks much like the DataStream API entry point: the DataStream API enters through HoodiePipeline's sink and source methods, and these two methods call HoodieTableFactory's createDynamicTableSink and createDynamicTableSource respectively. So how does the Table API code reach createDynamicTableSink and createDynamicTableSource step by step? And once a HoodieTableSink is returned, how is the data actually written? The main entry point for Hudi's write logic appears to be the method body of HoodieTableSink.getSinkRuntimeProvider. I had never worked these questions out before, so the goals this time are to clarify: 1. the main code path from the Table API entry point to createDynamicTableSink returning a HoodieTableSink; 2. where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to carry out the subsequent Hudi write logic.
Related classes:
- HoodiePipeline (DataStream API)
- HoodieTableFactory
- HoodieTableSink
- DataStreamSinkProviderAdapter (functional interface)
- TableEnvironmentImpl
- BatchPlanner
- PlannerBase
- FactoryUtil
- BatchExecSink
- CommonExecSink
DataStream API
The questions above are actually easy to answer from the DataStream API code, so let's first look at the DataStream API code for writing to Hudi; the full code is in the article "Flink Hudi DataStream API代码示例".

```java
DataStream<RowData> dataStream = env.fromElements(
    GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
    GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("id int")
    .column("name string")
    .column("price double")
    .column("ts bigint")
    .column("dt string")
    .pk("id")
    .partition("dt")
    .options(options);
builder.sink(dataStream, false);
```
HoodiePipeline.Builder.sink

```java
public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
    TableDescriptor tableDescriptor = getTableDescriptor();
    return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
}
```
HoodiePipeline.sink

```java
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
}
```
HoodiePipeline.sink answers the questions directly:
1. HoodieTableFactory.createDynamicTableSink returns a HoodieTableSink.
2. HoodieTableSink.getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter.
3. DataStreamSinkProviderAdapter.consumeDataStream invokes the method body defined in HoodieTableSink.getSinkRuntimeProvider, which carries out the subsequent Hudi write logic. The dataStream here is the DataStream<RowData> we created at the very beginning of the program.
HoodieTableSink.getSinkRuntimeProvider
getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter, where the lambda expression dataStream -> {} is the concrete implementation of DataStreamSinkProviderAdapter.consumeDataStream(dataStream).
DataStreamSinkProviderAdapter is in fact a functional interface: an interface that contains exactly one abstract method. A lambda expression can be assigned to a functional interface, thereby instantiating the interface.

```java
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
```

For functional interfaces and lambda expressions, see the following two articles:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047
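To make the mechanism concrete, here is a minimal, dependency-free sketch (the interface and its method mirror DataStreamSinkProviderAdapter's shape, but names and bodies are simplified stand-ins, not Hudi's actual code). A lambda instantiates the functional interface, and its body only runs when consumeDataStream is later called:

```java
public class FunctionalInterfaceDemo {
    // Simplified stand-in for DataStreamSinkProviderAdapter: exactly one
    // abstract method, so a lambda expression can instantiate it.
    @FunctionalInterface
    interface SinkProviderAdapter {
        String consumeDataStream(String dataStream);
    }

    public static void main(String[] args) {
        // The lambda body plays the role of the method body returned by
        // getSinkRuntimeProvider; it is NOT executed here, only stored.
        SinkProviderAdapter provider = dataStream -> "sink(" + dataStream + ")";
        // The body executes only when consumeDataStream is invoked.
        System.out.println(provider.consumeDataStream("rows")); // sink(rows)
    }
}
```

This deferred execution is exactly why, in the Table API path below, getSinkRuntimeProvider can be called during planning without any data being written yet.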
Table API
Now that we know the DataStream API call path, let's compare it with the rough call path of the Table API. The debugging entry point:

```java
tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
```
Overall call flow
1. tableEnv.executeSql
-> TableEnvironmentImpl.executeSql
-> executeInternal(Operation operation)
-> executeInternal(List<ModifyOperation> operations)
-> this.translate
-> (PlannerBase) this.planner.translate
2.1. PlannerBase.translate
-> PlannerBase.translateToRel
-> getTableSink(catalogSink.getContextResolvedTable, dynamicOptions)
-> FactoryUtil.createDynamicTableSink
-> HoodieTableFactory.createDynamicTableSink
2.2. PlannerBase.translate
-> (BatchPlanner) translateToPlan(execGraph)
-> (ExecNodeBase) node.translateToPlan
-> (BatchExecSink) translateToPlanInternal
-> (CommonExecSink) createSinkTransformation
-> (HoodieTableSink) getSinkRuntimeProvider
-> (CommonExecSink) applySinkProvider
-> provider.consumeDataStream
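The whole chain can be compressed into a dependency-free Java sketch (all types and methods here are hypothetical stand-ins that only mimic the shape of the Flink/Hudi classes): planning obtains the sink and its runtime provider first, and only the final consumeDataStream call executes the lambda body holding the write logic.

```java
public class CallFlowSketch {
    // Stand-in for DynamicTableSink / HoodieTableSink.
    interface DynamicTableSink {
        SinkProvider getSinkRuntimeProvider();
    }

    // Stand-in for DataStreamSinkProviderAdapter; the lambda body is the write logic.
    interface SinkProvider {
        String consumeDataStream(String dataStream);
    }

    // Stand-in for HoodieTableFactory.createDynamicTableSink (step 2.1).
    static DynamicTableSink createDynamicTableSink() {
        // Returns a provider whose lambda body is the "write to hudi" logic,
        // to be executed later.
        return () -> dataStream -> "wrote " + dataStream + " to hudi";
    }

    // Stand-in for PlannerBase.translate -> ... -> CommonExecSink.applySinkProvider.
    static String translate(String dataStream) {
        DynamicTableSink sink = createDynamicTableSink();      // factory lookup
        SinkProvider provider = sink.getSinkRuntimeProvider(); // body not yet run
        return provider.consumeDataStream(dataStream);         // body runs here (step 2.2)
    }

    public static void main(String[] args) {
        System.out.println(translate("rows")); // wrote rows to hudi
    }
}
```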
Detailed code
TableEnvironmentImpl
(TableEnvironmentImpl) executeSql

```java
public TableResult executeSql(String statement) {
    List<Operation> operations = this.getParser().parse(statement);
    if (operations.size() != 1) {
        throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
    } else {
        // Key step: executeInternal
        return this.executeInternal((Operation)operations.get(0));
    }
}
```
executeInternal(Operation operation)

```java
public TableResultInternal executeInternal(Operation operation) {
    if (operation instanceof ModifyOperation) {
        // Key step: executeInternal
        return this.executeInternal(Collections.singletonList((ModifyOperation)operation));
    } else if (operation instanceof StatementSetOperation) {
        return this.executeInternal(((StatementSetOperation)operation).getOperations());
```
executeInternal(List<ModifyOperation> operations)

```java
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
    // Key step: translate
    List<Transformation<?>> transformations = this.translate(operations);
    List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
    TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
    if ((Boolean)this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
        try {
            result.await();
        } catch (ExecutionException | InterruptedException var6) {
            result.getJobClient().ifPresent(JobClient::cancel);
            throw new TableException("Fail to wait execution finish.", var6);
        }
    }
    return result;
}
```
translate
The planner here is a BatchPlanner, because we configured batch mode with EnvironmentSettings.inBatchMode(). (In streaming mode the planner would be a StreamPlanner; see StreamTableEnvironmentImpl.create for the concrete implementation.)

```java
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
    // The planner here is BatchPlanner because we configured batch mode via EnvironmentSettings.inBatchMode()
    // Key step: PlannerBase.translate
    return this.planner.translate(modifyOperations);
}
```
BatchPlanner
PlannerBase.translate (the parent class of BatchPlanner)
(In streaming mode the planner would be a StreamPlanner; both share the same parent class, and in both cases this parent-class method is the one executed.)

```scala
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  beforeTranslation()
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // Key step: translateToRel
  val relNodes = modifyOperations.map(translateToRel)
  val optimizedRelNodes = optimize(relNodes)
  val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
  // Key step: translateToPlan
  val transformations = translateToPlan(execGraph)
  afterTranslation()
  transformations
}
```
PlannerBase.translateToRel

```scala
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
  val dataTypeFactory = catalogManager.getDataTypeFactory
  modifyOperation match {
    case s: UnregisteredSinkModifyOperation[_] =>
      val input = getRelBuilder.queryOperation(s.getChild).build()
      val sinkSchema = s.getSink.getTableSchema
      // validate query schema and sink schema, and apply cast if possible
      val query = validateSchemaAndApplyImplicitCast(
        input,
        catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
        null,
        dataTypeFactory,
        getTypeFactory)
      LogicalLegacySink.create(
        query,
        s.getSink,
        "UnregisteredSink",
        ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))

    case collectModifyOperation: CollectModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      DynamicSinkUtils.convertCollectToRel(
        getRelBuilder,
        input,
        collectModifyOperation,
        getTableConfig,
        getFlinkContext.getClassLoader
      )

    case catalogSink: SinkModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      val dynamicOptions = catalogSink.getDynamicOptions
      // Key step: getTableSink
      getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
        case (table, sink: TableSink[_]) =>
          // Legacy tables can't be anonymous
          val identifier = catalogSink.getContextResolvedTable.getIdentifier
          // check the logical field type and physical field type are compatible
          val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
          // validate logical schema and physical schema are compatible
          validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
          // validate TableSink
          validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
          // validate query schema and sink schema, and apply cast if possible
          val query = validateSchemaAndApplyImplicitCast(
            input,
            table.getResolvedSchema,
            identifier.asSummaryString,
            dataTypeFactory,
            getTypeFactory)
          val hints = new util.ArrayList[RelHint]
          if (!dynamicOptions.isEmpty) {
            hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
          }
          LogicalLegacySink.create(
            query,
            hints,
            sink,
            identifier.toString,
            table,
            catalogSink.getStaticPartitions.toMap)
        case (table, sink: DynamicTableSink) =>
          DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
      } match {
        case Some(sinkRel) => sinkRel
        case None =>
          throw new TableException(
            s"Sink '${catalogSink.getContextResolvedTable}' does not exists")
      }
    // ... (remaining cases omitted in this excerpt)
```
PlannerBase.getTableSink

```scala
private def getTableSink(
```
FactoryUtil.createDynamicTableSink
Based on 'connector'='hudi', the factory org.apache.hudi.table.HoodieTableFactory is located, and then HoodieTableFactory.createDynamicTableSink is called.

```java
public static DynamicTableSink createDynamicTableSink(
```
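FactoryUtil discovers factory implementations on the classpath (via Java's ServiceLoader/SPI mechanism) and matches them against the 'connector' option by their factory identifier. The matching idea can be sketched in plain Java (the registry, identifiers, and method names below are simplified hypothetical stand-ins, not Flink's real implementation):

```java
import java.util.List;
import java.util.NoSuchElementException;

public class FactoryLookupSketch {
    // Stand-in for Flink's Factory interface: each factory declares the
    // connector identifier it serves, e.g. "hudi" or "kafka".
    interface Factory {
        String factoryIdentifier();
    }

    // Stand-in for discovery: the real code loads candidates via ServiceLoader
    // and filters them by the 'connector' option from the table definition.
    static Factory discoverFactory(List<Factory> loaded, String connector) {
        return loaded.stream()
            .filter(f -> f.factoryIdentifier().equals(connector))
            .findFirst()
            .orElseThrow(() -> new NoSuchElementException(
                "Could not find a factory for connector '" + connector + "'"));
    }

    public static void main(String[] args) {
        List<Factory> loaded = List.of(() -> "hudi", () -> "kafka");
        // 'connector'='hudi' selects the matching factory.
        System.out.println(discoverFactory(loaded, "hudi").factoryIdentifier()); // hudi
    }
}
```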
HoodieTableFactory.createDynamicTableSink
This resolves the first question.

```java
public DynamicTableSink createDynamicTableSink(Context context) {
```
BatchExecSink
Back in PlannerBase.translate: it later calls translateToPlan. Here execGraph.getRootNodes returns a BatchExecSink (to see why, look at the translateToExecNodeGraph method called in PlannerBase.translate; in streaming mode execGraph.getRootNodes returns a StreamExecSink instead). BatchExecSink is a subclass of BatchExecNode, so node.translateToPlan is executed.
PlannerBase.translateToPlan

```scala
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
  beforeTranslation()
  val planner = createDummyPlanner()
  val transformations = execGraph.getRootNodes.map {
    // BatchExecSink
    // Key step: ExecNodeBase.translateToPlan
    case node: BatchExecNode[_] => node.translateToPlan(planner)
    case _ =>
      throw new TableException(
        "Cannot generate BoundedStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
  }
  afterTranslation()
  transformations
}
```
BatchExecSink

```java
public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
    ...

public abstract class CommonExecSink extends ExecNodeBase<Object>
        implements MultipleTransformationTranslator<Object> {
    ...
```
ExecNodeBase.translateToPlan

```java
public final Transformation<T> translateToPlan(Planner planner) {
```

BatchExecSink.translateToPlanInternal
(This is the only place that distinguishes batch from streaming: in streaming mode StreamExecSink.translateToPlanInternal is executed instead; everything else in both modes goes through the same parent-class logic.)

```java
protected Transformation<Object> translateToPlanInternal(
```
CommonExecSink.createSinkTransformation
The tableSink here is the HoodieTableSink; its getSinkRuntimeProvider method is called to obtain the runtimeProvider (without executing the method body yet).

CommonExecSink.applySinkProvider
First a dataStream is created via new DataStream<>(env, sinkTransformation); then provider.consumeDataStream is executed, which invokes the method body of HoodieTableSink.getSinkRuntimeProvider. The provider here is the DataStreamSinkProviderAdapter returned by HoodieTableSink.getSinkRuntimeProvider.

```java
private Transformation<?> applySinkProvider(
```
provider.consumeDataStream
(Already covered above under DataStreamSinkProviderAdapter.)
It invokes the method body (the lambda expression) of HoodieTableSink.getSinkRuntimeProvider to execute the subsequent Hudi write logic.
This resolves the second question.

```java
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
```
Summary
This article is a simple record of my process of debugging the Hudi Flink SQL source code; it does not analyze the source code in depth (my own skills are not up to that yet). The main purpose was to work out the main code path from the Table API entry point to createDynamicTableSink returning a HoodieTableSink, and to find where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to perform the subsequent Hudi write logic. This should make later analysis and study of the Hudi source code easier.
New things learned in this article: functional interfaces and how lambda expressions implement them.