Preface

The previous article, Hudi Spark SQL源码学习总结-Create Table, summarized the source-code execution flow of Create Table. This article continues with CTAS. The reason for covering CTAS is that, while working on a PR I submitted, I found that the CTAS logic differs between Spark 2 and Spark 3.2.1 and ends up in different Hudi implementation classes, so this article analyzes the two versions separately.
Differences

First, a summary of how the overall flow differs between Spark 2 and Spark 3.2.1.

Spark 2: visitCreateTable -> CreateTable -> CreateHoodieTableAsSelectCommand.run

Spark 3.2.1 (precondition: spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog is configured; without it the flow is the same as Spark 2):

visitCreateTable -> CreateTableAsSelectStatement -> isV2Provider -> true -> CreateTableAsSelect -> HoodieCatalog.createHoodieTable

visitCreateTable -> CreateTableAsSelectStatement -> isV2Provider -> false -> CreateTable -> CreateHoodieTableAsSelectCommand.run
The key differences between Spark 2 and Spark 3.2.1 come down to two points:

- 1. The configuration spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
- 2. isV2Provider("hudi") returning true

If either one does not hold, the Spark 3.2.1 flow is the same as Spark 2. The PR that introduced HoodieCatalog and made hudi a V2 provider is [HUDI-3254] Introduce HoodieCatalog to manage tables for Spark Datasource V2.

The current master code has since changed isV2Provider("hudi") for Spark 3.2.1 to return false, so the Spark 2 and Spark 3.2.1 flows are consistent again; see PR [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration.
Versions

Hudi: https://github.com/apache/hudi/pull/5592. This article is based on the code of that PR for debugging and analysis, because it was while contributing it that I noticed the CTAS logic differs between Spark 3.2.1 and Spark 2.
Example Code

We again take the CTAS test statement from TestCreateTable in the Hudi source code, issued via spark.sql(...).
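The statement itself can be reconstructed from the parsed logical plan printed later in this article; a version consistent with that plan looks roughly like this (the table name h0, the partition column dt, the table options, and the projected literals come from that plan, while details such as a location clause may differ from the actual test):

```scala
// Reconstructed from the parsed/analyzed plans shown later; the real TestCreateTable
// statement may differ in details such as a LOCATION clause.
spark.sql(
  """
    |create table h0 using hudi
    |partitioned by (dt)
    |tblproperties (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  type = 'cow'
    |)
    |as
    |select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
    |""".stripMargin)
```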
One thing to mention is how this spark session is created, because it will matter when we analyze the Spark 3.2.1 flow, so I paste it here first:

```scala
protected lazy val spark: SparkSession = SparkSession.builder()
```
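For reference, here is a minimal sketch of the parts of this builder that matter for the rest of the article, assuming the usual Hudi test setup (the exact options of the real test harness differ; the two config entries are the ones this article relies on):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (assumed setup, not the verbatim test code): the Hudi SQL extension
// registers the custom parser/analysis rules discussed below, and on Spark 3.x the
// spark_catalog entry is the HoodieCatalog precondition from the "Differences" section.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .appName("hudi-ctas-debug")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  // Spark 3.x only:
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .getOrCreate()
```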
Printing the Execution Plans

As in the previous article, let's first print the plans to make the analysis easier:

```scala
config("spark.sql.planChangeLog.level", "INFO")
```
The difference from the previous article is the extra configuration "spark.sql.planChangeLog.level" = "INFO". It was not used in the previous article because this configuration only exists since Spark 3.1.0, so it does nothing for the Spark 2 code, but it is quite useful when analyzing the Spark 3.2.1 execution plan. Note that when it is enabled, the output is printed through logBasedOnLevel(message). Three methods call logBasedOnLevel: logRule, which prints the oldPlan and newPlan when a rule takes effect; logBatch, which prints the plan before and after a Batch; and logMetrics, which prints overall metrics. None of them is called from planner.plan, so this logger is of no help for figuring out which strategies take effect, but it is very useful for seeing which rules take effect during the analysis phase.
```scala
private def logBasedOnLevel(f: => String): Unit = {
```
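As a side note, on Spark 3.1+ the same logging can also be switched on for an existing session at runtime and narrowed to specific rules, which keeps the output manageable (a sketch; the rule list here is just an example, and I am assuming these confs are settable at runtime):

```scala
// Enable plan-change logging at runtime (Spark 3.1+) instead of at session build time.
spark.conf.set("spark.sql.planChangeLog.level", "INFO")
// Optionally restrict the output to the rules we care about in this article.
spark.conf.set("spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog")
```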
Spark2
The Spark 2 flow is much the same as in the previous article. Since that article already covered it, this one only discusses what is different; if you have the previous article's logic down, the CTAS flow is fairly easy to follow.

Printed Plans

```
== Parsed Logical Plan ==
```
singleStatement
Following the logic of the previous article, the CTAS statement here likewise corresponds to SqlBase.g4 in the Spark source code:

```
singleStatement
```

The difference here is that query is not empty ((AS? query)), so visitCreateTable returns CreateTable(tableDesc, mode, Some(query)):

```scala
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
```
analysis
In the analysis phase, the plan is matched in the apply method of HoodieAnalysis, one of Hudi's custom customResolutionRules:

```scala
case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
```
Here the plan is rewritten to CreateHoodieTableAsSelectCommand, which, like CreateHoodieTableCommand, is a subclass of Command:

```scala
case class CreateHoodieTableAsSelectCommand(
```

So in the end ExecutedCommandExec.executeCollect is called, which triggers the run method overridden by CreateHoodieTableAsSelectCommand and executes Hudi's own logic. That logic can be traced by debugging the Hudi source code and is not covered in this article.
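To make the rewrite above concrete, here is a simplified sketch of what the CTAS branch of such a rule looks like. This is an illustration under assumptions, not the actual HoodieAnalysis code: the real rule detects Hudi tables via the sparkAdapter and handles many more commands, and CreateHoodieTableAsSelectCommand is assumed here to take the CatalogTable, the SaveMode, and the query plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hudi.command.CreateHoodieTableAsSelectCommand

// Simplified sketch of the CTAS branch of HoodieAnalysis (not the full rule).
case class SimplifiedHoodieCtasRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // CREATE TABLE ... USING hudi ... AS SELECT ...  =>  rewrite to Hudi's own command
    case CreateTable(tableDesc, mode, Some(query))
        if tableDesc.provider.exists(_.equalsIgnoreCase("hudi")) =>
      CreateHoodieTableAsSelectCommand(tableDesc, mode, query)
    case _ => plan
  }
}
```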
Spark3.2.1
Hudi supports multiple Spark versions, with Spark 2.4.4 as the default. To use Spark 3.2.1, build the package with:

```
mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
```

To debug against Spark 3.2.1, you can either package or install the artifacts locally with the command above and create a separate test project that depends on them, or configure the IDEA environment for Spark 3.2.1 directly in the Hudi source, which is somewhat more troublesome; I used the second approach. As with Spark 2, only the key differences are discussed here.
Printed Plans

Spark 3.2.1 prints more information than Spark 2. We can see that the final Physical Plan involves AtomicCreateTableAsSelect and HoodieCatalog:

```
== Parsed Logical Plan ==
'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Analyzed Logical Plan ==
CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Optimized Logical Plan ==
CommandResult AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
+- CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Physical Plan ==
CommandResult <empty>
+- AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
+- *(1) Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- *(1) Scan OneRowRelation[]
```

The PlanChangeLogger output is quite verbose: it reports which rules had no effect, which rules took effect and how, and so on. Only a small excerpt is shown here:

```
7720 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Substitution has no effect.
7721 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Disable Hints has no effect.
7724 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Hints has no effect.
7728 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Simple Sanity Check has no effect.
8309 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger -
=== Applying Rule org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog ===
!'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4] +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation +- OneRowRelation
8331 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger -
=== Result of Batch Resolution ===
!'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false   CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4] +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation +- OneRowRelation
8334 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Remove TempResolvedColumn has no effect.
......
=== Metrics of Executed Rules ===
Total number of runs: 141
Total time: 0.6459626 seconds
Total number of effective runs: 2
Total time of effective runs: 0.302878 seconds
```

Because the lines are long and wrap, the side-by-side oldPlan/newPlan comparison is not very readable here, but the before/after change is still roughly visible; you can also debug and compare it yourself.
ANTLR
The previous article mentioned that Hudi has three g4 files: one under the hudi-spark module and two under the hudi-spark2 module. The hudi-spark3 module likewise has two g4 files with the same names, but their content differs from the Spark 2 versions. Specifically:

- HoodieSqlCommon.g4 under the hudi-spark module
- SqlBase.g4 under the hudi-spark3 module, copied from SqlBase.g4 in the Spark 3.2.0 source
- HoodieSqlBase.g4 under the hudi-spark3 module, which imports the SqlBase.g4 above

HoodieSqlBase.g4:

```
grammar HoodieSqlBase;
```
parsing
As before, parsePlan first goes through HoodieCommonSqlParser.parsePlan. This part is shared code and behaves the same as in Spark 2: it returns null for this statement and then calls sparkExtendedParser.parsePlan:

```scala
private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
```

The previous article showed that Spark 2's sparkExtendedParser is HoodieSpark2ExtendedSqlParser. So what is it for Spark 3? The logic is the same as in Spark 2; let's take a look:

```scala
private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
```
From the code above we know that sparkAdapter is Spark3_2Adapter, so next look at Spark3_2Adapter.createExtendedSparkParser:

```scala
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
```

So for Spark 3 the sparkExtendedParser is HoodieSpark3_2ExtendedSqlParser, which brings us to HoodieSpark3_2ExtendedSqlParser.parsePlan, and this is where the logic differs from Spark 2:

```scala
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
```
Here parser is again HoodieSqlBaseParser, but builder.visit(parser.singleStatement()) behaves differently in Spark 3 than in Spark 2. Why? Read on.

In Spark 3's HoodieSqlBase.g4 we can see that statement does contain #createTable:

```
| createTableHeader ('(' colTypeList ')')? tableProvider?
```
createTableHeader, tableProvider, AS, and query are all imported from SqlBase.g4, so the CTAS statement matches this rule. As discussed before, parser.singleStatement() eventually leads to builder.visitCreateTable. The difference is that the builder here is HoodieSpark3_2ExtendedSqlAstBuilder, so we need to see how its visitCreateTable differs:

```scala
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
```
Here the case Some(query) branch matches and CreateTableAsSelectStatement is returned. This is the difference from Spark 2 (or rather from visitCreateTable in the Spark source), which returns CreateTable(tableDesc, mode, Some(query)). So where is CreateTableAsSelectStatement handled?
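A quick way to observe this difference yourself is to run the text through the session's parser and look at the class of the returned plan (this goes through the session's ParserInterface; the SQL text is just a minimal illustration):

```scala
// Which logical plan does the CTAS text parse to?
val parsed = spark.sessionState.sqlParser.parsePlan(
  "create table h0 using hudi as select 1 as id, 1000 as ts")
println(parsed.getClass.getSimpleName)
// Spark 3.2.1 + Hudi extension: CreateTableAsSelectStatement
// Spark 2.4 + Hudi extension:   CreateTable
```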
ResolveCatalogs and ResolveSessionCatalog

Two rules try to match CreateTableAsSelectStatement: ResolveCatalogs, defined in the Analyzer's batches, and ResolveSessionCatalog, defined in the extendedResolutionRules overridden in BaseSessionStateBuilder.analyzer. (From the PlanChangeLogger output we already know that the one that actually takes effect is ResolveSessionCatalog.)

```scala
override def batches: Seq[Batch] = Seq(
```

```scala
protected def analyzer: Analyzer = new Analyzer(catalogManager) {
```
Their respective apply methods are:

```scala
class ResolveCatalogs(val catalogManager: CatalogManager)
```

```scala
class ResolveSessionCatalog(val catalogManager: CatalogManager)
```

Both rules try to match CreateTableAsSelectStatement; the difference is that one matches NonSessionCatalogAndTable while the other matches SessionCatalogAndTable. As the names suggest, their checks are opposites, so exactly one of them matches. What do they actually do? Take SessionCatalogAndTable as an example. When Scala matches against an extractor object like this, it calls its unapply method. The nameParts argument here is the first parameter of CreateTableAsSelectStatement, which, as seen in visitCreateTable above, is the table returned by visitCreateTableHeader(ctx.createTableHeader):

```scala
object SessionCatalogAndTable {
```
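The extractor mechanism at work here can be illustrated with a small self-contained example that has nothing to do with Spark's actual classes; it only shows that matching against an object invokes its unapply:

```scala
// A custom extractor: matching `case OneOrTwoParts(db, table)` calls this unapply.
object OneOrTwoParts {
  def unapply(nameParts: Seq[String]): Option[(String, String)] = nameParts match {
    case Seq(table)     => Some(("default", table))  // no database qualifier
    case Seq(db, table) => Some((db, table))
    case _              => None
  }
}

Seq("h0") match {
  case OneOrTwoParts(db, table) => println(s"$db.$table")  // prints "default.h0"
}
```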
This ultimately reaches CatalogAndIdentifier.unapply. Because our CREATE TABLE statement has no database qualifier, nameParts is Seq(tableName), i.e. its length is 1, so it returns Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head))), whose value is Some((HoodieCatalog, Identifier.of(default, tableName))). The reason the catalog is HoodieCatalog comes back to the configuration mentioned at the beginning:

```scala
def sparkConf(): SparkConf = {
```
Now look at CatalogV2Util.isSessionCatalog:

```scala
def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
```

The HoodieCatalog.name() used here is implemented in V2SessionCatalog:

```scala
class V2SessionCatalog(catalog: SessionCatalog)
```
So CatalogV2Util.isSessionCatalog(catalog) is true. NonSessionCatalogAndTable is exactly the opposite: !CatalogV2Util.isSessionCatalog(catalog) returns false:

```scala
object NonSessionCatalogAndTable {
```

So the match ultimately succeeds in ResolveSessionCatalog:
```scala
case c @ CreateTableAsSelectStatement(
```
This block checks whether the provider is a V2 provider: if it is not, CreateTable is returned; if it is, CreateTableAsSelect is returned. The isV2Provider logic is fairly involved, so we postpone it to a later section. With the code at the version used in this article, hudi is a V2 provider, so CreateTableAsSelect is returned, and that is the key difference from Spark 2; if it were not a V2 provider, CreateTable would be returned just as in Spark 2. As we will see below, CreateTableAsSelect ends up creating the Hudi table through HoodieCatalog, whereas CreateTable, as we saw in the Spark 2 section above, ends up calling CreateHoodieTableAsSelectCommand:

```scala
case class CreateTableAsSelect(
```
So where is CreateTableAsSelect transformed, and where does it eventually call into Hudi's logic?

planning

```scala
private[sql] val logicalPlan: LogicalPlan = {
```
CreateTableAsSelect is also a Command. Just as in the Spark 2 discussion in the previous article, Spark 3 triggers the subsequent planning phase through the logicalPlan member when the Dataset is constructed.
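In other words, by the time spark.sql(...) returns its Dataset, the command has already run; queryExecution still exposes the plans that were produced, which is an easy way to confirm which physical node the CTAS went through (a sketch; the table name, properties, and exact plan strings in the comments are illustrative):

```scala
// Commands are executed eagerly while the Dataset is being constructed.
val df = spark.sql(
  """create table h1 using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')
    |as select 1 as id, 'a1' as name, 1000 as ts
    |""".stripMargin)

// The table already exists here; the executed plan shows which exec node ran.
println(df.queryExecution.executedPlan)
// Spark 3.2.1 + HoodieCatalog (V2 provider): CommandResult +- AtomicCreateTableAsSelectExec ...
// Spark 2 / latest master:                   CreateHoodieTableAsSelectCommand ...
```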
DataSourceV2Strategy
Among the strategies of SparkPlanner there is a DataSourceV2Strategy:

```scala
class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies with SQLConfHelper {

  def numPartitions: Int = conf.numShufflePartitions

  override def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      LogicalQueryStageStrategy ::
      PythonEvals ::
      new DataSourceV2Strategy(session) ::
      FileSourceStrategy ::
      DataSourceStrategy ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      SparkScripts ::
      BasicOperators :: Nil)
```
Let's look at its apply method:

```scala
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
```
Here CreateTableAsSelect is matched, and depending on the type of catalog either AtomicCreateTableAsSelectExec or CreateTableAsSelectExec is returned, so we need to check whether the catalog is a StagingTableCatalog.

HoodieCatalog

```scala
class HoodieCatalog extends DelegatingCatalogExtension
```
From the definition of HoodieCatalog we can see that it implements StagingTableCatalog, so AtomicCreateTableAsSelectExec is returned.
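This can be verified with a one-liner, since being a StagingTableCatalog is exactly the property the strategy checks (class names as used in this article; requires the Hudi Spark 3 bundle on the classpath):

```scala
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.hudi.catalog.HoodieCatalog

// true: HoodieCatalog is a StagingTableCatalog, so the "atomic" exec node is chosen.
println(classOf[StagingTableCatalog].isAssignableFrom(classOf[HoodieCatalog]))
```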
AtomicCreateTableAsSelectExec
```scala
case class AtomicCreateTableAsSelectExec(
```
We can see that AtomicCreateTableAsSelectExec has a run method. Where is that run method triggered?

V2CommandExec

AtomicCreateTableAsSelectExec is in fact a subclass of V2CommandExec:

```scala
case class AtomicCreateTableAsSelectExec(
```
The executeCollect of AtomicCreateTableAsSelectExec is implemented in V2CommandExec:

```scala
abstract class V2CommandExec extends SparkPlan {

  /**
   * Abstract method that each concrete command needs to implement to compute the result.
   */
  protected def run(): Seq[InternalRow]

  /**
   * The value of this field can be used as the contents of the corresponding RDD generated from
   * the physical plan of this command.
   */
  private lazy val result: Seq[InternalRow] = run()

  /**
   * The `execute()` method of all the physical command classes should reference `result`
   * so that the command can be executed eagerly right after the command query is created.
   */
  override def executeCollect(): Array[InternalRow] = result.toArray
```
eagerlyExecuteCommands, mentioned above, calls the executeCollect method, which in turn calls the run method of AtomicCreateTableAsSelectExec. So is this run method the place where the Hudi logic is implemented?

```scala
override protected def run(): Seq[InternalRow] = {
```
There are two places related to HoodieCatalog. One is catalog.stageCreate, which checks whether this is a Hudi table and, if so, returns a HoodieStagedTable, so the stagedTable here is a HoodieStagedTable:

```scala
override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
```
The other is writeToTable, which is implemented in the parent trait TableWriteExecHelper:

```scala
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
```
Both HoodieStagedTable and BasicStagedTable implement SupportsWrite as well as StagedTable, so the plan first matches SupportsWrite and then StagedTable, finally calling commitStagedChanges. Other details such as writeBuilder and writeWithV1 are left out of this article:

```scala
case class HoodieStagedTable(ident: Identifier,
```
The commitStagedChanges method calls catalog.createHoodieTable to create the Hudi table and execute Hudi's own logic. This is why the CTAS changes in the PR I mentioned at the beginning involve the HoodieCatalog class:

```scala
def createHoodieTable(ident: Identifier,
```
isV2Provider
Let's look at the isV2Provider method:

```scala
private def isV2Provider(provider: String): Boolean = {
```

The provider argument here is hudi, so isHiveTable is false. Next, let's look at lookupDataSourceV2.
The useV1Sources list here defaults to "avro,csv,json,kafka,orc,parquet,text". Continuing into lookupDataSource: provider1 = hudi and provider2 = hudi.DefaultSource. It then loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister on the classpath and reads their contents. The Hudi-related entries include org.apache.hudi.DefaultSource, org.apache.hudi.Spark3DefaultSource, org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat, and so on. After filtering for shortName == hudi, only Spark3DefaultSource remains, so it is returned directly:

```scala
/** Given a provider name, look up the data source class definition. */
```
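The registry lookup can be reproduced outside Spark's internals with a plain ServiceLoader call, which is essentially what lookupDataSource iterates over before filtering on shortName (illustrative snippet; it only requires the Spark and Hudi jars on the classpath):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// List every registered DataSourceRegister and its shortName; with the Hudi Spark 3
// bundle on the classpath the entry whose shortName is "hudi" is Spark3DefaultSource.
val registered = ServiceLoader.load(classOf[DataSourceRegister]).asScala.toSeq
registered.foreach(r => println(s"${r.getClass.getName} -> ${r.shortName()}"))
```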
Take a look at Spark3DefaultSource:

```scala
class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {
```

Its shortName equals hudi, whereas for the others DefaultSource is hudi_v1 and HoodieParquetFileFormat is Hoodie-Parquet. The Spark 2 counterpart is Spark2DefaultSource:

```scala
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
  override def shortName(): String = "hudi"
}
```

So cls is Spark3DefaultSource:

```scala
cls.newInstance() match {
```
Spark3DefaultSource is a subclass of both DataSourceRegister and TableProvider, so the DataSourceRegister case is tried first; since useV1Sources does not contain hudi, matching falls through to the TableProvider case, and since useV1Sources does not contain Spark3DefaultSource either, Some(Spark3DefaultSource) is returned:

```scala
DataSource.lookupDataSourceV2(provider, conf) match {
```
Spark3DefaultSource is not a subclass of FileDataSourceV2, so it matches case Some(_) => true and returns true; therefore hudi is a V2 provider.
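The same conclusion can be checked directly against Spark's (internal) DataSource object, assuming the lookupDataSourceV2(provider, conf) helper discussed above:

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// With the PR's code (Spark3DefaultSource extends TableProvider) this prints
// Some(...Spark3DefaultSource...); after HUDI-4178 it prints None, and isV2Provider
// therefore returns false.
println(DataSource.lookupDataSourceV2("hudi", spark.sessionState.conf))
```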
Latest Code

From the above we now know why hudi is a V2 provider, and therefore why CTAS ends up calling HoodieCatalog.createHoodieTable instead of CreateHoodieTableAsSelectCommand; that is the key difference from Spark 2. The latest code, however, has gone back to calling CreateHoodieTableAsSelectCommand just like Spark 2 and no longer goes through the HoodieCatalog path, because part of Spark3DefaultSource was commented out:

```scala
class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
```
Spark3DefaultSource is no longer a subclass of TableProvider, so isV2Provider returns false and the rest of the flow is the same as in Spark 2.

Related PR: [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration

We can print its execution plans:

```
== Parsed Logical Plan ==
```

We can see that, just like Spark 2, the Hudi logic is indeed implemented through CreateHoodieTableAsSelectCommand.