前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。
- 关于UDF:UDF:User Defined Function,用户自定义函数。
1、创建测试用DataFrame
下面以Spark2.x为例给出代码,关于Spark1.x创建DataFrame可在最后的完整代码里查看。1
2
3
4
5
6// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val userDF = spark.createDataFrame(userData).toDF("name", "age")
userDF.show
1 | +-----+---+ |
1 | // 注册一张user表 |
2、Spark Sql用法
2.1 通过匿名函数注册UDF
下面的UDF的功能是计算某列的长度,该列的类型为String
2.1.1 注册
Spark2.x:
1
spark.udf.register("strLen", (str: String) => str.length())
Spark1.x:
1
sqlContext.udf.register("strLen", (str: String) => str.length())
2.2.2 使用
仅以Spark2.x为例1
spark.sql("select name,strLen(name) as name_len from user").show
1 | +-----+--------+ |
2.2 通过实名函数注册UDF
实名函数的注册有点不同,要在后面加 _(注意前面有个空格)
定义一个实名函数1
2
3
4
5
6
7
8
9
10
11/**
* 根据年龄大小返回是否成年 成年:true,未成年:false
*/
def isAdult(age: Int) = {
if (age < 18) {
false
} else {
true
}
}
注册(仅以Spark2.x为例)1
spark.udf.register("isAdult", isAdult _)
至于使用都是一样的
2.3 关于spark.udf和sqlContext.udf
在Spark2.x里,两者实际最终都是调用的spark.udf
sqlContext.udf源码1
def udf: UDFRegistration = sparkSession.udf
可以看到调用的是sparkSession的udf,即spark.udf
3、DataFrame用法
DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法
3.1注册
1 | import org.apache.spark.sql.functions._ |
3.2 使用
可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能
- 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
1
2
3
4//通过withColumn添加列
userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
//通过select添加列
userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
结果均为1
2
3
4
5
6
7
8+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
| Leo| 16| 3| false|
|Marry| 21| 5| true|
| Jack| 14| 4| false|
| Tom| 18| 3| true|
+-----+---+--------+-------+
3.3 withColumn和select的区别
可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。
- 注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* Returns a new Dataset by adding a column or replacing the existing column that has
* the same name.
*
* @group untypedrel
* @since 2.0.0
*/
def withColumn(colName: String, col: Column): DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldReplace = output.exists(f => resolver(f.name, colName))
if (shouldReplace) {
val columns = output.map { field =>
if (resolver(field.name, colName)) {
col.as(colName)
} else {
Column(field)
}
}
select(columns : _*)
} else {
select(Column("*"), col.as(colName))
}
}
4、完整代码
下面的代码的功能是使用UDF给user表添加两列:name_len、isAdult,每个输出结果都是一样的1
2
3
4
5
6
7
8+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
| Leo| 16| 3| false|
|Marry| 21| 5| true|
| Jack| 14| 4| false|
| Tom| 18| 3| true|
+-----+---+--------+-------+
代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148package com.dkl.leanring.spark.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Spark Sql 用户自定义函数示例
*/
object UdfDemo {
def main(args: Array[String]): Unit = {
oldUdf
newUdf
newDfUdf
oldDfUdf
}
/**
* 根据年龄大小返回是否成年 成年:true,未成年:false
*/
def isAdult(age: Int) = {
if (age < 18) {
false
} else {
true
}
}
/**
* 旧版本(Spark1.x)Spark Sql udf示例
*/
def oldUdf() {
//spark 初始化
val conf = new SparkConf()
.setMaster("local")
.setAppName("oldUdf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val userDF = sc.parallelize(userData).toDF("name", "age")
// 注册一张user表
userDF.registerTempTable("user")
// 注册自定义函数(通过匿名函数)
sqlContext.udf.register("strLen", (str: String) => str.length())
sqlContext.udf.register("isAdult", isAdult _)
// 使用自定义函数
sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show
//关闭
sc.stop()
}
/**
* 新版本(Spark2.x)Spark Sql udf示例
*/
def newUdf() {
//spark初始化
val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate()
// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val userDF = spark.createDataFrame(userData).toDF("name", "age")
// 注册一张user表
userDF.createOrReplaceTempView("user")
//注册自定义函数(通过匿名函数)
spark.udf.register("strLen", (str: String) => str.length())
//注册自定义函数(通过实名函数)
spark.udf.register("isAdult", isAdult _)
spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show
//关闭
spark.stop()
}
/**
* 新版本(Spark2.x)DataFrame udf示例
*/
def newDfUdf() {
val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate()
// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val userDF = spark.createDataFrame(userData).toDF("name", "age")
import org.apache.spark.sql.functions._
//注册自定义函数(通过匿名函数)
val strLen = udf((str: String) => str.length())
//注册自定义函数(通过实名函数)
val udf_isAdult = udf(isAdult _)
//通过withColumn添加列
userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
//通过select添加列
userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
//关闭
spark.stop()
}
/**
* 旧版本(Spark1.x)DataFrame udf示例
* 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的
* 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
*/
def oldDfUdf() {
//spark 初始化
val conf = new SparkConf()
.setMaster("local")
.setAppName("oldDfUdf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val userDF = sc.parallelize(userData).toDF("name", "age")
import org.apache.spark.sql.functions._
//注册自定义函数(通过匿名函数)
val strLen = udf((str: String) => str.length())
//注册自定义函数(通过实名函数)
val udf_isAdult = udf(isAdult _)
//通过withColumn添加列
userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
//通过select添加列
userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
//关闭
sc.stop()
}
}