 
Preface
For work reasons I previously queried Hudi mostly through Hive, so I am fairly familiar with how Hive queries Hudi, but not with how Spark does it. I now want to learn the general flow of Spark querying Hudi and figure out how execution jumps from the Spark source code into the Hudi source code. That clarifies both how Spark queries a table in general and how Spark queries Hudi in particular, and it will also make it easier to locate and fix problems when using Kyuubi Spark SQL later.
Versions
Spark 2.4.4
Hudi master 0.12.0-SNAPSHOT (latest code)
(With Spark 3 you can use planChangeLog to print which rules actually take effect.)
Example code
First write some Hudi data as in the previous article, then query it:
import spark.implicits._
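The rest of this post walks through the read path triggered by a call like the one below. This is a minimal sketch; the base path is a placeholder for wherever the previous article wrote the table, not something taken from this post.
// Minimal read example (hypothetical table path; adjust to where you wrote the Hudi table)
val df = spark.read
  .format("hudi")                 // resolved to Hudi's Spark2DefaultSource via DataSource.lookupDataSource
  .load("/tmp/hudi/test_table")   // hypothetical base path
df.show()                         // show() is what eventually triggers planning and execution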
spark.read
This returns a DataFrameReader; the format and load methods that follow are defined in this class.
def read: DataFrameReader = new DataFrameReader(self)
format
source = "hudi"; it is used later in load.
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
load
The load method first adds the path parameter, then checks whether source equals hive; here source equals hudi, so that branch is not taken. Next it looks up the data source class for hudi via DataSource.lookupDataSource and checks whether it is a subclass of DataSourceV2 before running the remaining logic. As covered in the previous article, DataSource.lookupDataSource returns Spark2DefaultSource, which is not a subclass of DataSourceV2, so loadV1Source is executed next.
def load(path: String): DataFrame = {
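For reference, in Spark 2.4 this dispatch looks roughly like the following (a sketch reconstructed from memory, not a verbatim copy; verify against your Spark 2.4.4 sources):
def load(path: String): DataFrame = {
  // store the path as an option and delegate to the varargs overload
  option("path", path).load(Seq.empty: _*)
}
// load(paths: String*) then:
//   1. throws if source == "hive" (not our case, since source == "hudi")
//   2. calls DataSource.lookupDataSource(source, conf), which resolves "hudi" to Spark2DefaultSource
//   3. Spark2DefaultSource is not a DataSourceV2, so execution falls through to loadV1Source(paths: _*)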
loadV1Source
The core of loadV1Source is the trailing sparkSession.baseRelationToDataFrame call; first look at its argument, DataSource.resolveRelation.
private def loadV1Source(paths: String*) = {
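The body is short; roughly (again a sketch from memory, check your Spark version):
private def loadV1Source(paths: String*) = {
  // Data source v1 code path: build a DataSource for "hudi", resolve it to a BaseRelation,
  // then wrap that relation in a DataFrame
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,              // "hudi"
      options = extraOptions.toMap).resolveRelation())
}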
DataSource.resolveRelation
As before, providingClass.newInstance() here is the Spark2DefaultSource returned by DataSource.lookupDataSource. Its parent class DefaultSource implements both SchemaRelationProvider and RelationProvider, but userSpecifiedSchema is None here, so the second case matches: relation = dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions).
/**
 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
 * [[DataSource]]
 *
 * @param checkFilesExist Whether to confirm that the files exist when generating the
 *                        non-streaming file based datasource. StructuredStreaming jobs already
 *                        list file existence, and when generating incremental jobs, the batch
 *                        is considered as a non-streaming file based data source. Since we know
 *                        that files already exist, we don't need to check them again.
 */
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation
    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      ......
  relation
}
// className = hudi
lazy val providingClass: Class[_] =
  DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
dataSource.createRelation
The createRelation here is implemented in Spark2DefaultSource's parent class DefaultSource. It first derives the table path from the path parameters passed in, then creates a HoodieTableMetaClient and reads table configuration such as the table type, and finally decides which kind of Relation to return based on that configuration; in this example it returns resolveBaseFileOnlyRelation.
(So createRelation is the entry point where Hudi's own query logic takes over.)
override def createRelation(sqlContext: SQLContext,
                            parameters: Map[String, String]): BaseRelation = {
  createRelation(sqlContext, parameters, null)
}
override def createRelation(sqlContext: SQLContext,
                            optParams: Map[String, String],
                            schema: StructType): BaseRelation = {
  val path = optParams.get("path")
  val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
  if (path.isEmpty && readPathsStr.isEmpty) {
    throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
  }
  val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
  val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
  val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
  val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
    HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
  } else {
    Seq.empty
  }
  // Add default options for unspecified read options keys.
  val parameters = (if (globPaths.nonEmpty) {
    Map(
      "glob.paths" -> globPaths.mkString(",")
    )
  } else {
    Map()
  }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
  // Get the table base path
  val tablePath = if (globPaths.nonEmpty) {
    DataSourceUtils.getTablePath(fs, globPaths.toArray)
  } else {
    DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
  }
  log.info("Obtained hudi table path: " + tablePath)
  // Create the HoodieTableMetaClient
  val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
  // Whether this is a bootstrapped table; false in this example
  val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
  // Table type; COPY_ON_WRITE in this example
  val tableType = metaClient.getTableType
  // Query type; snapshot in this example
  val queryType = parameters(QUERY_TYPE.key)
  // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
  //       Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
  //       case  we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
  //       from the table itself
  val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
    None
  } else {
    Option(schema)
  }
  log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
  // Check whether there is any completed commit, i.e. whether the table is empty
  if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { // empty table
    // return an EmptyRelation
    new EmptyRelation(sqlContext, metaClient)
  } else {
    (tableType, queryType, isBootstrappedTable) match {
      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
        resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
      case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
        new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
        new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
      case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
        new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
      case (_, _, true) =>
        new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
      case (_, _, _) =>
        throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
          s"isBootstrappedTable: $isBootstrappedTable ")
    }
  }
}
resolveBaseFileOnlyRelation
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
hasSchemaOnRead is defined in BaseFileOnlyRelation's parent class HoodieBaseRelation:
def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
Because the default value of SCHEMA_EVOLUTION_ENABLED is false, internalSchemaOpt returns None and hasSchemaOnRead returns false, so resolveBaseFileOnlyRelation ends up returning baseRelation.toHadoopFsRelation.
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
      .key("hoodie.schema.on.read.enable")
      .defaultValue(false)
      .withDocumentation("Enables support for Schema Evolution feature");
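Putting the two pieces together, the tail of resolveBaseFileOnlyRelation behaves roughly like this (an illustrative sketch of the branching just described, not the exact Hudi code; the constructor arguments are simplified):
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
if (baseRelation.hasSchemaOnRead) {
  baseRelation                      // schema on read (schema evolution) enabled: keep the Hudi relation
} else {
  baseRelation.toHadoopFsRelation   // default case in this example: hand back a plain Spark HadoopFsRelation
}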
baseRelation.toHadoopFsRelation
This method returns a HadoopFsRelation whose location is the HoodieFileIndex defined in the parent class HoodieBaseRelation, and whose fileFormat is Spark24HoodieParquetFileFormat.
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
protected lazy val fileIndex: HoodieFileIndex =
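Per the description above, the relation handed back to Spark has this shape (a sketch assembled from the pieces named in this post, not a verbatim copy of toHadoopFsRelation):
HadoopFsRelation(
  location = fileIndex,              // the HoodieFileIndex defined in HoodieBaseRelation
  partitionSchema = partitionSchema,
  dataSchema = dataSchema,
  bucketSpec = None,
  fileFormat = fileFormat,           // Spark24HoodieParquetFileFormat
  options = optParams
)(sparkSession)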
HoodieFileIndex extends SparkHoodieTableFileIndex and implements FileIndex from the Spark SQL source, including methods such as listFiles. This is what is used when listing a Hudi table's files, and it is exactly where the Hudi query logic lives. Next, let's see how its listFiles method gets called and how the query results are returned.
case class HoodieFileIndex(spark: SparkSession,
                           metaClient: HoodieTableMetaClient,
                           schemaSpec: Option[StructType],
                           options: Map[String, String],
                           @transient fileStatusCache: FileStatusCache = NoopCache)
  extends SparkHoodieTableFileIndex(
    spark = spark,
    metaClient = metaClient,
    schemaSpec = schemaSpec,
    configProperties = getConfigProperties(spark, options),
    queryPaths = HoodieFileIndex.getQueryPaths(options),
    specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
    fileStatusCache = fileStatusCache
  )
    with FileIndex {
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
baseRelationToDataFrame
From the analysis above we know that resolveRelation returns HadoopFsRelation(HoodieFileIndex, partitionSchema, dataSchema, None, Spark24HoodieParquetFileFormat, optParams)(sparkSession). Now let's look at baseRelationToDataFrame.
  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
  }
object LogicalRelation {
  def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
    LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)
  def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
    LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
}
Dataset.ofRows has already been analyzed several times in earlier articles, so we only highlight the key points. Here the logicalPlan argument is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...), relation.schema.toAttributes, None, false).
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
Dataset.show
We know that for a query, Dataset.ofRows above only triggers the analysis phase via qe.assertAnalyzed(), and from the logs no rules actually took effect there. So we focus on the subsequent show, which eventually calls showString, whose core logic is in getRows.
def show(): Unit = show(20)
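The show overloads chain roughly as follows in Spark 2.4 (sketch):
def show(): Unit = show(20)
def show(numRows: Int): Unit = show(numRows, truncate = true)
def show(numRows: Int, truncate: Boolean): Unit =
  // truncate = true caps string cells at 20 characters
  println(showString(numRows, truncate = if (truncate) 20 else 0))
// showString in turn calls getRows(numRows, ...) to fetch at most numRows + 1 rows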
getRows
The core of getRows is newDf.select(castCols: _*).take(numRows + 1), which triggers planning.
private[sql] def getRows(
df.select
First look at select. The logicalPlan inside select is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...)), so the logicalPlan passed to withPlan is Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))).
def select(cols: Column*): DataFrame = withPlan {
  Project(cols.map(_.named), logicalPlan)
}
 private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
  Dataset.ofRows(sparkSession, logicalPlan)
}
take
Next comes take:
def take(n: Int): Array[T] = head(n)
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
limit
First look at its argument, limit.
def limit(n: Int): Dataset[T] = withTypedPlan {
Here Literal(n)'s apply returns Literal(21, IntegerType).
object Limit {
Limit.apply returns GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))), so limit returns a Dataset wrapping sparkSession and that GlobalLimit plan.
private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
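For reference, the Limit helper that builds this tree looks like the following (consistent with the unapply shown further down; treat it as a sketch):
object Limit {
  // Wrap the child first in a per-partition LocalLimit, then in a final GlobalLimit
  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
  }
}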
withAction
withAction was also covered in earlier articles. Its executedPlan triggers the full Spark SQL pipeline of parsing, analysis, optimization and planning, and the action (here collectFromPlan) is finally invoked inside withNewExecutionId. Let's first see which important rules or strategies take effect.
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
plan entry point
A note on the plan entry point, which I had not paid attention to before: the optimizedPlan is wrapped in ReturnAnswer here (this matters for the analysis below).
lazy val sparkPlan: SparkPlan = {
According to the comment on ReturnAnswer, for operations such as take or collect, rules are applied only at the top of the logical query plan.
strategies
First, let's look at the strategies:
0 = {SparkStrategies$PythonEvals$@13376}
planner.plan
Now the plan method. Previously I only understood it as iterating over the strategies and calling apply; now I understand it a bit better: it first iterates over the strategies to produce candidates, then for each candidate calls collectPlaceholders to unwrap the sub-plans wrapped in PlanLater, yielding placeholders; if placeholders is non-empty, it recursively calls plan on those child plans.
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
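A simplified, annotated sketch of that flow (details such as plan pruning are trimmed; not a verbatim copy):
def plan(plan: LogicalPlan): Iterator[SparkPlan] = {
  // 1. Run every strategy over the logical plan; each strategy emits zero or more candidates
  val candidates = strategies.iterator.flatMap(_(plan))
  // 2. For each candidate, collect the PlanLater placeholders (sub-plans the strategy deferred)
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)
    if (placeholders.isEmpty) {
      Iterator(candidate)                          // nothing deferred: the candidate is complete
    } else {
      // 3. Recursively plan each deferred logical sub-plan and splice the result
      //    back in place of its PlanLater placeholder
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        case (candidatesSoFar, (placeholder, logicalPlan)) =>
          candidatesSoFar.flatMap { p =>
            this.plan(logicalPlan).map { planned =>
              p.transformUp { case ph if ph eq placeholder => planned }
            }
          }
      }
    }
  }
  prunePlans(plans)
}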
SpecialLimits
In the first pass, because the plan is wrapped in ReturnAnswer, the match succeeds in SpecialLimits.
When matching Limit, its unapply is called; it matches the third case and returns CollectLimitExec(21, PlanLater(Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))).
def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
  p match {
    case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
    case _ => None
  }
}
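A trimmed sketch of the SpecialLimits strategy, keeping only the cases relevant here (the Sort-related cases are omitted):
object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ReturnAnswer(rootPlan) => rootPlan match {
      // Our plan GlobalLimit(21, LocalLimit(21, Project(...))) is matched by Limit.unapply above
      case Limit(IntegerLiteral(limit), child) =>
        CollectLimitExec(limit, planLater(child)) :: Nil
      case other => planLater(other) :: Nil
    }
    case _ => Nil
  }
}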
PlanLater just wraps the logical plan; it is unwrapped later in planner.plan via collectPlaceholders.
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
FileSourceStrategy
After unwrapping PlanLater we have Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))). When FileSourceStrategy matches, it calls PhysicalOperation's unapply, so the first case matches and it returns ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))). Since this is the child node, adding the parent back, the final result is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))).
object PhysicalOperation extends PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
prepareForExecution
The sparkPlan returned above is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))).
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
CollapseCodegenStages
def apply(plan: SparkPlan): SparkPlan = {
CollectLimitExec first matches the other case, so it recurses into its children. ProjectExec implements CodegenSupport, so it becomes WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))). Since ProjectExec is the child node, the final result is CollectLimitExec(21, WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))))).
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
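A sketch of the matching just described (reconstructed and simplified; verify against Spark 2.4.4):
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
  // Operators producing domain objects are skipped (unsafe rows cannot carry them)
  case p if p.output.length == 1 && p.output.head.dataType.isInstanceOf[ObjectType] =>
    p.withNewChildren(p.children.map(insertWholeStageCodegen))
  // ProjectExec lands here: it supports codegen, so it is wrapped in WholeStageCodegenExec
  case p: CodegenSupport if supportCodegen(p) =>
    WholeStageCodegenExec(insertInputAdapter(p))
  // CollectLimitExec lands here: only recurse into the children
  case other =>
    other.withNewChildren(other.children.map(insertWholeStageCodegen))
}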
collectFromPlan
private def collectFromPlan(plan: SparkPlan): Array[T] = {
executeCollect
Here plan is CollectLimitExec, which calls child.executeTake. The child is WholeStageCodegenExec, which does not override executeTake, so the parent class SparkPlan's executeTake is invoked.
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
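The delegation itself is a single line (sketch):
// CollectLimitExec: collect at most `limit` rows by delegating to the child's executeTake
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)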
executeTake
The core is getByteArrayRdd.
/**
 * Runs this query returning the first `n` rows as an array.
 *
 * This is modeled after `RDD.take` but never runs any job locally on the driver.
 */
def executeTake(n: Int): Array[InternalRow] = {
  if (n == 0) {
    return new Array[InternalRow](0)
  }
  val childRDD = getByteArrayRdd(n).map(_._2)
  val buf = new ArrayBuffer[InternalRow]
  val totalParts = childRDD.partitions.length
  var partsScanned = 0
  while (buf.size < n && partsScanned < totalParts) {
    // The number of partitions to try in this iteration. It is ok for this number to be
    // greater than totalParts because we actually cap it at totalParts in runJob.
    var numPartsToTry = 1L
    if (partsScanned > 0) {
      // If we didn't find any rows after the previous iteration, quadruple and retry.
      // Otherwise, interpolate the number of partitions we need to try, but overestimate
      // it by 50%. We also cap the estimation in the end.
      val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
      if (buf.isEmpty) {
        numPartsToTry = partsScanned * limitScaleUpFactor
      } else {
        val left = n - buf.size
        // As left > 0, numPartsToTry is always >= 1
        numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
        numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
      }
    }
    val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
    val sc = sqlContext.sparkContext
    val res = sc.runJob(childRDD,
      (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
    buf ++= res.flatMap(decodeUnsafeRows)
    partsScanned += p.size
  }
  if (buf.size > n) {
    buf.take(n).toArray
  } else {
    buf.toArray
  }
}
getByteArrayRdd
private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
doExecute
WholeStageCodegenExec.doExecute; the core is child.asInstanceOf[CodegenSupport].inputRDDs(), where the child is ProjectExec.
override def doExecute(): RDD[InternalRow] = {
inputRDDs
ProjectExec.inputRDDs; its child is FileSourceScanExec.
override def inputRDDs(): Seq[RDD[InternalRow]] = {
  child.asInstanceOf[CodegenSupport].inputRDDs()
}
FileSourceScanExec.inputRDD
- relation: HadoopFsRelation
- relation.fileFormat: Spark24HoodieParquetFileFormat
- relation.bucketSpec: None
private lazy val inputRDD: RDD[InternalRow] = {
createNonBucketedReadRDD
selectedPartitions
- relation.location: HoodieFileIndex
private lazy val selectedPartitions: Array[PartitionDirectory] = {
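selectedPartitions is where Spark hands control to the FileIndex; trimmed to its essence it is roughly (sketch, metrics and timing omitted):
@transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
  // relation.location is our HoodieFileIndex, so this call jumps into Hudi code
  relation.location.listFiles(partitionFilters, dataFilters).toArray
}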
HoodieFileIndex.listFiles
The methods involved include lookupCandidateFilesInMetadataTable, refresh, doRefresh and loadPartitionPathFiles; refresh and doRefresh are already called when the index is instantiated.
cachedAllInputFileSlices, initialized in doRefresh, implements the logic that lists the Hudi files.
cachedAllInputFileSlices = partitionFiles.keySet().stream()
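Conceptually, the flow inside listFiles is the following (a rough outline based on the methods named above, not the exact Hudi code):
// HoodieFileIndex.listFiles, in outline:
//   1. lookupCandidateFilesInMetadataTable: if metadata-table-based data skipping is enabled,
//      narrow the candidate base files down using column stats
//   2. prune partitions using the partition filters pushed down by Spark
//   3. for each remaining partition, take the base files of cachedAllInputFileSlices
//      (populated in doRefresh / loadPartitionPathFiles) and wrap them in PartitionDirectory
//   4. return the Seq[PartitionDirectory]; Spark then builds the read RDD from these files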
Spark24HoodieParquetFileFormat
PR: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns. You can dig into this one yourself.
Summary
This roughly clears up the path from the Spark source code into the Hudi source code for spark.read.format("hudi").load. Limited by time and ability, some details are still not fully worked out, but with the overall flow understood, the remaining details can be studied as the need arises.
 
		 
                      