前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
总结Hudi Spark SQL的使用,本人仍然以Hudi0.9.0版本为例,也会稍微提及最新版的一些改动。Hudi 从0.9.0版本开始支持Spark SQL,是由阿里的pengzhiwei同学贡献的,pengzhiwei目前已不负责Hudi,改由同事YannByron负责,现在又有ForwardXu贡献了很多功能特性,目前好像主要由ForwardXu负责。
三位都是大佬,都是Apache Hudi Committer,膜拜大佬,向大佬学习!!!大佬的github:
- 彭志伟(阿里) pengzhiwei https://github.com/pengzhiwei2018
- 毕岩(阿里) YannByron https://github.com/YannByron
- 徐前进(腾讯) ForwardXu https://github.com/XuQianJin-Stars
当然还有很多其他大佬,如Apache member/Hudi PMC Raymond Xu/许世彦 https://github.com/xushiyan,负责整个Spark模块
配置参数
核心参数:1
2--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
使用
三种方式使用Hudi Spark SQL
Spark Thrift Server
启动hudi-spark-thrift-server
1 | spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --name hudi-spark-thrift-server --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab --hiveconf hive.server2.thrift.http.port=20003 |
连接hudi-spark-thrift-server
1 | /usr/hdp/3.1.0.0-78/spark2/bin/beeline -u "jdbc:hive2://192.168.44.128:20003/default;principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" |
spark-sql脚本
1 | spark-sql --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab |
Spark 程序
配置好参数后,直接使用spark.sql(sql)
即可
建表
1 | create table test_hudi_table ( |
- using hudi 表示我们要建的表是Hudi表
- primaryKey 主键,不设置的话,则表示该表没有主键,0.9.0版本以后必须设置
- preCombineField 预合并字段
- type 表类型
也支持其他hudi参数:hoodie开头的一些配置参数,表参数优先级较高,可以覆盖其他SQL默认参数,慎用,因为有些参数可能有bug,比如 hoodie.table.name
和hoodie.datasource.write.operation
,详情参考PR:https://github.com/apache/hudi/pull/5495
- location 指定了外部路径,那么表默认为外部表,如果不指定则使用数据库路径,为内部表
0.9.0版本以后 options
建议用 tblproperties
, options
可以继续使用
执行完建表语句,会在对应的表路径下初始化Hudi表,生成.hoodie元数据目录,并且会将Hudi表元数据信息同步到Hive表中,可以自行在Hive中验证内部表外部表的逻辑,Spark SQL目前不能验证,即使为外部表也不显示,不知道是否为bug
insert
1 | insert into test_hudi_table values (1,'hudi',10,100,'2021-05-05'),(2,'hudi',10,100,'2021-05-05') |
或1
2
3insert into test_hudi_table
select 1 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt union
select 2 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt
insert完查询验证一下,数据是否成功插入1
2
3
4
5
6
7
8select * from test_hudi_table
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
| 20220513110302 | 20220513110302_0_2 | id:1 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 1 | hudi | 10.0 | 100 | 2021-05-05 |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
另外备注一下:insert默认是会随机更新的,随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。
要想是insert操作不更新,可以使用以下配置:1
hoodie.merge.allow.duplicate.on.inserts = true
相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数
update
1 | update test_hudi_table set price = 20.0 where id = 1 |
price字段已经成功更新1
2
3
4
5
6+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
| 20220513143459 | 20220513143459_0_1 | id:1 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-57-3422_20220513143459.parquet | 1 | hudi | 20.0 | 100 | 2021-05-05 |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
delete
1 | delete from test_hudi_table where id = 1 |
id为1的记录被成功删除1
2
3
4
5+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
merge
HUDI 支持MERGE语句,有merge into 、merge update和merge delte。可以把增删改统一为:
1 | merge into test_hudi_table as t0 |
1 | +----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+ |
这样的好处有:
1、把所有的类型统一为一个SQL,避免产生过多的job
2、避免产生异常,对于insert,如果新insert的主键已经存在会产生异常
3、减少程序复杂度,对于update和delete,不用判断where条件和要修改哪些字段,再去拼接sql(从binlog里可能不能获取这些内容)
4、提升性能,对于批量DELETE, merge的性能要比使用in 好。
但是需要注意,当没有设置preCombineField = ‘ts’时,新来的数据会直接覆盖掉历史数据,这种情况存在于新数据的到达时间早于旧数据的到达时间的情况。
merge values
在merge的基础上对源码进行优化(代码目前没有提交到社区,想使用的话可以查看:https://gitee.com/dongkelun/hudi/commits/0.9.0),使Hudi SQL 支持 merge values形式,示例如下:
1 | merge into test_hudi_table as t0 |
对于想要SQL实现数据同步:这样修改的原因是merge是merge subQuery的形式,当拼接SQL很长时,如7000条记录,这样等于7000个select的语句,程序用递归的形式解析SQL很慢,仅解析subQuery的时间就要10分钟,这样不能满足我们的分钟级事务的需求。而通过修改源码支持 merge values的形式,通过values传值 ,这样解析时间从10分钟降低到几秒,后面在用程序将values转成表,直接upsert,大大提升了每个批次的事务时间。经过测试,千万级历史数据,千万级日增量,即平均每分钟7000条,可以实现分钟级事务的需求。
测评
记录用merge values语句测试的性能结果
Spark Thrift Server配置参数
1 | --executor-memory 4G --num-executors 15 --executor-cores 2 --driver-memory 6G --driver-cores 2 |
历史数据
本次测评数据采用TPC-DS的web_sales表,历史数据一千万,模拟日增量一千万,需要注意的是,源数据表的小数类型同样为double类型,不能是decimal,否则在后面增量数据同步时会有异常(Hudi Spark SQL 对于decimal类型有bug)
增量数据
程序读取增量数据拼接SQL,jdbc连接Spark Thrift Server实现增量同步,拼接SQL性能:一万条记录1秒之内完成
测评结果
Spark Server | Streaming | ||||
---|---|---|---|---|---|
批次 | 批次数据量 | 时间(s) | 批次 | 批次数据量 | 时间(s) |
1 | 6000 | 119 | 1 | 6898 | 59 |
2 | 6000 | 79 | 2 | 3211 | 27 |
3 | 6000 | 70 | 3 | 5999 | 36 |
4 | 6000 | 68 | 4 | 5999 | 33 |
5 | 5902 | 32 | |||
6 | 5995 | 35 |
Spark Server为Java程序通过JDBC连接Spark Thrift Server,在第一次没有缓存的情况下时间为120秒,在有缓存的情况下为70秒。
Streaming 为用Structured Streaming在每个批次中拼接Merge SQL,然后调用spark.sql()实现,从结果上看Streaming要比Spark Server快30秒,主要原因是Spark Server 的延迟调度时间比Streaming 的时间长,目前还没有找到解决方案使Spark Server的时间缩减到和Streaming的时间相当。
本次测评模拟的增量数据每分钟包含所有分区,没有起到分区过滤的效果。实际生产数据只包含部分少量分区,可以起到分区过滤的效果,增量同步的性能优于本次测评。
总结
本文主要总结了Hudi0.9.0版本Spark SQL常用的SQL命令的使用以及一些注意事项,其实还支持其他SQL语义,并且新版本支持的更多,大家可以自己学习测试。