Preface
For work reasons I used to query Hudi mainly through Hive, so I'm fairly familiar with how Hive queries Hudi, but much less so with how Spark does it. This post walks through the rough logic of a Spark query against Hudi, tracing how execution jumps from the Spark source code into the Hudi source code. That way we understand both how Spark queries a table in general and how it queries Hudi in particular, which also makes it easier to locate and fix problems later when using Kyuubi Spark SQL.
Versions
Spark 2.4.4
Hudi master 0.12.0-SNAPSHOT (latest code)
(With Spark 3 you can also enable the plan change log to print which rules actually take effect.)
Example Code
First write Hudi data with the code from the previous post, then query it.
```scala
import spark.implicits._
```
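The original query snippet is collapsed above, so here is a minimal sketch of the read side, assuming the table was written to a hypothetical path /tmp/hudi/test_table by the previous post's example:

```scala
// Minimal query sketch (the path is hypothetical; point it at wherever the previous post wrote the table).
val df = spark.read
  .format("hudi")                  // resolves to Hudi's Spark2DefaultSource, as analyzed below
  .load("/tmp/hudi/test_table")    // goes through DataFrameReader.load -> loadV1Source

df.show()                          // triggers planning and execution, traced later in this post
```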
spark.read
spark.read returns a DataFrameReader; the format and load methods used next are both defined in this class.

```scala
def read: DataFrameReader = new DataFrameReader(self)
```
format
Here source = "hudi", which will be used later by load.

```scala
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
```
load
The load method first adds the path parameter and then checks whether source equals hive. Here source equals hudi, so that branch is not taken. Next, DataSource.lookupDataSource looks up the data source class for hudi, and load checks whether it is a subclass of DataSourceV2 before deciding how to proceed. As covered in the previous post, DataSource.lookupDataSource returns Spark2DefaultSource, which is not a subclass of DataSourceV2, so loadV1Source is executed next.

```scala
def load(path: String): DataFrame = {
  // ...
}
```
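For reference, a lightly abridged sketch of the Spark 2.4.x load methods described above (the DataSourceV2 branch body is elided, since it is not taken for Hudi here):

```scala
def load(path: String): DataFrame = {
  // force invocation of `load(...varargs...)`
  option("path", path).load(Seq.empty: _*)
}

def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    // DataSourceV2 read path, elided in this sketch: Spark2DefaultSource is not a
    // DataSourceV2, so this branch is never taken for Hudi on Spark 2.4.
    sys.error("elided")
  } else {
    loadV1Source(paths: _*)
  }
}
```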
loadV1Source
The core of loadV1Source is the trailing call to sparkSession.baseRelationToDataFrame; let's first look at its argument, which is produced by DataSource.resolveRelation.

```scala
private def loadV1Source(paths: String*) = {
  // ...
}
```
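For reference, the body is short: roughly, it builds a DataSource with className = "hudi" and hands the result of resolveRelation to baseRelationToDataFrame (lightly abridged from the Spark 2.4.x source):

```scala
private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,            // "hudi"
      options = extraOptions.toMap).resolveRelation())
}
```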
DataSource.resolveRelation
As before, providingClass.newInstance() here is the Spark2DefaultSource returned by DataSource.lookupDataSource. Its parent class DefaultSource implements both SchemaRelationProvider and RelationProvider, but userSpecifiedSchema is None here, so the second case matches and relation = dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) is executed.

```scala
/**
 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
 * [[DataSource]]
 *
 * @param checkFilesExist Whether to confirm that the files exist when generating the
 *                        non-streaming file based datasource. StructuredStreaming jobs already
 *                        list file existence, and when generating incremental jobs, the batch
 *                        is considered as a non-streaming file based data source. Since we know
 *                        that files already exist, we don't need to check them again.
 */
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation

    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
      // ......
  }
  relation
}
```

```scala
// className = "hudi"
lazy val providingClass: Class[_] =
  DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
```
dataSource.createRelation
This createRelation is implemented in DefaultSource, the parent class of Spark2DefaultSource. It first derives the table base path from the path parameters, creates a HoodieTableMetaClient, reads table configuration such as the table type, and then decides which kind of Relation to return based on that configuration. Here it returns resolveBaseFileOnlyRelation. (So createRelation is the entry point where Hudi's own query logic is implemented.)

```scala
override def createRelation(sqlContext: SQLContext,
                            parameters: Map[String, String]): BaseRelation = {
  createRelation(sqlContext, parameters, null)
}

override def createRelation(sqlContext: SQLContext,
                            optParams: Map[String, String],
                            schema: StructType): BaseRelation = {
  val path = optParams.get("path")
  val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
  if (path.isEmpty && readPathsStr.isEmpty) {
    throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
  }

  val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
  val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths

  val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)

  val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
    HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
  } else {
    Seq.empty
  }

  // Add default options for unspecified read options keys.
  val parameters = (if (globPaths.nonEmpty) {
    Map(
      "glob.paths" -> globPaths.mkString(",")
    )
  } else {
    Map()
  }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

  // Get the table base path
  val tablePath = if (globPaths.nonEmpty) {
    DataSourceUtils.getTablePath(fs, globPaths.toArray)
  } else {
    DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
  }
  log.info("Obtained hudi table path: " + tablePath)

  // Create the HoodieTableMetaClient
  val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
  // Whether this is a bootstrapped table; false in this example
  val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
  // Table type; COPY_ON_WRITE in this example
  val tableType = metaClient.getTableType
  // Query type; snapshot in this example
  val queryType = parameters(QUERY_TYPE.key)

  // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
  //       Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
  //       case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
  //       from the table itself
  val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
    None
  } else {
    Option(schema)
  }

  log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

  // Check whether there is any completed commit, i.e. whether the table is empty
  if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
    // Empty table: return an EmptyRelation
    new EmptyRelation(sqlContext, metaClient)
  } else {
    (tableType, queryType, isBootstrappedTable) match {
      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
        resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
      case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
        new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
        new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
      case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
        new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
      case (_, _, true) =>
        new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
      case (_, _, _) =>
        throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
          s"isBootstrappedTable: $isBootstrappedTable ")
    }
  }
}
```
resolveBaseFileOnlyRelation
```scala
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext, /* ... */) = {
  // ...
}
```
hasSchemaOnRead is defined in HoodieBaseRelation, the parent class of BaseFileOnlyRelation:

```scala
def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
```
Because SCHEMA_EVOLUTION_ENABLED defaults to false, internalSchemaOpt returns None and hasSchemaOnRead returns false, so resolveBaseFileOnlyRelation ends up calling baseRelation.toHadoopFsRelation.

```scala
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
```

```java
public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
    .key("hoodie.schema.on.read.enable")
    .defaultValue(false)
    .withDocumentation("Enables support for Schema Evolution feature");
```
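Putting the two pieces together, the branch inside resolveBaseFileOnlyRelation boils down to roughly the following sketch (the constructor argument order is illustrative, not copied from the Hudi source):

```scala
// Sketch only: with schema evolution disabled (the default), the BaseFileOnlyRelation is
// converted into a plain Spark HadoopFsRelation via toHadoopFsRelation.
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)

if (baseRelation.hasSchemaOnRead) baseRelation       // schema-on-read (schema evolution) enabled
else baseRelation.toHadoopFsRelation                 // our case: hand the work back to Spark's file source
```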
baseRelation.toHadoopFsRelation
This method returns a HadoopFsRelation whose location is a HoodieFileIndex (defined in the parent class HoodieBaseRelation) and whose fileFormat is Spark24HoodieParquetFileFormat.

```scala
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
  // ...
```

```scala
protected lazy val fileIndex: HoodieFileIndex =
  // ...
```
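To make the shape of the result concrete, here is what the returned relation looks like for this example, with Spark 2.4's HadoopFsRelation fields written out as named arguments (a sketch; the values are the ones discussed above):

```scala
// Sketch of the relation produced for a COW snapshot query:
HadoopFsRelation(
  location        = fileIndex,        // HoodieFileIndex: Hudi's own file listing / partition pruning
  partitionSchema = partitionSchema,
  dataSchema      = dataSchema,
  bucketSpec      = None,
  fileFormat      = fileFormat,       // Spark24HoodieParquetFileFormat
  options         = optParams
)(sparkSession)
```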
HoodieFileIndex extends SparkHoodieTableFileIndex and implements listFiles and the other methods of Spark SQL's FileIndex. These come into play when listing a Hudi table's files, and this is exactly where the Hudi query logic lives. Next, let's see how its listFiles method gets called and how the query result is returned.

```scala
case class HoodieFileIndex(spark: SparkSession,
                           metaClient: HoodieTableMetaClient,
                           schemaSpec: Option[StructType],
                           options: Map[String, String],
                           @transient fileStatusCache: FileStatusCache = NoopCache)
  extends SparkHoodieTableFileIndex(
    spark = spark,
    metaClient = metaClient,
    schemaSpec = schemaSpec,
    configProperties = getConfigProperties(spark, options),
    queryPaths = HoodieFileIndex.getQueryPaths(options),
    specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
    fileStatusCache = fileStatusCache
  )
  with FileIndex {
  // ...
}
```

```scala
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
  // ...
```
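For orientation, these are the methods of Spark 2.4's FileIndex trait that HoodieFileIndex has to provide (signatures quoted from memory of the Spark source, so treat them as a reference sketch):

```scala
trait FileIndex {
  // Root paths that the relation reads from
  def rootPaths: Seq[Path]
  // The key method for a query: prune partitions and return the files to scan
  def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
  def inputFiles: Array[String]
  def refresh(): Unit
  def sizeInBytes: Long
  def partitionSchema: StructType
}
```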
baseRelationToDataFrame
From the analysis above we know that resolveRelation returns HadoopFsRelation(HoodieFileIndex, partitionSchema, dataSchema, None, Spark24HoodieParquetFileFormat, optParams)(sparkSession). Now let's look at baseRelationToDataFrame.

```scala
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
  Dataset.ofRows(self, LogicalRelation(baseRelation))
}

object LogicalRelation {
  def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
    LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)

  def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
    LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
}
```
Dataset.ofRows
Dataset.ofRows has been analyzed several times in earlier posts, so let's just focus on the essentials: here the logicalPlan argument is LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...), relation.schema.toAttributes, None, false).

```scala
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
```
Dataset.show
We know that for a query the Dataset.ofRows above only triggers the analysis phase via qe.assertAnalyzed(), and from the logs no rules actually fire here. So let's focus on the subsequent show, which eventually calls showString, whose core logic lives in getRows.

```scala
def show(): Unit = show(20)
```
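For context, the show call chain in Spark 2.4 looks roughly like this (abridged sketch):

```scala
def show(): Unit = show(20)

def show(numRows: Int): Unit = show(numRows, truncate = true)

def show(numRows: Int, truncate: Boolean): Unit =
  // showString builds the rows via getRows and formats them as a text table
  if (truncate) println(showString(numRows, truncate = 20)) else println(showString(numRows, truncate = 0))
```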
getRows
The core of getRows is newDf.select(castCols: _*).take(numRows + 1), which triggers planning.

```scala
private[sql] def getRows( /* ... */ ) = {
  // ...
}
```
df.select
First look at select: the logicalPlan inside select is LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...)), so the logicalPlan passed to withPlan is Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...))).

```scala
def select(cols: Column*): DataFrame = withPlan {
  Project(cols.map(_.named), logicalPlan)
}

private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
  Dataset.ofRows(sparkSession, logicalPlan)
}
```
take
Next comes take:

```scala
def take(n: Int): Array[T] = head(n)

def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
```
limit
Let's look at the limit argument first:

```scala
def limit(n: Int): Dataset[T] = withTypedPlan {
  // ...
}
```

Here Literal(n)'s apply method returns Literal(21, IntegerType).

```scala
object Limit {
  // ...
}
```

Limit's apply returns GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...))))), so limit returns Dataset(sparkSession, GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...)))))).

```scala
private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
  // ...
}
```
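For reference, the two pieces referred to above look roughly like this in Spark 2.4 (abridged; the Limit unapply is shown later in the SpecialLimits section):

```scala
// Dataset.limit: wraps the current logical plan in LocalLimit + GlobalLimit via the Limit helper.
def limit(n: Int): Dataset[T] = withTypedPlan {
  Limit(Literal(n), logicalPlan)
}

// Catalyst's Limit helper.
object Limit {
  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
  }
  // def unapply(...)  -- see the SpecialLimits section below
}
```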
withAction
withAction was also covered in earlier posts: executedPlan triggers a full pass through Spark SQL's parsing, analysis, optimization and planning phases, and the action (here the collectFromPlan above) is finally invoked inside withNewExecutionId. Let's first see which important rules and strategies take effect.

```scala
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
  // ...
}
```
plan entry point
One thing worth pointing out (I hadn't noticed it before) is that the plan entry point wraps optimizedPlan in ReturnAnswer, which matters for the analysis below.

```scala
lazy val sparkPlan: SparkPlan = {
  // ...
}
```

According to the ReturnAnswer scaladoc, for operations such as take or collect, the rule transformation only happens at the top of the logical query plan.

```scala
/** ... */
```
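For reference, here is roughly how the wrapping looks in Spark 2.4's QueryExecution, together with the ReturnAnswer node itself (quoted from memory; treat it as a sketch):

```scala
lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // Only the first plan returned by the planner is used.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}

/**
 * When planning take() or collect() operations, this special node is inserted at the top of
 * the logical plan before invoking the strategies.
 */
case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
```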
strategies
Let's first see which strategies there are (as seen in the debugger):

```
0 = {SparkStrategies$PythonEvals$@13376}
...
```
planner.plan
Now let's look at the plan method. Previously my understanding of it was limited to "it iterates over the strategies and calls apply"; now I understand it a bit better: it first iterates over the strategies to produce candidates, then for each candidate calls collectPlaceholders to unwrap the child plans wrapped in PlanLater, producing placeholders; if placeholders is non-empty, it recursively calls plan on those child nodes.

```scala
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // ...
}
```
SpecialLimits
On the first pass, because of the ReturnAnswer wrapper, the match succeeds in SpecialLimits.

```scala
/** ... */
```
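An abridged sketch of the SpecialLimits strategy in Spark 2.4, trimmed to the branch that fires here (the ordered-limit cases that produce TakeOrderedAndProjectExec are elided):

```scala
object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ReturnAnswer(rootPlan) => rootPlan match {
      // ... cases for Limit over Sort / Project(Sort) -> TakeOrderedAndProjectExec elided ...
      case Limit(IntegerLiteral(limit), child) =>
        CollectLimitExec(limit, planLater(child)) :: Nil
      case other => planLater(other) :: Nil
    }
    // ... plain (non-ReturnAnswer) ordered-limit cases elided ...
    case _ => Nil
  }
}
```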
When matching Limit, its unapply is invoked. It matches the third case, returning CollectLimitExec(21, PlanLater(Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...))))).

```scala
def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
  p match {
    case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
    case _ => None
  }
}
```
PlanLater simply wraps the plan; it gets unwrapped later in planner.plan via collectPlaceholders.

```scala
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
  // ...
}
```
FileSourceStrategy
After unwrapping PlanLater we have Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, _...))). When FileSourceStrategy matches, PhysicalOperation's unapply is called, so the first case matches and returns ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, _...))). Since this is the child node, adding back the parent gives the final result CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, _...)))).

```scala
object PhysicalOperation extends PredicateHelper {
  // ...
}
```

```scala
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  // ...
}
```
prepareForExecution
The sparkPlan returned above is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, _...)))).

```scala
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))

protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
```
CollapseCodegenStages
```scala
def apply(plan: SparkPlan): SparkPlan = {
  // ...
}
```

CollectLimitExec first matches the other case, so the rule recurses into its children. ProjectExec implements CodegenSupport, so it becomes WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, _...)))). Since ProjectExec is the child node, the final result is CollectLimitExec(21, WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, _...))))).

```scala
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
  // ...
}
```

```scala
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
  // ...
```
collectFromPlan
```scala
private def collectFromPlan(plan: SparkPlan): Array[T] = {
  // ...
}
```
executeCollect
Here plan is the CollectLimitExec, which calls child.executeTake. The child is a WholeStageCodegenExec, which does not override executeTake, so the parent class SparkPlan's executeTake is invoked.

```scala
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
  // ...
}
```
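The relevant delegation inside CollectLimitExec is a one-liner (abridged from Spark 2.4; only the methods we care about are shown):

```scala
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  // The hand-off described above: take the first `limit` rows from the child plan.
  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
}
```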
executeTake
Its core is getByteArrayRdd.

```scala
/**
 * Runs this query returning the first `n` rows as an array.
 *
 * This is modeled after `RDD.take` but never runs any job locally on the driver.
 */
def executeTake(n: Int): Array[InternalRow] = {
  if (n == 0) {
    return new Array[InternalRow](0)
  }

  val childRDD = getByteArrayRdd(n).map(_._2)

  val buf = new ArrayBuffer[InternalRow]
  val totalParts = childRDD.partitions.length
  var partsScanned = 0
  while (buf.size < n && partsScanned < totalParts) {
    // The number of partitions to try in this iteration. It is ok for this number to be
    // greater than totalParts because we actually cap it at totalParts in runJob.
    var numPartsToTry = 1L
    if (partsScanned > 0) {
      // If we didn't find any rows after the previous iteration, quadruple and retry.
      // Otherwise, interpolate the number of partitions we need to try, but overestimate
      // it by 50%. We also cap the estimation in the end.
      val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
      if (buf.isEmpty) {
        numPartsToTry = partsScanned * limitScaleUpFactor
      } else {
        val left = n - buf.size
        // As left > 0, numPartsToTry is always >= 1
        numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
        numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
      }
    }

    val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
    val sc = sqlContext.sparkContext
    val res = sc.runJob(childRDD,
      (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)

    buf ++= res.flatMap(decodeUnsafeRows)

    partsScanned += p.size
  }

  if (buf.size > n) {
    buf.take(n).toArray
  } else {
    buf.toArray
  }
}
```
getByteArrayRdd
```scala
private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
  // ...
}
```
doExecute
WholeStageCodegenExec.doExecute; its core is child.asInstanceOf[CodegenSupport].inputRDDs(), where child is ProjectExec.

```scala
override def doExecute(): RDD[InternalRow] = {
  // ...
}
```
inputRDDs
ProjectExec.inputRDDs, where child is FileSourceScanExec.

```scala
override def inputRDDs(): Seq[RDD[InternalRow]] = {
  child.asInstanceOf[CodegenSupport].inputRDDs()
}
```
FileSourceScanExec.inputRDD
- relation: HadoopFsRelation
- relation.fileFormat: Spark24HoodieParquetFileFormat
- relation.bucketSpec: None
```scala
private lazy val inputRDD: RDD[InternalRow] = {
  // ...
}
```
createNonBucketedReadRDD
```scala
/** ... */
```
selectedPartitions
- relation.location: HoodieFileIndex
```scala
private lazy val selectedPartitions: Array[PartitionDirectory] = {
  // ...
}
```
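The essential call inside selectedPartitions (paraphrased from the Spark 2.4 source) is the file listing itself, and since relation.location is a HoodieFileIndex, this is the moment Spark hands control over to Hudi:

```scala
// Sketch: partition pruning and file listing are delegated to the FileIndex implementation,
// i.e. HoodieFileIndex.listFiles for a Hudi table.
val ret: Seq[PartitionDirectory] = relation.location.listFiles(partitionFilters, dataFilters)
```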
HoodieFileIndex.listFiles
The methods involved include lookupCandidateFilesInMetadataTable, refresh, doRefresh, loadPartitionPathFiles and others; refresh and doRefresh are called when the index is constructed (new).

```scala
/** ... */
```

cachedAllInputFileSlices, initialized in doRefresh, implements the logic that obtains the list of Hudi file slices.

```scala
cachedAllInputFileSlices = partitionFiles.keySet().stream()
  // ...
```
Spark24HoodieParquetFileFormat
PR: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns. Worth reading through on your own.
Summary
This basically traces the spark.read.format("hudi").load path from the Spark source code into the Hudi source code. Limited by ability and time, a few places are still not fully worked out, but once the overall flow is clear, the remaining details can be dug into whenever they are actually needed.