前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
补充上一篇文章Hudi Spark源码学习总结-spark.read.format(“hudi”).load,由于上篇文章篇幅已经比较长了,所以单独写一篇补充一下,没有读过的可以先阅读一下,我们在上篇文章讲到resolveBaseFileOnlyRelation
返回的是HadoopFsRelation
,那么如果返回BaseFileOnlyRelation
呢?
起因
其实除了HadoopFsRelation
、BaseFileOnlyRelation
还有IncrementalRelation
、MergeOnReadSnapshotRelation
、MergeOnReadIncrementalRelation
、HoodieBootstrapRelation
,之所有要单独看一下BaseFileOnlyRelation
,是因为我们从提交历史上可以看出,一开始默认的就是HadoopFsRelation
后来添加了自定义的BaseFileOnlyRelation
,然后现在回退成了HadoopFsRelation
,查看对应PR了解了一下原因
添加BaseFileOnlyRelation
的PR:[HUDI-3338] custom relation instead of HadoopFsRelation,原因是:目前,HadoopFsRelation
将使用实际分区路径的值作为分区字段的值。但是,与普通表不同,Hudi在Parquet文件中保留分区值。在某些情况下,实际分区路径的值和分区字段的值是不同的。因此,这里实现了BaseFileOnlyViewRelation
,通过它,Hudi可以管理自己的关系。
回退为HadoopFsRelation
的PR:[HUDI-3902] Fallback to HadoopFsRelation in cases non-involving Schema Evolution,原因是:Spark的一些优化规则(以及一些其他处理)是基于HadoopFsRelation
的使用,这导致当我们依赖自定义Relation
时,这些优化没有得到应用。我们可以在[HUDI-3338]的PR的下面可以看到,有commiter提出,使用BaseFileOnlyRelation
后查询性能相比于之前的HadoopFsRelation
下降了大约一倍,这是因为Spark仅在少数地方做了查询优化,其中一个地方就是我们在上篇文章中讲到的FileSourceStrategy
,我们看到[HUDI-3902]虽然回退到HadoopFsRelation
,但是貌似并没有处理分区路径值的问题,所以后面又提了一个PR:
PR:[HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns,这个PR添加了自定义FileFormat
:Spark24HoodieParquetFileFormat
,它重写了ParquetFileFormat
的buildReaderWithPartitionValues
方法。其中的关键点是添加了参数shouldAppendPartitionValues
来判断是否将分区路径中的值添加为分区字段的值。
1 | if (shouldAppendPartitionValues) { |
BaseFileOnlyRelation
那么接下来看一下BaseFileOnlyRelation
后面的处理逻辑吧,为了方便调试,我们直接把resolveBaseFileOnlyRelation
的返回值改为BaseFileOnlyRelation
qe.executedPlan
我们先看一下qe.executedPlan
的返回值是啥吧,对比一下和HadoopFsRelation
不同,这样我们只需要分析不同点就可以了
1 | CollectLimitExec(21,WholeStageCodegenExec(ProjectExec(projectList,RowDataSourceScanExec()))) |
其中RowDataSourceScanExec
的Relation
为BaseFileOnlyRelation
,rdd为Spark2HoodieFileScanRDD
,这两个都是和Hudi相关,我们对比HadoopFsRelation
发现由之前的FileSourceScanExec
改成了RowDataSourceScanExec
,除了这一点,其他的都一样,那么我们需要是分析一下是在哪里返回RowDataSourceScanExec
的。
DataSourceStrategy
它会在DataSourceStrategy
匹配到第二个,因为BaseFileOnlyRelation
是PrunedFilteredScan
的子类,可以看一下它的定义
1 | class BaseFileOnlyRelation(sqlContext: SQLContext, |
1 | case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with CastSupport { |
pruneFilterProject
1 | private def pruneFilterProject( |
pruneFilterProjectRaw
1 | private def pruneFilterProjectRaw( |
在这方法里我们看到它会构造RowDataSourceScanExec
并返回,其中它的relation参数为BaseFileOnlyRelation
,而它的rdd
是通过调用HoodieBaseRelation
重写buildScan
方法返回的RDD[Row]
,值为Spark2HoodieFileScanRDD
,这正是查询Hudi表数据的核心逻辑
buildScan
1 | override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { |
collectFromPlan
我们在上篇文章中讲过这个方法最终会调用RowDataSourceScanExec
的inputRDDs
RowDataSourceScanExec.inputRDDs
1 | override def inputRDDs(): Seq[RDD[InternalRow]] = { |
直接返回rdd
,rdd
是通过HoodieBaseRelation.buildScan
生成的Spark2HoodieFileScanRDD
总结
本文总结了使用HadoopFsRelation
和BaseFileOnlyRelation
的原因以及使用BaseFileOnlyRelation
时查询Hudi的逻辑,知道了在使用BaseFileOnlyRelation
是通过buildScan
实现查询的。
我在文章https://shzhangji.com/cnblogs/2018/12/09/spark-datasource-api-v2/学习
Spark DataSource
时,了解到Spark DataSource V1
是通过调用buildScan
方法来获取数据源的RDD
,所以也想看一下BaseFileOnlyRelation
的buildScan
方法是否也会在查询时用到以及什么情况下会用到,现在还不确定它和Spark DataSource V1
的关系是啥,后面在学习总结Spark DataSource
源码时,再来对比一下。