前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
之前在Flink Hudi DataStream API代码示例中总结了Hudi Flink DataStream API的代码及本地调试,并且在文中提到其实大家用Table API更多一些,但是我感觉Table API调试源码可能会比较难一点,因为可能会涉及到SQL解析,不清楚Table API的入口在哪里。
但是在我总结的上篇文章Flink用户自定义连接器(Table API Connectors)学习总结中知道了其实Flink Table API读写Hudi是通过自定义实现了一个Hudi的Table API Connectors(’connector’ = ‘hudi’),相关类为HoodieTableFactory
、HoodieTableSink
和HoodieTableSource
,入口为HoodieTableFactory
。
代码
GitHub地址:https://github.com/dongkelun/hudi-demo/tree/master/hudi0.13_flink1.15
1 | package com.dkl.hudi.flink; |
其实就是通过tableEnv.executeSql执行Flink SQL,其中创建Hudi表的SQL语句是利用Hudi源码中模块hudi-examples-flink
里面的sql方法生成的,它会根据参数返回对应的创建Hudi表的SQL语句,示例:1
2
3
4
5
6
7
8String tableName = "t1";
String tablePath = "/tmp/flink/hudi/" + tableName;
String hoodieTableDDL = sql(tableName)
.option(FlinkOptions.PATH, tablePath)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE")
.end();
System.out.println(hoodieTableDDL);
输出1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16create table t1(
`uuid` VARCHAR(20),
`name` VARCHAR(10),
`age` INT,
`ts` TIMESTAMP(3),
`partition` VARCHAR(10),
PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
with (
'connector' = 'hudi',
'read.streaming.enabled' = 'true',
'path' = '/tmp/flink/hudi/t1',
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE'
)
如果不指定字段的话,会有官方文档示例中几个默认字段,默认主键为uuid,默认为分区表,分区字段为partition
1 | hoodieTableDDL = sql(tableName) |
可以通过field方法指定表的字段,pkField指定表的主键,noPartition设置表为非分区表,partitionField指定表的分区字段
1 | create table t1( |
本地运行调试
运行结果
调试
我们根据开头提到的文章Flink用户自定义连接器(Table API Connectors)学习总结可知入口为HoodieTableFactory
,其中sink的入口为createDynamicTableSink
,source的入口为createDynamicTableSource
本地调试源码的时候可能会遇到在Idea中源码下载不下来的情况,我们可以直接去官网下载对应的源码jar包,然后放到自己本地的仓库中,方便我们调试的时候阅读源码。比如hudi-flink对应的源码的下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink/0.13.0/hudi-flink-0.13.0-sources.jar,然后将hudi-flink-0.13.0-sources.jar放到路径 m2\repository\org\apache\hudi\hudi-flink\0.13.0 中就可以了
我们发现Table API的入口和DataStream API的入口差不多,DataStream API的入口是在
HoodiePipeline
的sink
和source
方法里,而这两个方法也是分别调用了HoodieTableFactory
的createDynamicTableSink
和createDynamicTableSource
1 | /** |
源码调试
我们还可以直接在源码里进行调试,这样方便我们直接看到修改源码后的效果。直接在源码里进行调试可能配置环境会比较麻烦一点,每个版本也不太一样。比如我在hudi 0.13.0源码中模块hudi-examples-flink
里进行调试,需要做如下修改:
详细的pom修改可以看我提到github上的commit:https://github.com/dongkelun/hudi/commit/558910d4cab189d0cbfa9c69332f3e4e74e56b41
这样我们就可以本地直接运行源码中的类:HoodieFlinkQuickstart
,也可以直接修改源码查看效果:
源码里的demo和测试用例比较全,我们可以多看一看多调试一下,利于提升对源码的理解。