前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
接上篇文章:Hudi源码 | Insert源码分析总结(一)(整体流程),继续进行Apache Hudi Insert源码分析总结,本文主要分析上文提到的WorkloadProfile
版本
Hudi 0.9.0
入口
入口在上篇文章中讲到的BaseJavaCommitActionExecutor
的execute
1 | WorkloadProfile profile = null; |
WorkloadProfile
首先看一下WorkloadProfile
的构造函数,看看需要哪些参数。它有两个构造函数,一个只有一个参数:Pair<HashMap<String, WorkloadStat>, WorkloadStat>
profile,另外一个相比于第一个只是多了一个写操作类型:WriteOperationType。对于profile,它的left是分区统计信息,right是全局统计信息。统计信息是通过WorkloadStat实现的。1
2
3
4
5
6
7
8
9public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
}
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
this(profile);
this.operationType = operationType;
}
buildProfile
buildProfile
就是将inputRecords
构造成WorkloadProfile
所需要的profile
。首先初始化一个分区统计信息partitionPathStatMap
和全局统计信息globalStat
。然后将inputRecords通过map和groupingBy得到每个分区路径对应的文件位置信息和记录数量:partitionLocationCounts。其中文件位置信息是通过record.getCurrentLocation得到的,保存在HoodieRecordLocation中。而record中位置信息是通过上篇文章提到的tag
方法通过读取索引信息得到的。不过tag方法只有upsert/delete等才会调用,对于insert方法是不会触发的,也就是这里的record中的location都为空。然后遍历partitionLocationCounts.entrySet(),其实就是按照分区执行,获取分区路径、记录数、文件位置。如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value即WorkloadStat。接着判断文件信息是否存在,如果文件位置信息存在,则代表是update(有对应的历史数据),对于update,对应的分区下的WorkloadStat调用addUpdates,全局WorkloadStat调用addUpdates。如果文件位置信息不存在,则代表是insert数据(新数据),对于insert,对应分区下的WorkloadStat调用addInserts,使insert数加上对应的记录数,全局WorkloadStat中的insert数也加上对应的记录数,最后返回WorkloadProfile所需要的Pair.of(partitionPathStatMap, globalStat)
1 | protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) { |
WorkloadStat
主要是看一下它的addInserts和addUpdates方法。addInserts方法很简单,就是将numInsert更新为numInserts加上对应的记录数。addUpdates稍微复杂一点,主要是更新updateLocationToCount,updateLocationToCount保存的是(fileId,(instantTime,记录数))。主要逻辑:如果updateLocationToCount中没有该fileId,则直接将fileId,(instantTime,记录数)放进updateLocationToCount,如果有的话,则更新该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()。将value的right即记录数更新为numUpdates + accNumUpdates。其中numUpdates为参数即本次记录数,accNumUpdates是已经存在的累计数量。最后再将numUpdates更新numUpdates+对应的记录数。(numUpdates在后面并没有用到)
总结一下,WorkloadStat的作用主要记录insert累计数和update累计数。不过update需要以fileId为维度进行累计,这是因为update有明确要更新的fileId,而insert是没有的。
1 |
|
saveWorkloadProfileMetadataToInflight
上篇文章讲到将WorkloadProfile元数据信息持久化到.inflight文件中,我们来看一下是如何持久化的。主要逻辑就是遍历profile中的分区,获取对应的WorkloadStat,然后将对应的partitionPath、numInserts/numUpdates、fileId、instantTime放到HoodieWriteStat中,再将HoodieWriteStat放到HoodieCommitMetadata中,最后调用activeTimeline.transitionRequestedToInflight将HoodieCommitMetadata转成json持久化到.inflight
中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
saveWorkloadProfileMetadataToInflight(profile, instantTime);
void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
// 按照分区路径遍历
profile.getPartitionPaths().forEach(path -> {
// 获取对应分区的WorkloadStat
WorkloadStat partitionStat = profile.getWorkloadStat(path);
// 创建一个新的HoodieWriteStat,先进行insert
HoodieWriteStat insertStat = new HoodieWriteStat();
// 将WorkloadStat中的numInserts赋值给insertStat
insertStat.setNumInserts(partitionStat.getNumInserts());
// insertStat的fileId为空
insertStat.setFileId("");
// prevCommit为null
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
// 将path和insertStat添加到metadata中
metadata.addWriteStat(path, insertStat);
// 接着进行update的逻辑
partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
// 创建一个新的HoodieWriteStat
HoodieWriteStat writeStat = new HoodieWriteStat();
// 设置fileId
writeStat.setFileId(key);
// TODO : Write baseCommitTime is possible here ?
// prevCommit设为WorkloadStat中的instantTime
writeStat.setPrevCommit(value.getKey());
// 设置更新数
writeStat.setNumUpdateWrites(value.getValue());
// 将path和writeStat添加到metadata中
metadata.addWriteStat(path, writeStat);
});
});
// 设置操作类型
metadata.setOperationType(operationType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
// 将.request转为.inflight,其实就是创建一个新的.inflight,将metadata转成json持久化到.inflight
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
config.shouldAllowMultiWriteOnSameInstant());
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
}
总结
关于WorkloadProfile
的分析一共就这么多,主要是统计record
中每个分区路径对应的insert/upsert数量以及upsert数据对应的fileId
和instantTime
,先持久化到.inflight
文件中,然后给后面的getPartitioner
使用。关于WorkloadProfile
统计的这些信息是如何在getPartitioner
中使用的,我们放在下篇文章中分析。
注释代码
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
相关阅读
- 开源经验分享 | 如何从一名小白成为Apache Hudi Contributor
- Hudi源码 | Insert源码分析总结(一)(整体流程)
- Hudi Java Client总结|读取Hive写Hudi代码示例
- Hudi源码|bootstrap源码分析总结(写Hudi)
- Hudi Clean Policy 清理策略实现分析
- Hudi Clean 清理文件实现分析
- Hudi查询类型/视图总结
- Hudi preCombinedField 总结(二)-源码分析
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark SQL源码学习总结-select(查询)