前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
Apache Hudi Insert源码分析总结,以Java Client为例,不了解Hudi Java Client的可以参考:Hudi Java Client总结|读取Hive写Hudi代码示例。
以Java Client为例的原因:1、自己生产上用的Java Client,相比于Spark客户端更熟悉一点。
2、Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。
3、Java Client更贴近源码,可以直接分析核心逻辑。不用剖析Spark、Flink源码。对Sprk、Flink源码不熟悉的更容易上手。
4、等分析完Java Client源码后,有时间的话我会再总结一下Spark客户端的源码,这样大家会更容易理解。
版本
Hudi 0.9.0
备注:其实每个版本核心代码都差不多,之所以使用0.9.0,一个是因为对于Java Client,我用0.9.0用的比较多,相比于使用最新版可以节省不少时间,另一个原因是,之前总结的Java Client的源码也是基于0.9.0。比如Hudi Clean Policy 清理策略实现分析和Hudi Clean 清理文件实现分析
initTable
首先是通过initTable初始化Hudi表,可以看出来主要就是根据我们配置的一些参数,创建.hoodie
元数据目录,然后将这些参数持久化到hoodier.properties
文件中,具体的细节可以自己研究。
1 | public HoodieTableMetaClient initTable(Configuration configuration, String basePath) |
HoodieWriteConfig
这里的配置是写数据时使用的配置,上面initTable的配置是持久化文件的配置,当然这俩配置要保持一致(实际上Spark客户端就是保持一致的)。可以看到有Schema、表名、payload、索引、clean、文件大小等一些参数。熟悉这些参数后就可以进行调优了。
1 | Properties indexProperties = new Properties(); |
HoodieJavaWriteClient
创建writeClient1
HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)
startCommit
1 | String newCommitTime = writeClient.startCommit(); |
具体的实现在其父类AbstractHoodieWriteClient
中。首先调用rollbackFailedWrites
执行rollback
操作,关于rollback
分析本文先不讲。然后通过HoodieActiveTimeline.createNewInstantTime()
创建一个新的instantTime。最后创建metaClient
,通过metaClient.getActiveTimeline().createNewInstant生成.commit.request
文件
1 | public String startCommit() { |
generateRecord
主要是构造writeClient写数据所需的数据结构writeRecords:List
client.insert(writeRecords, newCommitTime)
首先获取table,这里返回HoodieJavaCopyOnWriteTable,接着验证一下schema和历史数据的兼容性。然后通过preWrite执行写之前的一些步骤,比如设置操作类型,接着调用table.insert执行完整的写数据操作,返回result。最后调用postWrite执行archive、clean等操作返回WriteStatuses。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
// 首先获取table,这里的table为HoodieJavaCopyOnWriteTable
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
// 验证schema
table.validateUpsertSchema();
// 写之前的一些步骤,比如设置操作类型
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// 调用table.insert执行写数据操作,返回result
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
// 调用postWrite返回WriteStatuses
return postWrite(result, instantTime, table);
}
postWrite
我们先看一下postWrite的逻辑,首先判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean,也就是archive、clean等操作是在写操作完成、生成.commit文件之后进行的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26/**
* 判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean
*/
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
// commit是否已经提交,这里主要考虑是否设置了自动提交,hoodie.auto.commit默认true
// 如果不是自动提交的话,那么我们需要手动执行clean等操作,然后手动commit
// 所以这里默认为true
// isCommitted代表着已经生成了.commit文件,也就是写操作成功了,也就是通过table.insert已经完成了整个的写操作
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
result.getWriteStats().get().size());
}
// postCommit主要是执行archive、clean等操作。也就是archive、clean等操作是在写操作完成,生成.commit文件之后进行的。
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
}
return result.getWriteStatuses();
}
table.insert
先调用JavaInsertCommitActionExecutor.execute接着调用JavaWriteHelper.newInstance().write1
2
3
4
5
6
7
8
9
10
11public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records) {
return new JavaInsertCommitActionExecutor<>(context, config,
this, instantTime, records).execute();
}
public HoodieWriteMetadata<List<WriteStatus>> execute() {
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
JavaWriteHelper.write
它的write方法是在其父类AbstractWriteHelper
中实现的,首先首先判断是否需要去重(通过配置项hoodie.combine.before.insert配置是否需要去重),insert默认不需要去重(upsert/delete默认需要)。如果需要去重的话调用方法combineOnCondition
先进行去重。
然后判断是否需要tag, tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据,对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件。由于这里为insert所以不需要tag,这也是insert和upsert一个比较大的区别。
我们后面分析upsert源码时,会专门分析tag怎么实现的,本文先略过。然后通过调用executor.execute执行写操作,返回result,这里的executor为JavaInsertCommitActionExecutor。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46public HoodieWriteMetadata<O> write(String instantTime,
I inputRecords,
HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
boolean shouldCombine,
int shuffleParallelism,
BaseCommitActionExecutor<T, I, K, O, R> executor,
boolean performTagging) {
try {
// De-dupe/merge if needed
// 如果开启了去重,则先去重,insert默认不去重
// 配置项hoodie.combine.before.insert
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
if (performTagging) { // 是否需要tag,insert为false
// perform index loop up to get existing location of records
// tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据
// 对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件
taggedRecords = tag(dedupedRecords, context, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
// 通过调用executor.execute执行写操作,返回result。这里的executor为JavaInsertCommitActionExecutor
HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
}
}
public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
.withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
+ " writing to storage.");
JavaInsertCommitActionExecutor.execute
JavaInsertCommitActionExecutor.execute实际上调用的父类BaseJavaCommitActionExecutor
的execute
首先通过buildProfile构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用。WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息。数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理,位置信息是为了获取要更新的文件,也就是对应的fileId。对于upsert数据,我们复用原来的fileId。对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写。然后将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight。这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息。然后通过getPartitioner根据WorkloadProfile获取partitioner,接着调用partition方法返回partitionedRecords(<桶号,对应的HoodieRecord>),一个桶对应一个文件 fileId。最后再遍历partitionedRecords,也就是每个桶执行一次写操作handleInsertPartition/handleUpsertPartition,最后调用BoundedInMemoryExecutor.execute,利用生产者消费者模式写数据,关于如何通过生产者消费者模式写数据,我已经在Hudi源码|bootstrap源码分析总结(写Hudi)分析过bootstrap的源码了,原理一样,不同的是实现类不一样,感兴趣的可以看看。
关于tag(索引相关)、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文讲个大概,可能有不准确的地方,大家可以先结合17张图带你彻底理解Hudi Upsert原理进行学习,至于具体的源码分析,限于篇幅及个人精力,本文先不涉及,会放在后面的文章单独讲解,对于本文可能不准确的地方,也会在后面的文章中更新。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) { // 始终为true
// 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用
// WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息
// 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理
// 位置信息是为了获取要更新的文件
// 对于upsert数据,我们复用原来的fileId
// 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
profile = new WorkloadProfile(buildProfile(inputRecords));
LOG.info("Workload profile :" + profile);
try {
// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
saveWorkloadProfileMetadataToInflight(profile, instantTime);
} catch (Exception e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
LOG.error("Check file exists failed");
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
}
}
}
// 根据WorkloadProfile获取partitioner
final Partitioner partitioner = getPartitioner(profile);
// <桶号,对应的HoodieRecord>,一个桶对应一个文件 fileId
Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
List<WriteStatus> writeStatuses = new LinkedList<>();
// forEach,每个桶执行一次写操作handleInsertPartition/handleUpsertPartition
// 最终通过BoundedInMemoryExecutor.execute 生产者消费者模式写数据
partitionedRecords.forEach((partition, records) -> {
// 是否更新、删除
if (WriteOperationType.isChangingRecords(operationType)) {
handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
} else {
handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
}
});
updateIndex(writeStatuses, result);
// commit生成.commit文件,.commit文件的生成标记着写数据的完成
updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
commit
上面通过handleInsertPartition/handleUpsertPartition实际上已经完成了数据的写入。但是最后还要生成.commit元数据文件,代表一次commit的完成,否则如果没有生成commit的话,比如只有.commit.request或者commit.inflight,这样在查询时候不会查到本地写数据生成的文件,而且下次写数据时会触发rollback来处理。
这里索引相关的先不看,commit调用链updateIndexAndCommitIfNeeded
->commitOnAutoCommit
->autoCommit
->commit
->commit
,最终在BaseJavaCommitActionExecutor的commit方法中通过activeTimeline.saveAsComplete生成.commit文件。
前面讲过了,在生成.commit文件后,会调用postWrite方法触发archive、clean等操作。实际上archive、clean等操作的失败,不影响本次写数据的成功。比如clean失败了,可以下次再clean就可以了。所以当commit完成后,如果clean失败了,这样对于有失败机制的集成工具,比如我们使用的Apache NIFI,是不能将本批次数据放进失败队列的。PS:当本次commit不成功时,我们需要放进失败队列,目的是防止数据丢失。其实我们可以自己写代码继承JavaClient类,将postWrite方法和table.insert分开。这样便于判断是写数据失败还是clean失败,以后我会分享相关代码实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
commitOnAutoCommit(result);
}
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
// validate commit action before committing result
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
autoCommit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
commit(extraMetadata, result);
} finally {
this.txnManager.endTransaction();
}
}
// BaseJavaCommitActionExecutor
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
}
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
result.setCommitted(true);
result.setWriteStats(writeStats);
// Finalize write
finalizeWrite(instantTime, writeStats, result);
try {
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
// 通过activeTimeline.saveAsComplete生成.commit文件
activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
}
handleInsertPartition
附handleInsertPartition
到生产者消费者模式调用链,handleInsertPartition
->handleUpsertPartition
->handleInsert
->JavaLazyInsertIterable.computeNext
->BoundedInMemoryExecutor.execute
1 | protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, |
总结
本文以Java Client为例,对Apache Hudi insert源码进行了整体逻辑的分析总结,希望能对大家有所帮助。由于精力有限,对于文中提到的WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition等,我会在后面的文章再进行总结。并且等insert相关源码分析完后,会再进行upsert的源码分析。可能有些地方不够准确,还请大家多多指正,让我们共同进步。
注释代码
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
相关阅读
- Hudi Java Client总结|读取Hive写Hudi代码示例
- Hudi源码|bootstrap源码分析总结(写Hudi)
- Hudi Clean Policy 清理策略实现分析
- Hudi Clean 清理文件实现分析
- Hudi查询类型/视图总结
- Hudi preCombinedField 总结(二)-源码分析
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark SQL源码学习总结-select(查询)