前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
前面学习总结了Hive分区表,现在学习总结一下Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,并记录一下遇到的问题以及如何解决。
1、Spark创建分区表
只写主要代码,完整代码见附录1
2
3
4val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
//可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表
df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
然后在Hive命令行里看一下,新建的表是否有分区字段year
用命令1
desc new_test_partition;
或1
show create table new_test_partition;
根据下面的结果可以看到新建的表确实有分区字段year1
2
3
4
5
6
7
8
9
10
11
12hive> desc new_test_partition;
OK
id string
name string
age int
year string
# Partition Information
# col_name data_type comment
year string
Time taken: 0.432 seconds, Fetched: 9 row(s)
2、向已存在的表插入数据
2.1 Spark创建的分区表
- 这种情况其实和建表语句一样就可以了
- 不需要开启动态分区
1 df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
当然也有其他方式插入数据,会在后面讲到。
2.2 在Hive命令行创建的表
- 这里主要指和Spark创建的表的文件格式不一样,Spark默认的文件格式为PARQUET,为在命令行Hive默认的文件格式为TEXTFILE,这种区别,也导致了异常的出现。
- 需要开启动态分区
- 不开启会有异常:
1 Exception in thread "main" org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
2.2.1 建表
用Hive分区表学习总结的建表语句建表(之前已经建过就不用重复建了)。1
2
3
4
5
6
7
8create table test_partition (
id string comment 'ID',
name string comment '名字',
age int comment '年龄'
)
comment '测试分区'
partitioned by (year int comment '年')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
2.2.2 异常
试着用上面的插入语句插入数据1
df.write.mode("append").partitionBy("year").saveAsTable("test_partition")
抛出异常1
Exception in thread "main" org.apache.spark.sql.AnalysisException: The format of the existing table dkl.test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
原因就是上面说的文件格式不一致造成的。
2.2.3 解决办法
用fomat指定格式1
df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")
2.3 其他方法
1 | df.createOrReplaceTempView("temp_table") |
其中insertInto不需要也不能将df进行partitionBy,否则会抛出异常
1 | df.write.partitionBy("year").insertInto("test_partition") |
3、完整代码
1 | package com.dkl.blog.spark.hive |