前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
总结如何利用Hudi DeltaStreamer工具从外部数据源读取数据并写入新的Hudi表,HoodieDeltaStreamer
是hudi-utilities-bundle的一部分,按照Apache Hudi 入门学习总结,将hudi-spark-bundle包拷贝至$SPARK_HOME/jars目录下即可。
HoodieDeltaStreamer
提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
- 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件 增量导入
- 支持json、avro或自定义记录类型的传入数据
- 管理检查点,回滚和恢复
- 利用DFS或Confluent schema注册表的Avro模式。
- 支持自定义转换操作
- 除了上述官网说的几项,也支持读取Hive表等(历史数据)转化Hudi表,源码里还有其他的工具类,可以自行查阅源码发掘
命令行选项更详细地描述了这些功能:
1 | spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --help |
最新版本应该支持了更多参数,可以查阅官网:https://hudi.apache.org/cn/docs/hoodie_deltastreamer
Hive设置
ambari设置:hive.resultset.use.unique.column.names=false,并重启
SqlSource
这里利用SqlSource 读取Hive历史表转化为Hudi表,先讲SqlSource的原因是其他几个类型的Source都需要提供表Schema相关的配置,比较麻烦,如JdbcbasedSchemaProvider
需要配置jdbcUrl、user、password、table等或者FilebasedSchemaProvider
需要提供一个Schema文件的地址如/path/source.avsc
,无论是配置jdbc连接信息还是生成avsc文件都比较麻烦,所以想找一个不需要提供Schema的Source,通过搜索源码发现SqlSource可以满足这个需求,但是实际使用起来在0.9.0版本发现了bug,并不能直接使用,好在稍微修改一下对应的源码即可解决。当然还有其他不需要提供Schema的source,如ParquetDFSSource
和CsvDFSSource
,它们和SqlSource
都是RowSource
的子类,但是文件格式有限制,不如SqlSource
通用,SqlSource
只需要是Hive表即可,这也满足我们需要将Hive表转化为Hudi表的需求。
创建Hive历史表
1 | create database test location '/test'; |
Spark SQL创建Hudi目标表
1 | create database hudi location '/hudi'; |
这里事先用Spark SQL建表是因为虽然用HoodieDeltaStreamer
时配置同步hive参数也可以自动建表,但是某些参数不生效,如hoodie.datasource.hive_sync.create_managed_table
和hoodie.datasource.hive_sync.serde_properties
,在properties配置和通过 --hoodie-conf
配置都不行,通过阅读源码,发现0.9.0版本不支持(应该属于bug),这样不满足我们要建内部表和主键表的需求,所以这里先用Spark SQL建表,再用HoodieDeltaStreamer
转化数据。
最新版本已支持这些参数,PR:https://github.com/apache/hudi/pull/4175
配置文件
common.properties
1 | hoodie.datasource.write.hive_style_partitioning=true |
sql_source.properties
1 | include=common.properties |
命令
1 | spark-submit --conf "spark.sql.catalogImplementation=hive" \ |
enable-hive-sync和enable-sync都是开启同步Hive的,不过enable-hive-sync已弃用,建议用enable-sync
这里需要加参数spark.sql.catalogImplementation=hive
,因为源码里的Spark默认没有开启支持hive即enableHiveSupport
,而enableHiveSupport的实现就是通过配置spark.sql.catalogImplementation=hive
执行完后,查询目标表,可以发现数据已经从源表抽取到目标表了
如果不加checkpoint(SqlSource从设计上不支持checkpoint,所以原则上不应该使用checkpoint参数),否则会有日志:No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Optional.empty). New Checkpoint=(null) 。这样不会抽取数据
源码解读:
1 | if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { |
当checkpointStr和resumeCheckpointStr相同时,则认为没有新的数据,checkpointStr是source里面的checkpoint,resumeCheckpointStr使我们根据配置从target里获取的
1 | checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch() |
checkpointStr 在这里实际调用的是 SqlSource类里的 fetchNextBatch,而他的返回值写死为null return Pair.of(Option.of(source), null);
resumeCheckpointStr的逻辑为当目标表为空时,返回cfg.checkpoint
,具体的代码逻辑:
(这里贴的为master最新代码,因为0.9.0版本的逻辑不如新版的清晰,大致逻辑是一样的)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39/**
* Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
* @param commitTimelineOpt commit timeline of interest.
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
Option<String> resumeCheckpointStr = Option.empty();
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
|| !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
}
} else if (cfg.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
resumeCheckpointStr = Option.of(cfg.checkpoint);
}
}
return resumeCheckpointStr;
}
所以我们加了--checkpoint earliest
,但是这样的话SqlSource默认的只能抽取一次,如果多次抽取或用HoodieDeltaStreamer
其他的增量抽取转化,则会抛异常:
1 | ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down |
这是因为虽然我们加了参数--checkpoint earliest
,但是代码里将checkpoint的值写死为null,在异常信息里可以看到从commit获取到的commit元数据信息:"deltastreamer.checkpoint.reset_key" : "earliest"
, "deltastreamer.checkpoint.key" : null
(最新版本如果为null,则不保存,即没有这个key),代码对应为类:org.apache.hudi.utilities.sources.SqlSource
1 | return Pair.of(Option.of(source), null); |
checkpoint为null就不能再次使用HoodieDeltaStreamer
增量写这个表了,要解决这个问题,只需要将代码改为:1
return Pair.of(Option.of(source), "0");
代码我已经提交到https://gitee.com/dongkelun/hudi/commits/0.9.0,该分支也包含对0.9.0版本的其他修改,除了这个异常还有可能因Hive版本不一致抛出没有方法setQueryTimeout的异常,解决方法我也提交到该分支了,可以自己查看。
最新版本(0.11)已经尝试修复这个问题,PR:https://github.com/apache/hudi/pull/3648,可以参考这个PR解决这个问题,基于这个PR,我们使用该PR新增的参数--allow-commit-on-no-checkpoint-change
,就会跳过No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Optional.empty). New Checkpoint=(null)
,它是这样解释的:
1 | allow commits even if checkpoint has not changed before and after fetch data from source. This might be useful in sources like SqlSource where there is not checkpoint. And is not recommended to enable in continuous mode |
当从源获取数据时,即使checkpoint没有变化也允许commit,这对于像SqlSource这样没有checkpoint的很有用,但是不建议在continuous模式中使用,但是SqlSource不能使用--checkpoint
,否则依旧会报上面的异常,所以我提了一个PR:https://github.com/apache/hudi/pull/5633 尝试解决这个问题,不知道社区会不会接受
DFSSource
Distributed File System (DFS)
历史数据DFS JSON转化,支持多种数据格式
创建hive历史表,存储格式JSON
1 | create table test.test_source_json( |
插入数据
1 | insert into test.test_source_json values (1,'hudi', 10.0,100,'2021-05-05'); |
配置文件
dfs_source.properties
这里演示非分区表
1 | include=common.properties |
命令
1 | spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \ |
这里没有指定
--source-class
的原因是,它的默认值就是JsonDFSSource
用JdbcbasedSchemaProvider
获取Schema的原因是因为我对于生成avsc文件没有经验,两者选其一,所以选择了通过配置jdbc的形式获取Schema
当然这里在0.9.0版本如果需要内部表的话也需要和上面讲的一样事先用SparkSQL建表,0.11.0版本直接配置参数即可
默认的代码读取Hive表Schema是有异常的,异常如下
1 | Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to get Schema through jdbc. |
第一个异常是:table does not exists!,当然根本原因并不是不存,需要修改源码,代码已经提交到代码我已经提交到https://gitee.com/dongkelun/hudi/commits/0.9.0,关于这个异常主要修改了两个地方,一个是因为Hive版本不一致抛出没有方法setQueryTimeout的异常,这里直接把调用setQueryTimeout方法的两个地方都删掉了,还有一个地方,是原来的tableExists
如果遇到异常,直接返回false,后面的逻辑如果返回false,直接抛出异常table does not exists!
,这样不能分析根本原因,因为还有其他原因造成的异常,比如kerberos权限问题,这里改成直接打印异常信息,方便分析原因,关于这一点我已经提交了PR:https://github.com/apache/hudi/pull/5827
原代码1
2
3
4
5
6
7
8
9
10private static Boolean tableExists(Connection conn, Map<String, String> options) {
JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) {
statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
statement.executeQuery();
} catch (SQLException e) {
return false;
}
return true;
}
修改后:
1 | private static Boolean tableExists(Connection conn, Map<String, String> options) { |
下面的这个异常,只需要修改Hive配置:hive.resultset.use.unique.column.names=false,关于这一点,我已经在Apache Hudi 入门学习总结提到过了
1 | Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to get Schema through jdbc. |
KafkaSource
HoodieDeltaStreamer
支持两种Kafka格式的数据Avro和Json,分别对应AvroKafkaSource和JsonKafkaSource,这里为了方便造数,以JsonKafkaSource为例
Kafka配置文件
kafka_client_jaas.conf
1 | KafkaClient { |
producer.properties
这个配置是为了往kafka里造数
1 | security.protocol=SASL_PLAINTEXT |
造数
先kinit认证kerberos
1 | kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/indata-10-110-105-163.indata.com@INDATA.COM |
1 | /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-producer.sh --broker-list indata-10-110-105-163.indata.com:6667 --topic test_hudi_target_topic --producer.config=producer.properties |
1 | {"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2021-05-05"} |
消费
命令行消费topic验证数据是否成功写到对应的topic
1 | /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server indata-10-110-105-163.indata.com:6667 --from-beginning --topic test_hudi_target_topic --group dkl_hudi --consumer-property security.protocol=SASL_PLAINTEXT |
Hudi配置文件
kafka_source.properties
1 | include=common.properties |
kafkaSource和dfsSource一样也需要提供表Schema,由于这里读取kafka,而没有源表,这里从上面dfsSource建的表test.test_source_json
读取schema,从哪个表读取Schema都行,只要表结构一致即可
命令
1 | spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \ |
上面的都是一次性读取转化,kafka也可以连续模式读取增量数据,通过参数--continuous
,即:
1 | spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \ |
连续模式默认间隔0s即没有间隔连续性的读取checkpoint判断kafka(和offset对比)里是否有增量,可以通过参数--min-sync-interval-seconds
来修改间隔,比如 –min-sync-interval-seconds 60,设置60s读取一次
我们可以往kafka topic里再造几条josn数据,进行验证,是否可以正常读取增量数据
多表转化
博客参考: https://hudi.apache.org/blog/2020/08/22/ingest-multiple-tables-using-hudi/
以kafka json示例,首选创建两个用于获取schema的空表,test.test_source_json_1,test.test_source_json_2,然后创建两个kafka topic并往里造数test_hudi_target_topic_1,test_hudi_target_topic_2,最后通过HoodieMultiTableDeltaStreamer
往两个Hudi表test_hudi_target_kafka_1,test_hudi_target_kafka_2写数据
配置文件
kafka_source_multi_table.properties
1 | hoodie.deltastreamer.ingestion.tablesToBeIngested=hudi.test_hudi_target_kafka_1,hudi.test_hudi_target_kafka_2 |
config_table_1.properties
1 | include=common.properties |
config_table_2.properties
1 | include=common.properties |
1 | spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \ |
注意这里--target-table
是必填项,我们填其中一个表名即可,不过我认为不应该设置为必填,因为两个表名已经在配置文件中了还有0.9.0版本同步hive只有--enable-hive-sync
参数没有--enable-sync
,在最新版本里是有这个参数的,但是新版中--enable-hive-sync
并没有弃用,可见HoodieMultiTableDeltaStreamer
和HoodieDeltaStreamer
对于相同的参数并没有保持一致,可能用的人不多,贡献的也就不多。关于是否可以去掉--target-table
参数的问题我已经提交了PR:https://github.com/apache/hudi/pull/5883
这里我们并没有指定target-base-path,那么程序又是怎么知道表路径是什么呢,通过阅读源码发现,表路径为:1
String targetBasePath = basePathPrefix + "/" + database + "/" + tableName;
具体代码在方法resetTarget
,因为我们创建的数据库路径为/hudi
,所以这里--base-path-prefix
的值为/
执行上面的命令检查是否成功从每个topic里读取数据并写到对应的表中
HiveSchemaProvider
对应类:org.apache.hudi.utilities.schema.HiveSchemaProvider
上面介绍到用SqlSource的原因主要是可以不用提供为了获取Schema的jdbc Url等信息,但是SqlSource本身存在这一些问题,而其他的则要提供jdbc Url等信息配置起来麻烦,比如DFSSource KafkaSource等,而读取Kafka中的增量也不能用SqlSource,SqlSource只能用来转换一次增量数据,在0.9.0版本读取增量只能配置jdbc相关的参数来获取Schema,而且默认的代码读取Hive Schema还有bug或者不通用(因Hive版本不一致抛出没有方法setQueryTimeout的异常),只能自己该代码编译后才能使用,而0.11.0版本新增了HiveSchemaProvider,应该可以只指定库名表名就可以获取Schema信息了,具体如何使用等有时间我尝试一下再更新
HiveSchemaProvider
提供了四个参数:
1 | private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database"; |
分别为sourceSchema数据库名称、sourceSchema表名、targetSchema数据库名称、targetSchema表名,其中targetSchema对应的配置是可选的,当没有配置targetSchema,默认targetSchema等于sourceSchema,这个逻辑也同样适用于其他的SchemaProvider,比如上面示例中的JdbcbasedSchemaProvider
,只不过JdbcbasedSchemaProvider
并没有targetSchema的配置参数,只有sourceSchema的参数
配置参数
我们以上面的JsonDFSSource
为例
json_dfs_source.properties
这里演示非分区表
1 | include=common.properties |
命令
1 | spark-submit --conf "spark.sql.catalogImplementation=hive" \ |
我们先把jar升到0.11.0版本及以上,然后执行上面的命令,可能会报下面的异常,原因是因为HiveSchemaProvider
,在获取json格式的表时需要用到hive-hcatalog-core.jar
,我们去hive lib下面执行ls | grep hcatalog-core
,找到该jar包,然后将jar拷贝至spark jars目录下再执行,就可以成功读取表schema并将json数据转为Hudi目标表。
1 | 22/06/15 16:12:09 ERROR log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found |
通过示例中的配置可以看到利用HiveSchemaProvider
获取schema时的配置比较简单,方便使用,如果有targetSchema和sourceShcema不一致的需求,大家可以通过配置targetSchema的库名表名自己尝试。对于其他类型的Source,大家觉得HiveSchemaProvider
比较方便的话,也可以自行修改配置参数等。
总结
本文主要总结了Hudi DeltaStreamer的使用,以及遇到的各种问题,给出了解决方法,主要是使用该工具类读取历史表并转化为Hudi表以及读取增量数据写入Hudi表,当然也支持从关系型数据库读取表数据同步到Hudi表中,本文没有作出示例,由于问题较多,写的稍微乱一点,后面应该还会再写一篇整理一下,并且会从原理、源码层面进行总结,不过示例可能涉及会比较少。
更新
2022-09-02:hudi 0.12.0版本,同步hive时异常:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
442/09/02 17:40:33 ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.runMetaSync(DeltaSync.java:716)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:634)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/calcite/rel/type/RelDataTypeSystem
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory.get(SemanticAnalyzerFactory.java:318)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:484)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1317)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1457)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227)
at org.apache.hudi.hive.ddl.HiveQueryDDLExecutor.updateHiveSQLs(HiveQueryDDLExecutor.java:95)
at org.apache.hudi.hive.ddl.HiveQueryDDLExecutor.runSQL(HiveQueryDDLExecutor.java:86)
at org.apache.hudi.hive.ddl.QueryBasedDDLExecutor.createTable(QueryBasedDDLExecutor.java:92)
at org.apache.hudi.hive.HoodieHiveSyncClient.createTable(HoodieHiveSyncClient.java:152)
at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:279)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:219)
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:153)
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:141)
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:56)
... 19 more
Caused by: java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
解决方法 hive/lib1
2
3
4ls | grep cal
calcite-core-1.16.0.3.1.0.0-78.jar
calcite-druid-1.16.0.3.1.0.0-78.jar
calcite-linq4j-1.16.0.3.1.0.0-78.jar
将calcite-core包拷贝到 spark/jars即可1
cp calcite-core-1.16.0.3.1.0.0-78.jar /usr/hdp/3.1.0.0-78/spark2/jars/