前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
接上篇文章和之前的总结的源码文章,本文总结源码 tag/tagLocation ,对应功能:根据索引信息判断记录是否存在,如果不存在,代表是新增数据,如果记录存在则代表是更新数据,需要找到并设置 currentLocation。
tag
AbstractWriteHelper.tag
1 | private I tag( |
table.getIndex()
对于 Java Client 这里 table 为 HoodieJavaCopyOnWriteTable , HoodieJavaCopyOnWriteTable.getIndex() -> HoodieJavaTable.getIndex
1 | protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { |
因为指定了索引类型为 BLOOM , 所以这里返回 JavaHoodieBloomIndex 。
JavaHoodieBloomIndex
JavaHoodieBloomIndex 的父类为 HoodieBaseBloomIndex ,其 tagLocation 在父类 HoodieBaseBloomIndex
1 | public class JavaHoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieBaseBloomIndex<T> { |
tagLocation
- 提取映射:
Map(partitionPath, List<recordKey>)
- lookupIndex :根据索引查找每个 recordKey 的 location,返回 每个 recordKey 和 location 的对应关系:
Map<HoodieKey, HoodieRecordLocation>
- tagLocationBacktoRecords :根据 lookupIndex 返回的 recordKey 和 location 的对应关系对应关系
keyFilenamePair
,为每个 HoodieRecord 设置 currentLocation 。
1 |
|
lookupIndex
查找每个 recordKey 的 location,并返回已存在的所有 recordKey 和 location 的映射: Map<HoodieKey, HoodieRecordLocation>
,如果不存在,则删除记录键。
- 在传入记录中获取每个分区的记录数 : recordsPerPartition
- loadInvolvedFiles : 将所有涉及的文件加载为<Partition, filename>对
- explodeRecordsWithFileComparisons :利用区间树根据最大值最小值查找每个 HoodieKey 可能存在于哪个文件,返回:
List<Pair<fileId, HoodieKey>>
,这里多个 fileId 对应一个 HoodieKey ,一个 fileId 对应多个HoodieKey,类似于笛卡尔积,多对多的关系,按照 fileId 排序 - findMatchingFilesForRecordKeys :遍历 explodeRecordsWithFileComparisons 返回的
List<Pair<fileId, HoodieKey>>
,以 fileId 为维度,利用布隆索引判断有哪些 HoodieKey 可能存在于该文件中,并添加到候选:candidateRecordKeys ,最后遍历 candidateRecordKeys ,去 parquet 数据文件中确认该 key 是否确实存在于该文件,最后返回确切的 recordKey 和 location 的映射关系 :Map<HoodieKey, HoodieRecordLocation>
1 | /** |
loadInvolvedFiles
将所有涉及的文件加载为 List<Pair<partitionPath, BloomIndexFileInfo>>
1 | /** |
Interval Tree
Interval Tree : 区间树 ,翻译软件一般翻译为间隔树。
- 百度百科:区间树是在平衡树基础上进行扩展得到的支持以区间为元素的动态集合的操作,其中每个节点的关键值是区间的左端点。
- 博客:区间树是在红黑树基础上进行扩展得到的支持以区间为元素的动态集合的操作,其中每个节点的关键值是区间的左端点。通过建立这种特定的结构,可是使区间的元素的查找和插入都可以在O(lgn)的时间内完成。相比于基础的红黑树数据结构,增加了一个max[x],即以x为根的子树中所有区间的断点的最大值
- 请注意:区间树和线段树不一样,线段树是一种特殊的区间树。区间树:Interval Tree , 线段树: Segment Tree 。网上有很多博客将区间树和线段树归为一种。
- 线段树:Segment Tree ,线段树是一种二叉搜索树,与区间树相似,它将一个区间划分成一些单元区间,每个单元区间对应线段树中的一个叶结点。使用线段树可以快速的查找某一个节点在若干条线段中出现的次数,时间复杂度为O(logN)。而未优化的空间复杂度为2N,实际应用时一般还要开4N的数组以免越界,因此有时需要离散化让空间压缩。
explodeRecordsWithFileComparisons
- 首先判断是否需要使用区间树基于最小和最大记录键值进行过滤,默认为true,则创建 IntervalTreeBasedIndexFileFilter 。
- IntervalTreeBasedIndexFileFilter :基于区间树的索引查找。为每个分区构建一个{@link KeyRangeLookupTree},并使用它来搜索需要查找的任何给定recordKey的匹配索引文件。
- 主要逻辑:利用区间树根据最大值最小值,返回可能包含 recordKey 的文件列表。利用区间树的原因主要是可以降低查询时间。
- 查询逻辑:
- 对于有最大值和最小值的文件,如果该 recordKey 在最大值最小之区间内,则认为该文件可能包含 recordKey
- 对于没有最大值和最小值的文件,则认为该文件可能包含 recordKey
- 返回值:List(fileId, HoodieKey) , 多个 fileId 对应一个 HoodieKey
1 | /** |
IntervalTreeBasedIndexFileFilter
基于区间树的索引查找。为每个分区构建一个{@link KeyRangeLookupTree},并使用它来搜索需要查找的任何给定recordKey的匹配索引文件。
- KeyRangeLookupTree: 基于区间树实现的查找树,查询任意给定 Key 的时间复杂度为 (N logN)
- 对于有有最大值最小值的文件,构造为区间树:KeyRangeLookupTree
- 对于没有最大值最小值的文件,将 fileId 添加到 partitionToFilesWithNoRanges
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// 请注意,区间树实现没有自动平衡来确保logN搜索时间。
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be
// skewed which could result in N search time instead of logN.
// 所以,我们在这里打乱输入,希望树不会有任何倾斜。否则,树可能会倾斜,这可能导致搜索时间是N而不是logN。
Collections.shuffle(bloomIndexFiles);
KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
bloomIndexFiles.forEach(indexFileInfo -> {
if (indexFileInfo.hasKeyRanges()) { // 如果有最大值最小值
// 将 最大值,最小值,fileId 插入到 lookUpTree
// 构造间隔数
lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
indexFileInfo.getFileId()));
} else {
if (!partitionToFilesWithNoRanges.containsKey(partition)) {
partitionToFilesWithNoRanges.put(partition, new HashSet<>());
}
// 将没有最大值最小值的 fileId 添加到 partitionToFilesWithNoRanges
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
}
});
partitionToFileIndexLookUpTree.put(partition, lookUpTree);
});
}
lookUpTree.insert
区间树的具体构造逻辑。
1 | void insert(KeyRangeNode newNode) { |
getMatchingFilesAndPartition
- 对于有最大值最小值的文件,利用区间树 KeyRangeLookupTree 查找可能包含该 recordKey 的 fileId 列表 。
- 根据文件的最大最小值判断,如果 recordKey 在最大值最小值区间,则可能存在该文件中
- 如果不在最大值最小值区间,则不存在该文件中
- 利用区间树的原因主要是可以降低查询时间。
- 对于没有最大值最小值的文件,则认为都可能存在该 recordKey ,所以全部返回
- 返回值 Set(Pair(partitionPath, fileId))
1 |
|
findMatchingFilesForRecordKeys
找出<RowKey,filename>
对。
1 | /** |
LazyKeyCheckIterator.computeNext
1 | protected List<KeyLookupResult> computeNext() { |
createNewFileReader().readBloomFilter()
1 | this.bloomFilter = createNewFileReader().readBloomFilter(); |
keyLookupHandle.addKey
1 | public void addKey(String recordKey) { |
getLookupResult
在所有添加的键中,返回在文件组中实际找到的键的列表。
1 | /** |
tagLocationBacktoRecords
在第二步已经通过 lookupIndex 获取的 HoodieKey 和 HoodieRecordLocation 的对应关系,tagLocationBacktoRecords 就是根据 lookupIndex 返回的对应关系 keyFilenamePair ,为每个 HoodieRecord 设置 currentLocation 。
有的 HoodieRecord 可能没有对应的 fileId,所以也就不会设置 currentLocation 。
1 | /** |
总结
- tag/tagLocation :根据索引信息判断记录是否存在,如果不存在,代表是新增数据,如果记录存在则代表是更新数据,需要找到并设置 currentLocation。
- tag : table.getIndex().tagLocation -> JavaHoodieBloomIndex.tagLocation -> HoodieBaseBloomIndex.tagLocation
- tagLocation 会利用上篇文章讲的写到 parquet 文件中的 最大值最小值和布隆索引
- 最大值最小值用在第一阶段的过滤:构造区间树 (Interval Tree),利用区间树查找 每个 recordKey 可能存在于哪些文件中,利用区间树的有点在于可以加速查找,时间复杂度为 O(logN)。
- 对于有最大值和最小值的文件,如果该 recordKey 在最大值最小之区间内,则认为该文件可能包含 recordKey
- 对于没有最大值和最小值的文件,则认为该文件可能包含 recordKey
- 所以这里返回的是多对多的关系,类似于笛卡尔积。即一个文件可能保存多个 recordKey ,一个 recordKey 可能存在于多个文件中。
- 所以对于 recordKey 有明确的顺序关系的(例如:时间戳作为前缀),要比较的文件数量会因范围修剪而大大减少。这样不仅可以加速查找时间,还会提高查询精确度,也就是返回的 recordKey 和 location 的关系数量会少许多。
- 对于像字符串这种没有顺序关系的(hash值) recordKey ,会导致每个文件的最大值最小值区间范围都会比较大,这样 recordKey 就会可能存在于多个文件中,导致返回的对应关系特别大,不仅影响区间树的查询效率,还会影响后面的遍历性能。而且这种对应关系也会占比较大的内存,比如
本批次recordKey 的数量有 10万 ,文件数有 1000个,那个最后返回的 List(fileId, HoodieKey) 的数量可能会达到 10w * 1000 = 1亿个,这是最坏的情况,但是一般情况下也会达到几千万个。 - 所以对于没有顺序关系的 recordKey ,我们可能禁用第一阶段的利用区间树过滤,效率可能会更好一些。相关参数 :hoodie.bloom.index.use.treebased.filter = false , hoodie.bloom.index.prune.by.ranges = false
- 布隆过滤器用在第二阶段的过滤,遍历第一阶段返回的 List(fileId, HoodieKey) ,利用从parquet文件中反序列化的来的布隆过滤器进行二次过滤,判断哪些 HoodieKey 有可能存在于该 fileId 中,如果可能存在则添加到候选:candidateRecordKeys
- 布隆过滤器的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。所以只能判断有可能存在,还需要去和实际的数据文件去对比,进一步确认是否确实存在于该文件中。
- 然后遍历 candidateRecordKeys ,去遍历每个parquet数据文件,和 parquet 文件中的 key 进行实际的比较,对于确实存在于该文件中的,返回实际的 HoodieKey 和 Location 的对应关系:
Map<HoodieKey, HoodieRecordLocation>
- 最后根据上一步返回的 HoodieKey 和 Location 的对应关系,为每个 HoodieRecord 设置 currentLocation ,有的 HoodieRecord 没有对应的 fileId ,所以不需要设置 currentLocation。
- 无论是区间树查询最大值最小值,还是反序列化布隆过滤,都仅涉及读取文件页脚,所以读取成本较低
- 随着表数据量的增加、数据文件数的增加,会导致两个问题,从而使索引性能越来越差。
- 占用内存增加:前面提到有笛卡尔积,文件数越多,笛卡尔积越大,从而占用内存也会增加,遍历耗时也会增加。另外布隆过滤器 BitSet 也会随着每个文件的 recordKey 的数量的增加越来越大,从而导致布隆过滤器占用的内存也越来越大。
- 我们在利用区间树和布隆过滤器过滤完每个key可能存在于哪些文件中之后,会使用筛选后的键和关联的base文件(这里为parquet) 执行实际的文件查找 。因为这里要加载所有涉及的整个parquet文件内容,随着文件数量的增大和文件大小的增加,都会导致遍历查询这些parquet文件的时间越来越长,从而导致索引性能越来越差。