前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
本文记录Spark如何在表存在的情况时覆盖写入mysql但不修改已有的表结构,并进行主要的源码跟踪以了解其实现原理。主要场景为先用建表语句建好mysql表,然后用spark导入数据,可能会存在多次全表覆写导入的情况。
代码
已上传github
主要的参数为.option(“truncate”, true),可以参考Spark官网http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
主要代码逻辑为,读取csv,进行日期转化,然后覆盖写入到已经建好的mysql表中。
1 | package com.dkl.blog.spark.mysql |
源码跟踪
本文仅进行简单的源码跟踪
Spark2.2.1
本来想以Spark3.0.1版本进行讲解,后来发现Spark3源码稍微做了些改动,因为本人之前主要用Spark2.2进行学习总结,所以先用Spark2.2.1的源码进行讲解,后面再在此基础上进行讲解Spark3的源码的一些变化,其实主要逻辑是一样的。
jdbc
先从入口jdbc函数开始
1 | def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { |
format(“jdbc”).save()
format方法返回DataFrameWriter1
2
3
4def format(source: String): DataFrameWriter[T] = {
this.source = source //source="jdbc"
this
}
1 | def save(): Unit = { |
SaveIntoDataSourceCommand
接着执行SaveIntoDataSourceCommand的run方法
1 | case class SaveIntoDataSourceCommand( |
DataSource.write
run方法里主要执行DataSource.write方法
1 | def write(mode: SaveMode, data: DataFrame): Unit = { |
这里会执行到1
2case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
CreatableRelationProvider.createRelation
然后看一下createRelation方法,这里的CreatableRelationProvider是一个接口,这里实际上执行其子类JdbcRelationProvider的createRelation
1 | trait CreatableRelationProvider { |
JdbcRelationProvider.createRelation
下面是最后真正要执行的方法,首先判断表是否存在,我们这个场景下表示存在的,然后进行mode的模式匹配,这里为Overwrite,然后进入到第一个if语句,我们这里在上面的程序里设置了
truncate为true,所以会满足条件,然后先执行truncateTable方法进行删除表数据但不会删除表结构,再执行saveTable方法将df的数据保存到表中实现覆盖写入。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JDBCOptions(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val conn = JdbcUtils.createConnectionFactory(options)()
try {
val tableExists = JdbcUtils.tableExists(conn, options)
if (tableExists) {//首先判断表是否存在
mode match {
case SaveMode.Overwrite => //如果mode为Overwrite
if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) { //判断truncate是否为true
// In this case, we should truncate table and then load.
truncateTable(conn, options.table) //先truncateTable
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options) //再保存数据
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
dropTable(conn, options.table)
createTable(conn, df, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
case SaveMode.Append =>
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
case SaveMode.ErrorIfExists =>
throw new AnalysisException(
s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
// to not save the contents of the DataFrame and to not change the existing data.
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
createTable(conn, df, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
} finally {
conn.close()
}
createRelation(sqlContext, parameters)
}
}
truncateTable
最后看一下truncateTable实现原理,这里其实是执行的TRUNCATE TABLE命令1
2
3
4
5
6
7
8
9
10
11
12
/**
* Truncates a table from the JDBC database.
*/
def truncateTable(conn: Connection, table: String): Unit = {
val statement = conn.createStatement
try {
statement.executeUpdate(s"TRUNCATE TABLE $table")
} finally {
statement.close()
}
}
总结
本来主要讲了如何实现Spark在不删除表结构的情况下进行overwrite覆盖写入mysql表,并跟踪一下源码,了解其实现原理。代码层面主要是加了一个参数.option(“truncate”, true),源码层面
主要逻辑是先判断表是否存在,如果表存在,然后判断truncate是否为true,如果为true,则不drop表,而是执行TRUNCATE TABLE表里,清空表数据然后再写表,这样就实现了我们的需求