前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
源码层面总结分析Hudi Clean是如何实现的,不了解Hudi Clean的可以先看这篇:一文彻底理解Apache Hudi的清理服务。
Hudi Clean主要是清理删除不需要的历史文件,可以根据实际业务需要配置参数,不能影响查询,比如某个查询语句正在用某个文件,Clean如果删除了这个文件,查询就会报错。
这里只是删除历史文件,Hudi的文件是有多个版本的,不管配置什么参数,使用什么策略,都不会删除当前最新版本的文件。
Hudi 0.9.0版本有两种清理策略KEEP_LATEST_COMMITS
和KEEP_LATEST_FILE_VERSIONS
,默认为KEEP_LATEST_COMMITS
KEEP_LATEST_COMMITS:简单讲就是根据commit的次数,默认保留最新的10个commit的所有文件,对于10个之前的文件只保留最新版本的文件,历史文件全部删除
KEEP_LATEST_FILE_VERSIONS:简单讲就是保留文件的版本数,默认保留三个版本
具体的可以看上面的那篇公众号文章
目前最新版本0.11.0 添加了一个新的策略KEEP_LATEST_BY_HOURS
:根据小时数清理,默认保留最近24小时的文件,具体实现请查看PR:[HUDI-349] Added new cleaning policy based on number of hours
本文以Hudi 0.9.0 Java Client COW表 进行分析
Insert
HoodieJavaWriteClient->postWrite->postCommit->autoCleanOnCommit
以Insert为入口进行代码跟踪,Hudi源码里有java客户端的代码示例,这里只贴部分主要代码
1 | writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg) |
HoodieJavaWriteClient.insert
1 | public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) { |
在执行完table.insert写完数据后会执行postWrite方法
1 |
|
postWrite方法里又会执行父类 AbstractHoodieWriteClient.postCommit
方法
1 | protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) { |
postCommit方法里会调用autoCleanOnCommit()执行清理文件
AbstractHoodieWriteClient.autoCleanOnCommit
autoCleanOnCommit->clean->scheduleTableServiceInternal->HoodieJavaCopyOnWriteTable.clean
首先调用scheduleTableServiceInternal
,该方法会根据清理策略配置参数获取最早的需要保留的instant(earliestInstant
),然后获取需要清理的分区路径列表(partitionsToClean
),再根据分区路径获取需要删除的文件列表,最后将这些信息封装成HoodieCleanerPlan
序列化到新创建的
.clean.requested文件中
再执行HoodieJavaCopyOnWriteTable.clean
,该方法首先获取刚才创建的.clean.requested
文件和其他的之前失败的(如果有的话).clean.inflight
,然后反序列化刚才保存的.clean.requested的文件内容为HoodieCleanerPlan
,然后通过deleteFilesFunc
方法依次删除HoodieCleanerPlan
里的要删除的文件列表并返回HoodieCleanStat
,最后将HoodieCleanStat
作为参数构建HoodieCleanMetadata
,然后将HoodieCleanMetadata
序列化保存到新创建的.clean
文件中,这样整个clean操作就基本完成了。
如何根据清理策略获取要被清理的文件列表,请看后面的部分:获取要删除的文件列表
1 | /** |
前面的只是调用链,下面才到了真正的逻辑
1 | /** |
scheduleTableServiceInternal
1 | private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata, |
HoodieJavaCopyOnWriteTable.scheduleCleaning
1 | public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { |
1 | /** |
1 | /** |
HoodieJavaCopyOnWriteTable.clean
1 | public HoodieCleanMetadata clean(HoodieEngineContext context, |
BaseCleanActionExecutor.execute
1 | public HoodieCleanMetadata execute() { |
1 | private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { |
JavaCleanActionExecutor.clean
1 | List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { |
获取要删除的文件列表
这里和策略配置参数有关,并且逻辑相对复杂一点,就先贴一下入口的代码,先不深入,以后单独总结
返回AbstractHoodieWriteClient.autoCleanOnCommit
1 | Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain(); |
1 | /** |
1 | /** |
1 | /** |