前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
        前言
总结Hudi Clean Policy清理策略,从源码层面分析如何实现,上一篇文章Hudi Clean 清理文件实现分析从源码层面分析总结了Hudi Clean的整体流程,但是对于和策略有关的获取要删除的文件列表部分没有深入分析,这一篇详细分析KEEP_LATEST_COMMITS策略是如何实现的。
getEarliestCommitToRetain
获取最早需要保留的commitKEEP_LATEST_FILE_VERSIONS: 返回空KEEP_LATEST_COMMITS:如果目前完成状态的commit数小于等于hoodie.cleaner.commits.retained,则返回空,否则,返回倒数第hoodie.cleaner.commits.retained(默认10)个.commit
1  | /**  | 
getPartitionPathsToClean
返回需要清理的分区路径
KEEP_LATEST_FILE_VERSIONS:调用getPartitionPathsForFullCleaning 获取所有的分区路径KEEP_LATEST_COMMITS:当不存在.clean文件时,和KEEP_LATEST_FILE_VERSIONS策略一样调用getPartitionPathsForFullCleaning,当存在时调用getPartitionPathsForIncrementalCleaning获取自上一个clean到现在为止新增的需要清理的分区路径
1  | /**  | 
getPartitionPathsForCleanByCommits
1  | /**  | 
getPartitionPathsForFullCleaning
1  | /**  | 
getPartitionPathsForIncrementalCleaning
用增量模式查找分区路径,过滤大于等于上次.clean保存的earliestCommitToRetain && 小于这次earliestCommitToRetain([lastEarliestCommitToRetain,thisEarliestCommitToRetain))的commit,这个区间的commit所涉及的分区即为要清理的分区,具体方法为
replaceCommitMetadata.getPartitionToReplaceFileIds和commitMetadata.getPartitionToWriteStats()
1  | /**  | 
getDeletePaths
基于清理策略根据给出的分区路径返回需要被清理的文件列表
KEEP_LATEST_COMMITS: 调用getFilesToCleanKeepingLatestCommitsKEEP_LATEST_FILE_VERSIONS: 调用getFilesToCleanKeepingLatestVersions
1  | /**  | 
getFilesToCleanKeepingLatestCommits
1、获取分区路径
partitionPath下所有的文件组fileGroups(fileGroup是指所有文件fileId相同的为一个文件组)
2、遍历fileGroups,获取每个fileGroup对应的所有的数据(parquet)文件fileSliceList
3、遍历fileSliceList,判断每一个文件,首先过滤掉文件是savepointedFile或者是最新版本或者是小于earliestCommitToRetain的最近一次版本的文件,再将不需要为compaction操作保留并且fileCommitTime < earliestCommitToRetain的文件添加到返回的需要删除的文件列表中deletePaths。也就是clean时不会清理savepointedFile、最新版本和小于earliestCommitToRetain的最近一次版本的文件,还有也不会清理不需要为compaction操作保留的文件,其余的只要文件时间小于earliestCommitToRetain都是我们要返回需要删除的文件
1  | private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {  | 
1  | /**  | 
1  | private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {  | 
1  | /**  | 
1  | private <T1, T2, R> R execute(T1 val, T2 val2, Function2<T1, T2, R> preferredFunction,  | 
1  | 
  | 
getFilesToCleanKeepingLatestVersions
1、遍历fileGroups
2、获取每个fileGroup对应的所有的数据(parquet),过滤掉需要为compaction操作保留的文件:fileSliceIterator
3、如果该fileGroup在PendingCompaction中,保留版本数减一
4、遍历fileSliceIterator,保留对应的版本数的最新文件,其余的都是要删除的文件,添加的返回列表中
1  | /**  | 
总结
相同点
不管是哪种策略,都是先遍历fileGroups,再遍历fileGroup每个对应的FileSlices,每种策略都不删除savepoints和PendingCompaction文件,都保留文件的最新版本
区别
KEEP_LATEST_COMMITS: 保留最新的commit数,除了上述相同点讲的保留的文件,还多保留一个commit,即通过方法getLatestVersionBeforeCommit获取的小于earliestCommitToRetain的最近一次版本,但我觉得没有必要保留这个文件,我已经提了PR: Don’t keep the last commit before the earliest commit to retain,不确定我认为的对不对
对于比较早没有新写入的fileGroup,只保留一个最新版本,因为因为最新版本的文件的时间小于earliestCommitToRetain,那么getLatestVersionBeforeCommit返回的文件就是最新版本的文件
KEEP_LATEST_FILE_VERSIONS: 每个fileGroup都保留相同的版本数