前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
补充上一篇文章Hudi Spark源码学习总结-spark.read.format(“hudi”).load,由于上篇文章篇幅已经比较长了,所以单独写一篇补充一下,没有读过的可以先阅读一下,我们在上篇文章讲到resolveBaseFileOnlyRelation返回的是HadoopFsRelation,那么如果返回BaseFileOnlyRelation呢?
起因
其实除了HadoopFsRelation、BaseFileOnlyRelation还有IncrementalRelation、MergeOnReadSnapshotRelation、MergeOnReadIncrementalRelation、HoodieBootstrapRelation,之所有要单独看一下BaseFileOnlyRelation,是因为我们从提交历史上可以看出,一开始默认的就是HadoopFsRelation后来添加了自定义的BaseFileOnlyRelation,然后现在回退成了HadoopFsRelation,查看对应PR了解了一下原因
添加BaseFileOnlyRelation的PR:[HUDI-3338] custom relation instead of HadoopFsRelation,原因是:目前,HadoopFsRelation将使用实际分区路径的值作为分区字段的值。但是,与普通表不同,Hudi在Parquet文件中保留分区值。在某些情况下,实际分区路径的值和分区字段的值是不同的。因此,这里实现了BaseFileOnlyViewRelation,通过它,Hudi可以管理自己的关系。
回退为HadoopFsRelation的PR:[HUDI-3902] Fallback to HadoopFsRelation in cases non-involving Schema Evolution,原因是:Spark的一些优化规则(以及一些其他处理)是基于HadoopFsRelation的使用,这导致当我们依赖自定义Relation时,这些优化没有得到应用。我们可以在[HUDI-3338]的PR的下面可以看到,有commiter提出,使用BaseFileOnlyRelation后查询性能相比于之前的HadoopFsRelation下降了大约一倍,这是因为Spark仅在少数地方做了查询优化,其中一个地方就是我们在上篇文章中讲到的FileSourceStrategy,我们看到[HUDI-3902]虽然回退到HadoopFsRelation,但是貌似并没有处理分区路径值的问题,所以后面又提了一个PR:
PR:[HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns,这个PR添加了自定义FileFormat:Spark24HoodieParquetFileFormat,它重写了ParquetFileFormat的buildReaderWithPartitionValues方法。其中的关键点是添加了参数shouldAppendPartitionValues来判断是否将分区路径中的值添加为分区字段的值。
1 | if (shouldAppendPartitionValues) { |
BaseFileOnlyRelation
那么接下来看一下BaseFileOnlyRelation后面的处理逻辑吧,为了方便调试,我们直接把resolveBaseFileOnlyRelation的返回值改为BaseFileOnlyRelation
qe.executedPlan
我们先看一下qe.executedPlan的返回值是啥吧,对比一下和HadoopFsRelation不同,这样我们只需要分析不同点就可以了
1 | CollectLimitExec(21,WholeStageCodegenExec(ProjectExec(projectList,RowDataSourceScanExec()))) |
其中RowDataSourceScanExec的Relation为BaseFileOnlyRelation,rdd为Spark2HoodieFileScanRDD,这两个都是和Hudi相关,我们对比HadoopFsRelation发现由之前的FileSourceScanExec改成了RowDataSourceScanExec,除了这一点,其他的都一样,那么我们需要是分析一下是在哪里返回RowDataSourceScanExec的。
DataSourceStrategy
它会在DataSourceStrategy匹配到第二个,因为BaseFileOnlyRelation是PrunedFilteredScan的子类,可以看一下它的定义
1 | class BaseFileOnlyRelation(sqlContext: SQLContext, |
1 | case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with CastSupport { |
pruneFilterProject
1 | private def pruneFilterProject( |
pruneFilterProjectRaw
1 | private def pruneFilterProjectRaw( |
在这方法里我们看到它会构造RowDataSourceScanExec并返回,其中它的relation参数为BaseFileOnlyRelation,而它的rdd是通过调用HoodieBaseRelation重写buildScan方法返回的RDD[Row],值为Spark2HoodieFileScanRDD,这正是查询Hudi表数据的核心逻辑
buildScan
1 | override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { |
collectFromPlan
我们在上篇文章中讲过这个方法最终会调用RowDataSourceScanExec的inputRDDs
RowDataSourceScanExec.inputRDDs
1 | override def inputRDDs(): Seq[RDD[InternalRow]] = { |
直接返回rdd,rdd是通过HoodieBaseRelation.buildScan生成的Spark2HoodieFileScanRDD
总结
本文总结了使用HadoopFsRelation和BaseFileOnlyRelation的原因以及使用BaseFileOnlyRelation时查询Hudi的逻辑,知道了在使用BaseFileOnlyRelation是通过buildScan实现查询的。
我在文章https://shzhangji.com/cnblogs/2018/12/09/spark-datasource-api-v2/学习
Spark DataSource时,了解到Spark DataSource V1是通过调用buildScan方法来获取数据源的RDD,所以也想看一下BaseFileOnlyRelation的buildScan方法是否也会在查询时用到以及什么情况下会用到,现在还不确定它和Spark DataSource V1的关系是啥,后面在学习总结Spark DataSource源码时,再来对比一下。