前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
简要总结Hudi Spark Sql源码执行逻辑,从建表开始。其实从去年开始接触Hudi的时候就研究学习了Hudi Spark SQL的部分源码,并贡献了几个PR,但是完整的逻辑有些地方还没有完全梳理清楚,所以现在想要从头开始学习,搞懂一些知识难点,这样以后看相关源码的时候就不会导致因为一些关键点不懂影响进度。
由于本人能力和精力有限,本文只讲解自己觉得比较关键的点,主要目的是梳理整个流程。
Spark SQL源码
既然是学习Hudi Spark SQL源码,那么肯定离不开Spark SQL源码,所以需要先学习了解Spark SQL的源码,在CSDN上发现一位作者写的几篇文章不错,这几天我也主要是参考他写的这几篇文章并结合源码进行学习的,我把它们放在后面的参考文章中,大家可以参考一下。
版本
Spark 2.4.4
Hudi master分支 0.12.0-SNAPSHOT
虽然在学习Spark SQL源码的时候用的是Spark3.3,但是因为Hudi源码默认的Spark版本是2.4.4,如果改版本在IDEA调试的话比较麻烦,所以是用Spark2.4.4版本,但我和Spark3.3对比了一下,大致逻辑是一样的。
示例代码
直接拿源码里的TestCreateTable
的第一个测试语句
1 | spark.sql( |
打印计划
我们先看一下上面的代码的执行计划1
2val df = spark.sql(createTableSql)
df.explain(true)
打印结果
1 | == Parsed Logical Plan == |
分别对应阶段parsing
、analysis
、optimization
、planning
扩展 HoodieSparkSessionExtension
要想使Spark SQL支持Hudi,必须在启动spark的时候扩展 HoodieSparkSessionExtension
,先放在这里,后面分析的时候会用到
1 | package org.apache.spark.sql.hudi |
ANTLR
学习Spark SQL的时候我们知道SQL的识别、编译、解释是由ANTLR实现的,所以我们需要看一下Hudi源码里的.g4
文件,一共有三个
hudi-spark模块下的
HoodieSqlCommon.g4
hudi-spark2模块下的
SqlBase.g4
,这个其实是拷贝的Spark2.4.5源码里的SqlBase.g4
hudi-spark2模块下的
HoodieSqlBase.g4
其中导入了上面的SqlBase.g4
:import SqlBase
入口
spark.sql
1 | def sql(sqlText: String): DataFrame = { |
sessionState.sqlParser
首先看一下sessionState是由哪个类实现的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30lazy val sessionState: SessionState = {
parentSessionState
.map(_.clone(this))
.getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf),
self)
initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
state
}
}
private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
}
}
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
"org.apache.spark.sql.hive.HiveSessionStateBuilder"
def enableHiveSupport(): Builder = synchronized {
if (hiveClassesArePresent) {
config(CATALOG_IMPLEMENTATION.key, "hive")
} else {
throw new IllegalArgumentException(
"Unable to instantiate SparkSession with Hive support because " +
"Hive classes are not found.")
}
}
可以看到在不启用Hive的情况下,使用的是SessionStateBuilder
,启用了Hive使用HiveSessionStateBuilder
,我们本地测试代码默认不启用Hive,那么看一下SessionStateBuilder
1 | class SessionStateBuilder( |
SessionStateBuilder
继承自BaseSessionStateBuilder
,也就是sessionState
是通过BaseSessionStateBuilder.build
创建的,其中定义了sqlParser
1
2
3protected lazy val sqlParser: ParserInterface = {
extensions.buildParser(session, new SparkSqlParser(conf))
}
extensions 是在SparkSession.Builder
中定义的:1
private[this] val extensions = new SparkSessionExtensions
SparkSessionExtensions
1 | type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface |
默认情况下parserBuilders
为空,那么foldLeft返回的值为默认的SparkSqlParser
,一开始没搞懂parserBuilders
为空的情况下为啥会返回SparkSqlParser
,直到看了foldLeft
的源码:
1 | def foldLeft[B](z: B)(op: (B, A) => B): B = { |
首先将参数z
赋值为result
,然后再foreach
利用op
函数改变result
的值,因为这里parserBuilders
为空,所以没有执行foreach里面的逻辑,那么返回结果result
为参数z
,也就是这里的new SparkSqlParser(conf)
Hudi自定义sqlParser
搞懂了Spark SQL默认的sqlParser
为SparkSqlParser
,那么Hudi是一样的吗?那我们就需要看一下开始的HoodieSparkSessionExtension
,我们在程序里是这样扩展的.withExtensions(new HoodieSparkSessionExtension)
1 | def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized { |
这里的f为HoodieSparkSessionExtension
(extends (SparkSessionExtensions => Unit),继承Function1 ),也就是一开始withExtensions
时会调用HoodieSparkSessionExtension
的apply方法,其中会执行
1 | extensions.injectParser { (session, parser) => |
这里的injectParser会将HoodieCommonSqlParser添加到parserBuilders中1
2
3def injectParser(builder: ParserBuilder): Unit = {
parserBuilders += builder
}
再根据上面对buildParser
的理解,那么在Hudi Spark SQL中sqlParser
为HoodieCommonSqlParser
parsing
parsePlan
接下来看parsePlan
方法,这个方法对应parsing阶段
,这里实际为HoodieCommonSqlParser.parsePlan
1
2
3
4
5
6
7
8
9
10private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
.map(_(session, delegate)).getOrElse(delegate)
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _=> sparkExtendedParser.parsePlan(sqlText)
}
}
先看parse
方法
1 | protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = { |
根据上面的代码可知函数体中的parser
为HoodieSqlCommonParser
,这里的HoodieSqlCommonParser
为HoodieSqlCommon.g4
编译生成的,根据在参考文章中学习Spark SQL源码时可知parser.singleStatement()
的核心代码是调用方法statement
,而statement
根据我的理解对应g4文件中的statement
,根据g4文件中定义的语句匹配我们传入的sql语句,如果匹配上返回对应的LogicalPlan
,匹配不成功则返回null1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 public final SingleStatementContext singleStatement() throws RecognitionException {
SingleStatementContext _localctx = new SingleStatementContext(_ctx, getState());
enterRule(_localctx, 0, RULE_singleStatement);
int _la;
try {
enterOuterAlt(_localctx, 1);
{
setState(40);
statement();
......
public final StatementContext statement() throws RecognitionException {
StatementContext _localctx = new StatementContext(_ctx, getState());
enterRule(_localctx, 2, RULE_statement);
int _la;
try {
int _alt;
setState(124);
_errHandler.sync(this);
switch ( getInterpreter().adaptivePredict(_input,12,_ctx) ) {
case 1:
....
而HoodieSqlCommon.g4
文件中的statement
没有对应create table
相关的语法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16singleStatement
: statement ';'* EOF
;
statement
: compactionStatement #compactionCommand
| CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
tableIdentifier (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS indexOptions=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex
| SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes
| REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex
| .*? #passThrough
;
可以看出HoodieSqlCommonParser
的singleStatement
的实现也是和g4文件中的定义一致
所以在这里parser.singleStatement()
返回null,那么builder.visit(parser.singleStatement())
模式匹配会走到sparkExtendedParser.parsePlan(sqlText)
sparkExtendedParser
1 | private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser |
Spark2Adapter.createExtendedSparkParser
1 | override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { |
HoodieSpark2ExtendedSqlParser.parsePlan
1
2
3
4
5
6override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _=> delegate.parsePlan(sqlText)
}
}
这里的parsePlan
和上面的差不多,还是要先看parse
方法
1 |
|
这里的parser为HoodieSqlBaseParser
,对应HoodieSqlBase.g4
1 | singleStatement |
同样的逻辑这里的statement
也没有create table
,同样返回null,调用delegate.parsePlan(sqlText)
,这里的delegate
由最开始的extensions.buildParser
传进来的1
extensions.buildParser(session, new SparkSqlParser(conf))
所以这里的delegate
为SparkSqlParser
SparkSqlParser
SparkSqlParser
为Spark的源码,继承了抽象类AbstractSqlParser
,本身并没有实现parsePlan
方法,所以需要看一下父类AbstractSqlParser
的parsePlan
方法
1 | override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => |
同样的先看parse
方法
1 | protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = { |
这里的parser
为SqlBaseParser
,它是Spark源码里的,同样的Spark源码里也有一个SqlBase.g4
,其实Hudi的SqlBase.g4
文件是拷贝自Spark源码里的,对比两个文件的内容和从Hudi的SqlBase.g4
里的一行注释就可以知晓:
1 | // The parser file is forked from spark 2.4.5's SqlBase.g4. |
我们在SqlBase.g4
里查看singleStatement
和statement
的定义
1 | singleStatement |
statement
内容比较多,只截取其中一部分,可以看到statement
有两个和sql create table
相关的,一个是#createTable
另一个是#createHiveTable
,他俩的区别为#createTable
多了一个tableProvider
,tableProvider
包含 关键字USING
,我们的sql里有using hudi
,所以这里的statement
应该匹配为#createTable
,具体看一下它的部分源码
singleStatement
和上面的差不多,核心逻辑还是statement()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public final SingleStatementContext singleStatement() throws RecognitionException {
SingleStatementContext _localctx = new SingleStatementContext(_ctx, getState());
enterRule(_localctx, 0, RULE_singleStatement);
try {
enterOuterAlt(_localctx, 1);
{
setState(202);
statement();
setState(203);
match(EOF);
}
}
catch (RecognitionException re) {
_localctx.exception = re;
_errHandler.reportError(this, re);
_errHandler.recover(this, re);
}
finally {
exitRule();
}
return _localctx;
}
statement
1 | public final StatementContext statement() throws RecognitionException { |
这里应该匹配到 6 :CreateTableContext
,然后去匹配g4
文件中定义的 createTableHeader
、tableProvider
、PARTITIONED
、TBLPROPERTIES
等,主要是匹配sql关键字解析语法
解释一下为啥会匹配到
6 :CreateTableContext
,我没有专门去学习ANTLR的原理,根据我的理解,statement
定义了很多语法,其中#createTable
是第6个,所以会匹配到6,至于CreateTableContext
,是将#createTable
首字母大写并在后面加个Context,里面的其他内容也是和g4文件中的定义一一对应的,可以看一下其他的匹配都是这个规律
parser.singleStatement()
其实返回的是SingleStatementContext
,接下来看一下visitSingleStatement
,先看一下参数ctx.statement
1 | override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) { |
根据上面的分析和我在代码中的注释我们知道这里的ctx.statement
返回的是CreateTableContext
,再看一下visit
1 | public T visit(ParseTree tree) { |
这里的tree为CreateTableContext
,所以实际调用CreateTableContext
的accept
方法
1 | public static class CreateTableContext extends StatementContext { |
accept
又调用visitCreateTable
,这里的visitor为前面讲到的SparkSqlParser
里面的SparkSqlAstBuilder
,它是AstBuilder
的子类,AstBuilder
是SqlBaseBaseVisitor
的子类1
2class SparkSqlAstBuilder extends AstBuilder
class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
1 | override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { |
具体细节就不多做解释,可以自己看,主要的点一个是provider
为hudi
这个后面会用到,它是在前面提到的tableProvider
得到的,返回结果为CreateTable
,它是LogicalPlan
的子类
1 | case class CreateTable( |
这样parsePlan
最终返回的结果为 CreateTable
,parsing
阶段完成,接下来到了analysis
阶段
analysis
1 | def sql(sqlText: String): DataFrame = { |
analysis阶段的入口在qe.assertAnalyzed()
1 | def assertAnalyzed(): Unit = analyzed |
analyzer
首先看一下sparkSession.sessionState.analyzer
1 | lazy val analyzer: Analyzer = analyzerBuilder() |
上面提到过sessionState是由BaseSessionStateBuilder.build
创建的,那么同样去BaseSessionStateBuilder
里面看一下这个analyzerBuilder
是啥
1 | protected def analyzer: Analyzer = new Analyzer(catalog, conf) { |
它是一个匿名内部类重写了Analyzer
的几个扩展规则,这几个扩展规则在Analyzer
中均为空,其中extendedCheckRules
是在Analyzer
的父接口CheckAnalysis
中定义的,我们上面提到了,Hudi第一个自定义扩展是扩展了自己的sqlParser,那么剩下的两个都是作用在analysis
阶段
1 | HoodieAnalysis.customResolutionRules.foreach { ruleBuilder => |
injectResolutionRule
先看一下injectResolutionRule
1
2
3def injectResolutionRule(builder: RuleBuilder): Unit = {
resolutionRuleBuilders += builder
}
结合BaseSessionStateBuilder
中的customResolutionRules
,可知,这第二个自定义扩展是实现了extendedResolutionRules
中的customResolutionRules
,对应Analyzer
的规则批batches
的 Resolution
1
2
3
4
5
6
7protected def customResolutionRules: Seq[Rule[LogicalPlan]] = {
extensions.buildResolutionRules(session)
}
private[sql] def buildResolutionRules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
resolutionRuleBuilders.map(_.apply(session))
}
injectPostHocResolutionRule
再看一下injectPostHocResolutionRule
1 | def injectPostHocResolutionRule(builder: RuleBuilder): Unit = { |
可知第三个自定义扩展是实现了postHocResolutionRules
中的customPostHocResolutionRules
,对应Analyzer
的规则批batches
的 Post-Hoc Resolution
Hudi自定义analysis规则
Hudi自定义扩展具体的规则有
extendedResolutionRules
: session => HoodieResolveReferences(session) 和 session => HoodieAnalysis(session)
postHocResolutionRules
: session => HoodiePostAnalysisRule(session)
具体代码为:
1 | def customResolutionRules: Seq[RuleBuilder] = { |
可以看出来如果是spark3的话,还有别的规则,因为这里是spark2,所以只看默认的规则就可以了
看完sparkSession.sessionState.analyzer
的定义,接下来就看analysis
具体的实现方法了
executeAndCheck
1 | def executeAndCheck(plan: LogicalPlan): LogicalPlan = AnalysisHelper.markInAnalyzer { |
根据参考文章中的学习,我们知道,主要分为2 步:
1、执行 execute(plan)
2、校验 checkAnalysis(analyzed)
这里主要讲Hudi的实现,我们知道在执行阶段实际是调用父类RuleExecutor
的execute
方法,它会遍历规则批中的规则,并应用规则,调用规则的apply
方法
1 | override def execute(plan: LogicalPlan): LogicalPlan = { |
1 | def execute(plan: TreeType): TreeType = { |
应用Hudi规则
规则批中有很多规则,这里只跟踪规则如何作用到Hudi的自定义规则中,并最终执行Hudi的建表的,首先遍历到extendedResolutionRules
的customResolutionRules
,具体有两个
第一个规则:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
with SparkAdapterSupport {
private lazy val analyzer = sparkSession.sessionState.analyzer
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
// Resolve merge into
case mergeInto @ MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
if sparkAdapter.isHoodieTable(target, sparkSession) && target.resolved =>
val resolver = sparkSession.sessionState.conf.resolver
val resolvedSource = analyzer.execute(source)
def isInsertOrUpdateStar(assignments: Seq[Assignment]): Boolean = {
if (assignments.isEmpty) {
true
} else {
......
因为这里为create table,所以没有匹配到 mergeInto等
第二个规则:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Convert to MergeIntoHoodieTableCommand
case m @ MergeIntoTable(target, _, _, _, _)
if m.resolved && sparkAdapter.isHoodieTable(target, sparkSession) =>
MergeIntoHoodieTableCommand(m)
// Convert to UpdateHoodieTableCommand
case u @ UpdateTable(table, _, _)
if u.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
UpdateHoodieTableCommand(u)
// Convert to DeleteHoodieTableCommand
case d @ DeleteFromTable(table, _)
if d.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
DeleteHoodieTableCommand(d)
// Convert to InsertIntoHoodieTableCommand
case l if sparkAdapter.isInsertInto(l) =>
val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
table match {
case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
case _ =>
l
}
// Convert to CreateHoodieTableAsSelectCommand
case CreateTable(table, mode, Some(query))
if query.resolved && sparkAdapter.isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
// Convert to CompactionHoodieTableCommand
case CompactionTable(table, operation, options)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionHoodieTableCommand(catalogTable, operation, options)
// Convert to CompactionHoodiePathCommand
case CompactionPath(path, operation, options) =>
虽然这里为CreateTable
,但是query
为空,所以没有匹配到,不会走到CreateHoodieTableAsSelectCommand
,也就是customResolutionRules
的两个规则都没有起到实际作用,但是如果是其他的语句,比如merge into
、ctas
等到是会起作用的。
接下来遍历postHocResolutionRules
,其中有两个规则比较重要DataSourceAnalysis(conf)
和customPostHocResolutionRules
,首先应用规则DataSourceAnalysis
1 | case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { |
这里会匹配到第一个case,根据前面讲的我们知道它是CreateTable(tableDesc, mode, None)
,它的provider
为hudi,所以是datasourceTable
,也就是经过这个规则应用后CreateTable
转变为了CreateDataSourceTableCommand
再看规则customPostHocResolutionRules
1 | case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { |
匹配到CreateDataSourceTableCommand
,并判断是Hudi表,将其转变为CreateHoodieTableCommand
,这个CreateHoodieTableCommand
的run方法实现了创建Hudi表的逻辑
1 | case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean) |
optimization 和 planning
那么又是在哪里执行了run方法呢,其实是在后面的planning
阶段,根据前面的学习,我们知道analysis
阶段后面还有optimization
和planning
阶段,但是参考文章中提到
对于 SELECT 从句,sql() 方法只会触发 parsing 和 analysis 阶段。
optimization
和planning
阶段是在show() 方法触发的
但是run方法又是在planning
阶段,这就很费解,经过自己阅读源码发现,对于select语句确实是这样的,但是对于Command
类型的,不用show方法也会有optimization
和planning
,入口为
1 | new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) |
LogicalPlan
Dataset
有一个变量LogicalPlan
,它不是懒加载的,在new
的时候就会触发,我们知道在analysis
阶段返回的是CreateHoodieTableCommand
,它是Command
的子类,所以会匹配成功,调用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
try {
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
action(qe.executedPlan)
}
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, qe, end - start)
result
} catch {
case e: Exception =>
sparkSession.listenerManager.onFailure(name, qe, e)
throw e
}
}
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
protected def planner = sparkSession.sessionState.planner
其中的executedPlan就会先触发optimization
接着触发planning
,具体的逻辑也是应用各种规则/策略,参考文章已经讲的很详细了,因为这两个阶段我们没有自定义扩展规则,所以主要看一下run方法是在哪里执行的
在上面的代码中我们看到executedPlan
会用到sparkPlan
,继而触发planner.plan
,对应planning
阶段,那么需要看一下planner
在哪定义的
1 | protected def planner: SparkPlanner = { |
同样这里的planner是在BaseSessionStateBuilder
定义的,它重写了extraPlanningStrategies
,接着看一下planner.plan
planner.plan
1 | /** A list of execution strategies that can be used by the planner */ |
这个方法是将逻辑计划LogicalPlan
转为物理计划PhysicalPlan
,根据参考文章我们知道
上面源码的核心在于这一行:
val candidates = strategies.iterator.flatMap(_(plan))
部分同学可能看不懂这里的语法,实际上问题的答案在GenericStrategy的源码中:
策略序列的迭代器扁平化后对其中每一次都调用它的apply方法,这个方法会将逻辑计划转换成物理计划的序列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14 /**
* 给定一个 LogicalPlan,返回可用于执行的物理计划列表。
* 如果此策略不适用于给定的逻辑操作,则应返回空列表
*/
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
/**
* 返回执行计划的物理计划的占位符。
* QueryPlanner将使用其他可用的执行策略自动填写此占位符。
*/
protected def planLater(plan: LogicalPlan): PhysicalPlan
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}
也就是这里会遍历strategies
并调用它的apply方法,strategies
是在SparkPlanner
中定义的
1 | class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods) |
BasicOperators
其中有一个BasicOperators,它的apply方法为1
2
3
4
5
6
7
8
9object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(StructType.fromAttributes(output))
val toRow = encoder.createSerializer()
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil
我们这里的plan为CreateHoodieTableCommand
是RunnableCommand
的子类,所以返回ExecutedCommandExec
,它是SparkPlan
的子类
回到上面的withAction
方法,其中会调用withNewExecutionId
1 | def withNewExecutionId[T]( |
withNewExecutionId
方法会调用方法体body
,这里的body
为executeCollect
,根据withAction
方法的定义可知executeCollect
为SparkPlan
中的方法
executeCollect
SparkPlan.executeCollect
1
2
3
4
5
6
7
8
9
10
11
12/**
* Runs this query returning the result as an array.
*/
def executeCollect(): Array[InternalRow] = {
val byteArrayRdd = getByteArrayRdd()
val results = ArrayBuffer[InternalRow]()
byteArrayRdd.collect().foreach { countAndBytes =>
decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
}
results.toArray
}
但是这里实际调用的是子类ExecutedCommandExec
重写的方法
ExecutedCommandExec.executeCollect
1 | case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { |
在调用executeCollect
方法时先执行sideEffectResult
的逻辑,其中会执行cmd.run
方法,这里的run
方法是CreateHoodieTableCommand
重写的run
方法,实现了创建Hudi表的逻辑
普通Hive表
现在我们知道通过using hudi
创建Hudi表的整个逻辑了,那么如果我们扩展了Hudi但是不是用using hudi
,就想建一个普通的表,那么它的逻辑是啥,会不会因为扩展了Hudi而有问题呢?
代码中我们首先要启用Hive:.enableHiveSupport()
上面我们讲到,#createTable
,#createHiveTable
,他俩的区别为#createTable
多了一个tableProvider
,tableProvider
包含关键字USING
,如果sql里没有using
,那么statement
应该匹配为#createHiveTable
,在后面的statement
匹配时应该匹配到CreateHiveTableContext
1 | public static class CreateHiveTableContext extends StatementContext { |
接着调用visitCreateHiveTable
,同样是在SparkSqlAstBuilder
1 | override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) { |
和visitCreateTable
的逻辑差不多,其中一个区别是provider
为默认为的”hive”,同样返回CreateTable
因为这里启用了Hive,我们需要看一下HiveSessionStateBuilder
中的规则有啥不同
1 | override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { |
可见多了Hive相关的规则ResolveHiveSerdeTable
和 HiveAnalysis
,我们知道在上面讲的几个规则中,因为它不是datasourceTable
,所以都没有匹配上,在DataSourceAnalysis
后面有一个HiveAnalysis
,看一下它的apply方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output.map(_.name))
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode)
case InsertIntoDir(isLocal, storage, provider, child, overwrite)
if DDLUtils.isHiveTable(provider) =>
val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output.map(_.name))
}
}
val HIVE_PROVIDER = "hive"
def isHiveTable(table: CatalogTable): Boolean = {
isHiveTable(table.provider)
}
因为他是hiveTable
所以匹配到了第二个,那么转化为CreateTableCommand
,在后面的customPostHocResolutionRules
也就是HoodiePostAnalysisRule
,也不会匹配到CreateTableCommand
,也就是没有用到Hudi的逻辑,所以最终执行CreateTableCommand
的run方法,也就是执行sparkSession.sessionState.catalog.createTable
1 | case class CreateTableCommand( |
可见扩展了Hudi规则并不会影响Hive普通表的创建
总结
本文通过结合Hudi源码和Spark源码,以Create Table
为例,理清了从SQL解析到最后执行的整个流程,搞懂了其中的几个关键步骤,知道了如何从Spark SQL源码跳转到Hudi Spark SQL源码并执行Hudi的建表逻辑的,并且明白了创建Hudi表和普通Hive表的区别,这样对于其他的SQL语句比如CTAS
、INSERT
、UPDATE
、DELETE
、MERGE
等,也就能很容易的知道它们的逻辑了,后面的只要关注一些关键点,比如应该模式匹配到哪一类、版本之间的区别,最后再看Hudi本身的逻辑就好了