前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
总结Hudi Clean Policy清理策略,从源码层面分析如何实现,上一篇文章Hudi Clean 清理文件实现分析从源码层面分析总结了Hudi Clean的整体流程,但是对于和策略有关的获取要删除的文件列表部分没有深入分析,这一篇详细分析KEEP_LATEST_COMMITS
策略是如何实现的。
getEarliestCommitToRetain
获取最早需要保留的commitKEEP_LATEST_FILE_VERSIONS
: 返回空KEEP_LATEST_COMMITS
:如果目前完成状态的commit数小于等于hoodie.cleaner.commits.retained
,则返回空,否则,返回倒数第hoodie.cleaner.commits.retained
(默认10)个.commit
1 | /** |
getPartitionPathsToClean
返回需要清理的分区路径
KEEP_LATEST_FILE_VERSIONS
:调用getPartitionPathsForFullCleaning
获取所有的分区路径KEEP_LATEST_COMMITS
:当不存在.clean文件时,和KEEP_LATEST_FILE_VERSIONS
策略一样调用getPartitionPathsForFullCleaning
,当存在时调用getPartitionPathsForIncrementalCleaning获取自上一个clean到现在为止新增的需要清理的分区路径
1 | /** |
getPartitionPathsForCleanByCommits
1 | /** |
getPartitionPathsForFullCleaning
1 | /** |
getPartitionPathsForIncrementalCleaning
用增量模式查找分区路径,过滤大于等于上次.clean保存的earliestCommitToRetain && 小于这次earliestCommitToRetain([lastEarliestCommitToRetain,thisEarliestCommitToRetain))的commit,这个区间的commit所涉及的分区即为要清理的分区,具体方法为
replaceCommitMetadata.getPartitionToReplaceFileIds
和commitMetadata.getPartitionToWriteStats()
1 | /** |
getDeletePaths
基于清理策略根据给出的分区路径返回需要被清理的文件列表
KEEP_LATEST_COMMITS
: 调用getFilesToCleanKeepingLatestCommits
KEEP_LATEST_FILE_VERSIONS
: 调用getFilesToCleanKeepingLatestVersions
1 | /** |
getFilesToCleanKeepingLatestCommits
1、获取分区路径
partitionPath
下所有的文件组fileGroups
(fileGroup
是指所有文件fileId相同的为一个文件组)
2、遍历fileGroups
,获取每个fileGroup
对应的所有的数据(parquet)文件fileSliceList
3、遍历fileSliceList
,判断每一个文件,首先过滤掉文件是savepointedFile
或者是最新版本或者是小于earliestCommitToRetain
的最近一次版本的文件,再将不需要为compaction
操作保留并且fileCommitTime < earliestCommitToRetain
的文件添加到返回的需要删除的文件列表中deletePaths
。也就是clean时不会清理savepointedFile
、最新版本和小于earliestCommitToRetain
的最近一次版本的文件,还有也不会清理不需要为compaction
操作保留的文件,其余的只要文件时间小于earliestCommitToRetain
都是我们要返回需要删除的文件
1 | private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) { |
1 | /** |
1 | private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) { |
1 | /** |
1 | private <T1, T2, R> R execute(T1 val, T2 val2, Function2<T1, T2, R> preferredFunction, |
1 |
|
getFilesToCleanKeepingLatestVersions
1、遍历fileGroups
2、获取每个fileGroup
对应的所有的数据(parquet),过滤掉需要为compaction
操作保留的文件:fileSliceIterator
3、如果该fileGroup
在PendingCompaction
中,保留版本数减一
4、遍历fileSliceIterator
,保留对应的版本数的最新文件,其余的都是要删除的文件,添加的返回列表中
1 | /** |
总结
相同点
不管是哪种策略,都是先遍历fileGroups
,再遍历fileGroup
每个对应的FileSlices
,每种策略都不删除savepoints
和PendingCompaction
文件,都保留文件的最新版本
区别
KEEP_LATEST_COMMITS
: 保留最新的commit数,除了上述相同点讲的保留的文件,还多保留一个commit,即通过方法getLatestVersionBeforeCommit
获取的小于earliestCommitToRetain的最近一次版本,但我觉得没有必要保留这个文件,我已经提了PR: Don’t keep the last commit before the earliest commit to retain,不确定我认为的对不对
对于比较早没有新写入的fileGroup
,只保留一个最新版本,因为因为最新版本的文件的时间小于earliestCommitToRetain,那么getLatestVersionBeforeCommit
返回的文件就是最新版本的文件
KEEP_LATEST_FILE_VERSIONS
: 每个fileGroup
都保留相同的版本数