前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
学习和使用Hudi近一年了,由于之前忙于工作和学习,没时间总结,现在从头开始总结一下,先从入门开始
Hudi 概念
Apache Hudi 是一个支持插入、更新、删除的增量数据湖处理框架,有两种表类型:COW和MOR,可以自动合并小文件,Hudi自己管理元数据,元数据目录为.hoodie
,
具体的概念可以查看官网https://hudi.apache.org/cn/docs/0.9.0/overview
Hudi 学习
- Hudi 官网 https://hudi.apache.org/cn/docs/0.9.0/overview/(因本人最开始学习时Hudi的版本为0.9.0版本,所以这里列的也是0.9.0的连接)
- Hudi 官方公众号号:ApacheHudi (Hudi PMC leesf 运营的),自己搜索即可,这里不贴二维码了
- Github https://github.com/leesf/hudi-resources 这个是Hudi PMC leesf整理的公众号上的文章,PC 浏览器上看比较方便
- GitHub 源码 https://github.com/apache/hudi 想要深入学习,还是得看源码并多和社区交流
Hudi 安装
只需要将Hudi的jar包放到Spark和Hive对应的路径下,再修改几个配置
Spark
Hudi支持Spark程序读写Hudi表,同时也支持Spark SQL insert/update/delete/merge等
包名:hudi-spark-bundle_2.11-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.9.0/hudi-spark-bundle_2.11-0.9.0.jar
包名:hudi-utilities-bundle_2.11-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.9.0/hudi-utilities-bundle_2.11-0.9.0.jar
将hudi-spark-bundle_2.11-0.9.0.jar 和 hudi-utilities-bundle_2.11-0.9.0.jar拷贝到$SPARK_HOME/jars,当前版本目录为/usr/hdp/3.1.0.0-78/spark2/jars/
版本说明:0.9.0为hudi发行版本,2.11为HDP中Spark对应的scala版本
这里提供的是Maven的下载地址,对于其他版本,Maven上可以下载到,当然也可以自己打包
另外可能还需要json包,只需要将json包也放到$SPARK_HOME/jars目录下即可,如json-20200518.jar
,json包从一样从maven仓库下载即可,地址为https://repo1.maven.org/maven2/org/json/json/20200518/json-20200518.jar,不放的话可能会报缺少json包异常,如在同步hive元数据时
1 | Exception in thread "main" java.lang.NoClassDefFoundError: org/json/JSONException |
Hive
Hudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete
包名:hudi-hadoop-mr-bundle-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.9.0/hudi-hadoop-mr-bundle-0.9.0.jar
1、将hudi-hadoop-mr-bundle-0.9.0.jar 拷贝至$HIVE_HOME/lib,当前版本目录为:/usr/hdp/3.1.0.0-78/hive/lib/
2、修改hive配置(在hive-site.xml) hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat
hive.resultset.use.unique.column.names=false (修改这里的配置是因为如果我们用hudi-utilities-bundle中的工具类HoodieDeltaStreamer
,其中的JdbcbasedSchemaProvider
解析Hive表Schema时需要设置这个属性,否则解析异常,关于HoodieDeltaStreamer
的使用我会单独在另一篇文章中总结)
3、重启hive
Tez
1、将上述hudi-hadoop-mr-bundle-0.9.0.jar 打到/hdp/apps/${hdp.version}/tez/tez2.tar.gz中
注意:这里的路径是指HDFS路径
2、修改hive配置(在hive-site.xml) hive.tez.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat
3、重启Tez、Hive
关于第一个打包到tez2.tar.gz,我自己写了一个脚本,如下:
1 | jar=$1 |
这个脚本在我自己的环境上是可以正常运行使用的,当然可能因本人水平有限,写的还不够好,不能适用所有环境,可以自行修改,仅做参考
Flink
Hudi也支持Flink,本人目前还不会Flink~,可以参考官网https://hudi.apache.org/cn/docs/0.9.0/flink-quick-start-guide
Hudi 写入
Hudi支持Spark、Flink、Java等多种客户端,本人常用Spark、Java客户端,这俩相比较而言,大家用Spark较多,这里就以Spark代码进行简单的示例总结
Spark 配置参数
1 | import org.apache.hudi.DataSourceWriteOptions |
写Hudi并同步到Hive表
代码示例:
1 | val spark = SparkSession.builder(). |
代码说明:本地测试需要把同步Hive的代码部分注释掉,因为同步Hive需要连接Hive metaStore
服务器spark-shell里可以跑完整的代码,可以成功同步Hive,0.9.0版本同步Hive时会抛出一个关闭Hive的异常,这个可以忽略,这是该版本的一个bug,虽然有异常但是已同步成功,最新版本已经修复该bug,具体可以查看PR:https://github.com/apache/hudi/pull/3364
我已经将该PR合到0.9.0版本,如果想使用的话,可以查看:https://gitee.com/dongkelun/hudi/commits/0.9.0,该分支也包含其他基于0.9.0版本的bug修复和特性支持。
读Hudi
Spark 读取如上述代码示例:
1 | spark.read.format("hudi").load(tablePath1).show(false) |
结果:1
2
3
4
5+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |name|value|ts |dt |
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|20220512101542 |20220512101542_0_1 |id:1 |2022-05-12 |38c1ff87-8bc9-404c-8d2c-84f720e8133c-0_0-20-12004_20220512101542.parquet|1 |a1 |10 |1000|2022-05-12|
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
可以看到多了几个Hudi元数据字段其中_hoodie_record_key
为Hudi主键,如果设置了RECORDKEY_FIELD
,比如这里的ID,那么_hoodie_record_key
是根据我们设置字段生成的,默认不是复合主键,这里代码示例改为了复合主键,具体配置为1
option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getCanonicalName).
这里主要为了和SparkSQL保持一致,因为SparkSQL默认的为复合主键,如果不一致,那么upsert/delete时会有问题
默认情况RECORDKEY_FIELD
是必须设置的,RECORDKEY_FIELD
的默认值为uuid
,如果不设置,则会去找uuid,因为schema里没有uuid
,那么会报错
Hive
在服务器上运行示例代码是可以成功同步到Hive表的,我们看一下Hive表情况:
1 | show create table test_hudi_table_1; |
下面是Hive Hudi表的建表语句,和普通的Hive表的建表语句的区别可以自己比较,其中SERDEPROPERTIES里的内容是为了SparkSQL用的,可以看到这里包含了’primaryKey’=’id’,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的’primaryKey’获取的,如果没有这个属性,那么
Spark SQL认为该表不是主键表,则不能进行update等操作
1 | +----------------------------------------------------+ |
Hive查询Hudi表:
1 | select * from test_hudi_table_1; |
1 | +----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+ |
Hive是可以查询Hudi表的,但是不能update/delete,要想使用update/delete等语句,只能使用Spark SQL,另外Hive可以增量查询。关于如何使用Hudi Spark SQL和Hive的增量查询,这里不展开描述,以后会单独写
配置项说明
这里只说明几个比较重要的配置,其他相关的配置可以看官网和源码
RECORDKEY_FIELD
:默认情况RECORDKEY_FIELD
是必须设置的,RECORDKEY_FIELD
的默认值为uuid
,如果不设置,则会去找uuid,因为schema里没有uuid
,那么会报错。另外Hudi0.9.0支持非主键Hudi表,只需要配置option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
即可,但是在后面的版本已经不支持了KEYGENERATOR_CLASS_NAME
:默认值为SimpleKeyGenerator
,默认不支持复合主键,默认情况下上述_hoodie_record_key
的内容为1
,而不是id:1
,而SparkSQL的默认值为SqlKeyGenerator
,该类是ComplexKeyGenerator
的子类:1
class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
也就是本示例所使用的的复合主键类,当使用SimpleKeyGenerator
和ComplexKeyGenerator
同时upsert一个表时,那么会生成两条记录,因为_hoodie_record_key
的内容不一样,所以一张表的 KEYGENERATOR_CLASS_NAME
必须保证一致(父类和子类也是一致的)
PRECOMBINE_FIELD
: 预合并字段,默认值:ts,想详细了解预合并可以参考我的另外两篇博客https://dongkelun.com/2021/07/10/hudiPreCombinedField/和https://dongkelun.com/2021/11/30/hudiPreCombineField2/ upsert时,预合并是必须的,如果我们的表里没有预合并字段,或者不想使用预合并,不设置的话是会抛异常的,因为默认去找ts字段,找不到则跑异常,那么我们可以将预合并字段设置为主键字段PARTITIONPATH_FIELD
: Hudi的分区字段,默认值partitionpath
,对于没有分区的表,我们需要将该字段设置为空字符串option(PARTITIONPATH_FIELD.key, "")
,否则可能会因找不到默认值partitionpath
而抛异常。最新版本已经去掉分区字段默认值,详情可见:https://github.com/apache/hudi/pull/4195OPERATION
: Hudi的写操作类型,默认值为UPSERT_OPERATION_OPT_VAL
即upsert,Hudi支持多种操作类型 如:upsert、insert、bulk_insert、delete等,具体每个版本支持哪些操作类型,可以查看官网或源码,可以根据自己的需求选择选择操作类型。本代码展示了upsert成功后,又删除成功。
下面的参数和同步hive元数据,查询hive有关
META_SYNC_ENABLED
: 默认为false,不同步Hive,要想同步Hive可以将该值设为true,另外也可以设置HIVE_SYNC_ENABLED
进行同步Hive,作用差不多,至于区别,这里不详细解说HIVE_USE_JDBC
: 是否使用jdbc同步hive,默认为true,如果使用jdbc,那么需要设置HIVE_URL
、HIVE_USER
、HIVE_PASS
等配置,因为url和ip有关,每个环境不一样,用起来比较麻烦,所以这里不采用,另外因为实际使用是和Hive绑定的,可以直接使用HMS进行同步,使用起来比较方便,改为false后默认使用HMS同步Hive,具体逻辑可以看Hudi Hive 同步模块的源码,这里不展开HIVE_STYLE_PARTITIONING
: 是否使用Hive格式的分区路径,默认为false,如果设置为true,那么分区路径格式为= ,在这里为dt=2022-05-12,默认情况下只有 即2022-05-12,因为我们常用Hive表查询Hudi所以,这里设置为true HIVE_CREATE_MANAGED_TABLE
: 同步Hive建表时是否为内部表,默认为false,使用saveAsTable(实际调用的Hudi Spark SQL CTAS)建表时0.9.0版本有,本应该为内部表,但还是为外部表,可以通过设置这个参数修正,最新版本已修复,详情可见PR:https://github.com/apache/hudi/pull/3146HIVE_TABLE_SERDE_PROPERTIES
: 同步到Hive表SERDEPROPERTIES,为了Hudi Spark SQL 使用,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的’primaryKey’获取的,如果没有这个属性,那么Spark SQL认为该表不是主键表,则不能进行update等操作,而默认情况同步Hive时没有将主键字段同步过去,最新版本已经不需要设置该属性了。相关PR:https://github.com/apache/hudi/pull/3745 这个PR添加了支持HIVE_CREATE_MANAGED_TABLE
配置,但是CTAS依旧有bug,代码里的虽然判断表类型是否为内部表,并添加到options中,但是最后并没有将options用到最终写Hudi的参数中。另一个PR:https://github.com/apache/hudi/pull/3998 该PR的主要目的不是为了解决这个bug,但是附带解决了这个问题,因为options最终被正确传到写Hudi的参数中了
其他Hive相关的配置参数不一一解释,可自行查看官网
hoodie.properties
.hoodie
目录下有表属性文件.hoodie.properties
,内容为:1
2
3
4
5
6
7
8
9
10
11hoodie.table.precombine.field=ts
hoodie.table.partition.fields=dt
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=2
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_hudi_table_1
新版本在该属性文件里增加了很多属性,如HIVE_STYLE_PARTITIONING
即hoodie.datasource.write.hive_style_partitioning
,增加属性便于使表的属性前后保持统一
非主键表
如上面配置项说明所示Hudi0.9.0版本支持非主键表,对于纯insert的表有用,这里进行简单的代码示例
1 | val tableName2 = "test_hudi_table_2" |
结果:
1 | +-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+ |
可以看到Hudi的主键为uuid,_hoodie_partition_path
为空,即非主键非分区表
备注:insert默认是会随机更新的(如果是主键表,大家可以将程序改为主键表,自行测试),随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。
要想是insert操作不更新,可以使用以下配置:
1 hoodie.merge.allow.duplicate.on.inserts = true
相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数
saveAsTable
利用saveAsTable写Hudi并同步Hive,实际最终调用的是Spark SQL CTAS(CreateHoodieTableAsSelectCommand)
CTAS 先用的insert into(InsertIntoHoodieTableCommand),再建表,默认insert,这里展示怎么配置参数使用bulk_insert,并且不使用预合并,这对于转化没有重复数据的历史表时很有用。
insert into SQL 默认是insert,配置一些参数就可以使用upsert/bulk_insert,具体可以看InsertIntoHoodieTableCommand
源码
1 | val tableName3 = "test_hudi_table_3" |
这段代码本地是可以直接跑通的,结果为:
1 | +-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+ |
本地测试并没有同步到Hive表,因为并没有开启enableHiveSupport()
(本地验证时,注释掉这个配置),当在服务器上运行时,则可以成功同步到Hive表,可以自己试试,用saveAsTable的好处是,很多配置比如同步Hive都在Hudi Spark SQL的源码里配置了,所以配置较少。CTAS也有一些限制,比如不能覆盖写,不如save(path)
灵活
代码
完整代码地址:https://gitee.com/dongkelun/spark-hudi/blob/master/src/main/scala/com/dkl/blog/hudi/SparkHudiDemo.scala
备注:以后可能因重构地址有所变动
总结
本文对Hudi安装、读写进行了简单的总结,因为精力原因写的可能没有很全面,希望对刚入门Hudi的同学有所帮助,后面会继续总结Hudi Spark SQL 等其他方面的知识。