Preface
When first learning Hudi, we know that `df.write.format("hudi").save` writes data to Hudi, and that the write logic is implemented in `HoodieSparkSqlWriter.write`. One question always lingered, though: how does the call get from `df.write.format("hudi").save` into `HoodieSparkSqlWriter.write`? This article is mainly about answering that question.
Versions
Spark 2.4.4
Hudi 0.12.0-SNAPSHOT, the same Hudi code as used in the previous article 《Hudi Spark SQL源码学习总结-CTAS》
Example Code
We again use the test statement from `TestCreateTable` in the source code:

```scala
import spark.implicits._
// ... (rest of the test statement omitted)
```
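If you don't have the test code at hand, a write along the same lines looks like the sketch below. This is only a minimal illustrative example: the table name, path, schema, and option values are made up rather than taken from `TestCreateTable`, and it assumes `spark` is an active `SparkSession`.

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Hypothetical table name and path, just for illustration
val tableName = "hudi_write_demo"
val basePath = "/tmp/hudi_write_demo"

// A tiny DataFrame with a record key column ("id") and a precombine column ("ts")
val df = Seq((1, "a1", 10.0, 1000L)).toDF("id", "name", "price", "ts")

// The entry point this article traces: df.write.format("hudi")...save(path)
df.write.format("hudi")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.table.name", tableName)
  .mode(SaveMode.Overwrite)
  .save(basePath)
```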
df.write
`df.write` returns a `DataFrameWriter`; the `format`, `option`, `save`, and other methods that follow are all defined in this class.
```scala
def write: DataFrameWriter[T] = {
  // ... (rest of the snippet omitted)
```
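The snippet above is cut off. For reference, `Dataset.write` in Spark 2.4.x looks roughly like the following (an abridged, from-memory sketch rather than a verbatim quote of the source):

```scala
def write: DataFrameWriter[T] = {
  if (isStreaming) {
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  // each call returns a new DataFrameWriter wrapping this Dataset
  new DataFrameWriter[T](this)
}
```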
format
`format` sets `source = "hudi"`, which is used later by `save`:

```scala
def format(source: String): DataFrameWriter[T] = {
  this.source = source
  this
}
```
save
The `save` method first adds the `path` option and then checks whether `source` equals `hive`. Here `source` equals `hudi`, so that check does not match. It then looks up the data source class for `hudi` via `DataSource.lookupDataSource`, checks whether that class is a subclass of `DataSourceV2`, and runs the corresponding branch. So we first need to look at `DataSource.lookupDataSource`.

```scala
def save(path: String): Unit = {
  // ... (rest of the snippet omitted)
```
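Since only the first line survives above, here is an abridged, from-memory sketch of `DataFrameWriter.save(path)` / `save()` in Spark 2.4.x that captures the logic described in this section; elided parts are marked with comments, and the actual source should be consulted for the exact code:

```scala
def save(path: String): Unit = {
  // 1. record the output path as an option
  this.extraOptions += ("path" -> path)
  save()
}

def save(): Unit = {
  // 2. writing to the "hive" source through this API is not allowed
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "write files of Hive data source directly.")
  }

  // 3. resolve the class registered under the short name "hudi"
  val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    // DataSourceV2 branch, not taken for Spark2DefaultSource ... (elided)
  } else {
    // 4. Spark2DefaultSource is a V1 source, so this branch runs
    saveToV1Source()
  }
}
```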
DataSource.lookupDataSource
We already touched on the Spark 3.2.1 version of `lookupDataSource` when discussing `isV2Provider` in the previous article, and the Spark 2.4.4 version is much the same. Let's look at it again: here `provider1 = hudi` and `provider2 = hudi.DefaultSource`. All `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` files on the classpath are loaded and their contents returned; the Hudi-related entries include `org.apache.hudi.DefaultSource`, `org.apache.hudi.Spark2DefaultSource`, `org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat`, and so on. These are then filtered by `shortName == "hudi"`; only `Spark2DefaultSource` matches, so `Spark2DefaultSource` is returned. The difference from Spark 3.2.1 is that Spark 2 maps to `Spark2DefaultSource` while Spark 3 maps to `Spark3DefaultSource`.

```scala
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
  // ... (rest of the snippet omitted)
```
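The service-loading part of `lookupDataSource` can be reproduced in isolation. The following self-contained snippet is a demo of my own (not Spark source): it uses the same `java.util.ServiceLoader` mechanism to list the `DataSourceRegister` implementations on the classpath and filter them by `shortName == "hudi"`. With the Hudi Spark 2 bundle on the classpath it should print `org.apache.hudi.Spark2DefaultSource`.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Every data source jar ships a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// file; ServiceLoader instantiates all the classes listed in those files.
val loader = Thread.currentThread().getContextClassLoader
val registered = ServiceLoader.load(classOf[DataSourceRegister], loader).asScala.toList

// lookupDataSource keeps only the providers whose shortName matches the requested "hudi"
val hudiProviders = registered.filter(_.shortName().equalsIgnoreCase("hudi"))
hudiProviders.foreach(p => println(p.getClass.getCanonicalName))
```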
Spark2DefaultSource
Is `Spark2DefaultSource` a subclass of `DataSourceV2`? We need to look at its definition:

```scala
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
  override def shortName(): String = "hudi"
}

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
  // ... (class body omitted)
}
```
As we can see, `Spark2DefaultSource` is not a subclass of `DataSourceV2`, so the `saveToV1Source` method is executed next.
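This can also be verified directly with a one-off check (again a small demo of my own, not quoted source): since `Spark2DefaultSource` does not extend `DataSourceV2`, the check in `save()` evaluates to false and the V1 path is taken.

```scala
import org.apache.hudi.Spark2DefaultSource
import org.apache.spark.sql.sources.v2.DataSourceV2

// Mirrors the check in DataFrameWriter.save(): prints false, so saveToV1Source() is used
println(classOf[DataSourceV2].isAssignableFrom(classOf[Spark2DefaultSource]))
```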
saveToV1Source
The core of `saveToV1Source` is the `runCommand` call at the end; let's first look at its argument, `DataSource.planForWriting`.

```scala
if (SparkSession.active.sessionState.conf.getConf(
  // ... (rest of the snippet omitted)
```
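Again only the first line of the snippet survives above; the method in Spark 2.4.x looks roughly like this (abridged, from memory; check the actual source for the exact code):

```scala
private def saveToV1Source(): Unit = {
  if (SparkSession.active.sessionState.conf.getConf(
      SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
    partitioningColumns.foreach { columns =>
      extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
        DataSourceUtils.encodePartitioningColumns(columns))
    }
  }

  // Code path for data source v1: build a DataSource for "hudi", plan the write,
  // and run the resulting command (see runCommand below)
  runCommand(df.sparkSession, "save") {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
  }
}
```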
DataSource.planForWriting
From the analysis above we know that `DataSource.lookupDataSource` returns `Spark2DefaultSource`, and its parent class `DefaultSource` implements `CreatableRelationProvider`, so a `SaveIntoDataSourceCommand` is returned:

```scala
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      // data is df.logicalPlan, dataSource is Spark2DefaultSource
      SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
    case format: FileFormat =>
      DataSource.validateSchema(data.schema)
      planForWritingFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}

// className = hudi
lazy val providingClass: Class[_] =
  DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
```
runCommand
```scala
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  // ... (rest of the snippet omitted)
```
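An abridged, from-memory sketch of `runCommand` in Spark 2.4.x (not a verbatim quote); the key point for this article is that the planned command is executed through `SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)`:

```scala
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  try {
    val start = System.nanoTime()
    // call `QueryExecution.toRdd` to trigger the execution of the command
    SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
    val end = System.nanoTime()
    session.listenerManager.onSuccess(name, qe, end - start)
  } catch {
    case e: Exception =>
      session.listenerManager.onFailure(name, qe, e)
      throw e
  }
}
```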
As discussed in the article Hudi Spark SQL源码学习总结-Create Table, `withNewExecutionId` invokes the by-name parameter `body`; here `body` is `qe.toRdd`.

```scala
def withNewExecutionId[T](
  // ... (rest of the snippet omitted)
```
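Since the snippet is cut off, here is a heavily simplified sketch of what `SQLExecution.withNewExecutionId` does in Spark 2.4.x (from memory, with the listener-event bookkeeping elided); the important part for this article is that the by-name parameter `body`, i.e. `qe.toRdd`, is evaluated inside it:

```scala
def withNewExecutionId[T](
    sparkSession: SparkSession,
    queryExecution: QueryExecution)(body: => T): T = {
  val sc = sparkSession.sparkContext
  val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  val executionId = SQLExecution.nextExecutionId
  sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId.toString)
  try {
    // post a SparkListenerSQLExecutionStart event ... (elided)
    body // <- here body is qe.toRdd, which triggers executedPlan.execute()
  } finally {
    // post a SparkListenerSQLExecutionEnd event ... (elided)
    sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
  }
}
```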
qe.toRdd
```scala
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
```
The `executedPlan` here is an `ExecutedCommandExec`, because `SaveIntoDataSourceCommand` is a `RunnableCommand` and therefore also a subclass of `Command`. As in my earlier article Hudi Spark SQL源码学习总结-Create Table, both `df.logicalPlan` and `executedPlan` trigger a full pass of Spark SQL parsing, analysis, optimization, and planning. In the planning phase, the `planner.plan` method iterates over the `strategies` and applies each one's `apply` method; one of them is `BasicOperators`, whose `apply` method is:

```scala
object BasicOperators extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
    case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
    case MemoryPlan(sink, output) =>
      val encoder = RowEncoder(StructType.fromAttributes(output))
      val toRow = encoder.createSerializer()
      LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil
    // ... (remaining cases omitted)
```
Our plan here, `SaveIntoDataSourceCommand`, is a subclass of `RunnableCommand`, so an `ExecutedCommandExec` (a subclass of `SparkPlan`) is returned. How the full Spark SQL parsing, analysis, optimization, and planning is triggered is not covered again in this article; only the entry-point code is shown:

```scala
/** Internal version of the RDD. Avoids copies and has no schema */
// ... (rest of the snippet omitted)
```
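For context, the relevant lazy vals of `QueryExecution` in Spark 2.4.x look roughly like the following (an abridged, from-memory sketch): each stage pulls in the previous one, and `toRdd` sits at the end of the chain:

```scala
lazy val analyzed: LogicalPlan = {
  SparkSession.setActiveSession(sparkSession)
  sparkSession.sessionState.analyzer.executeAndCheck(logical)
}

lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
  sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // planner.plan applies the strategies (including BasicOperators) shown above
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}

lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
```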
executedPlan.execute()
The `executedPlan.execute()` method is defined in `SparkPlan`. It calls `doExecute`, which has no concrete implementation in `SparkPlan` itself, so we need to look at the concrete subclass:

```scala
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecute()
}
```
As mentioned above, the concrete `SparkPlan` subclass here is `ExecutedCommandExec`. Its `doExecute` uses `sideEffectResult`, which in turn calls `cmd.run`, where `cmd` is the `SaveIntoDataSourceCommand`:

```scala
case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {

  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

  /**
   * A concrete command should override this lazy field to wrap up any side effects caused by the
   * command or any other computation that should be evaluated exactly once. The value of this field
   * can be used as the contents of the corresponding RDD generated from the physical plan of this
   * command.
   *
   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
   * so that the command can be executed eagerly right after the command query is created.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
  }

  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil

  override def output: Seq[Attribute] = cmd.output

  override def nodeName: String = "Execute " + cmd.nodeName

  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator

  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
  }
}
```
SaveIntoDataSourceCommand.run
Its `run` method calls `dataSource.createRelation`; from the construction of `SaveIntoDataSourceCommand` above we know that `dataSource` is the `Spark2DefaultSource`:

```scala
case class SaveIntoDataSourceCommand(
    query: LogicalPlan,
    dataSource: CreatableRelationProvider,
    options: Map[String, String],
    mode: SaveMode) extends RunnableCommand {

  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    dataSource.createRelation(
      sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
    Seq.empty[Row]
  }

  override def simpleString: String = {
    val redacted = SQLConf.get.redactOptions(options)
    s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
  }
}
```
Spark2DefaultSource.createRelation
The `createRelation` method of `Spark2DefaultSource` is implemented in its parent class `DefaultSource`, and inside `createRelation` it calls `HoodieSparkSqlWriter.write`. At this point we have finally worked out the path from `df.write.format("hudi").save` to `HoodieSparkSqlWriter.write`.

```scala
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
  // ... (rest of the snippet omitted)
```
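The block above is truncated. As a reference, the relevant `createRelation` overload in Hudi's `DefaultSource` (around 0.12) looks roughly like the abridged, from-memory sketch below; the real method also strips Hudi meta columns from the incoming DataFrame and handles the bootstrap operation, which are elided here, so the exact code in your checkout may differ:

```scala
override def createRelation(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            df: DataFrame): BaseRelation = {
  // this is where the jump to HoodieSparkSqlWriter.write finally happens
  HoodieSparkSqlWriter.write(sqlContext, mode, optParams, df)

  // the save path does not return data, so an empty relation is handed back
  new HoodieEmptyRelation(sqlContext, df.schema)
}
```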
Summary
This article has analyzed and summarized the call path from `df.write.format("hudi").save` to `HoodieSparkSqlWriter.write`, resolving a question I had when I first started learning Hudi 😄. I hope it is helpful to you as well.