前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
总结 Flink 读写 HBase
版本
- Flink 1.15.4
- HBase 2.0.2
- Hudi 0.13.0
官方文档
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/hbase/
Jar包
SQL
需要提前建好hbase表;如果没有对应的hbase表,flink写hbase任务会显示finished,没有异常,但是并没有自动创建对应的hbase表
hbase shell创建Hbase表
1 | hbase shell |
Flink 写 Hbase
1 | CREATE TABLE flink_hbase_table( |
Flink 读 Hbase
1 | select * from flink_hbase_table; |
1
select id,name,price,ts,dt from flink_hbase_table;
hbase shell 验证数据
1 | hbase(main):002:0> scan 'flink_hbase_table' |
参数
完整的参数可以查看官网
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定使用的连接器, 支持的值如下 :hbase-1.4: 连接 HBase 1.4.x 集群hbase-2.2: 连接 HBase 2.2.x 集群(我的hbase版本为2.0.2) |
table-name | 必选 | (none) | String | 连接的 HBase 表名。默认该表在 “default” 命名空间下,指定命名空间下的表需要使用 “namespace:table”。 |
zookeeper.quorum | 必选 | (none) | String | HBase Zookeeper quorum 信息。 |
zookeeper.znode.parent | 可选 | /hbase | String | HBase 集群的 Zookeeper 根目录。 |
properties.* | 可选 | (无) | String | 可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。Flink 将移除 “properties.” 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如您可以设置 ‘properties.hbase.security.authentication’ = ‘kerberos’ 等kerberos认证参数。 |
Hudi包兼容性
前提:在开启了kerberos的环境上
当flink lib下面存在hudi(0.13.0版本)包时会出现flink连接不上hbase的现象,具体表现为:
1、Flink 查询 HBase 时,会抛异常:1
2
3
4
5
6
7ava.net.SocketTimeoutException: callTimeout=60000, callDuration=74175: Call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed row 'flink_hbase_table,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=indata-192-168-44-128.indata.com,16020,1695447819772, seqNum=-1
at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:159)
at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed
2、Flink 写 HBase 时,不报异常,但是会卡住,卡15分钟(正常执行时间20s)左右显示任务完成,实际上没有写成功。
原因
出现这种现象的原因是hudi包下存在hbase-site.xml,其中安全认证相关的配置和kerberos环境不一致。
获取Hbase配置的逻辑和优先级
为了验证上面原因的正确性,研究了一下获取Hbase配置的逻辑和优先级
优先级:
- 1、用户自定义参数 优先级最高 (SQL中配置的)
- 2、环境变量 优先级第二
环境变量一共有两个:HBASE_CONF_DIR
和HBASE_HOME
,其中HBASE_CONF_DIR
的优先级要高于HBASE_HOME
,
这两个环境变量下有两个配置文件 hbase-site.xml 和 hbase-default.xml 其中hbase-site.xml 优先级要高于 hbase-default.xml,也就是一共有四个优先级:
2.1HBASE_CONF_DIR
/conf/hbase-site.xml
2.2HBASE_CONF_DIR
/conf/hbase-default.xml
2.3HBASE_HOME
/conf/hbase-site.xml
2.4HBASE_HOME
/conf/hbase-default.xml 3、classpath 优先级最低,其中也有两个配置文件 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml
如hudi包中就存在 hbase-site.xml 和 hbase-default.xml
另外classpath可能有多个目录,多个目录之间也有优先级,对于我们环境有两个classpath存在hbase-site.xml,一个flink lib路径下面的hudi包,一个是/etc/hbase/conf。
为啥是/etc/hbase/conf,具体逻辑在flink bin/config.sh中:1
2
3
4
5
6
7# try and set HBASE_CONF_DIR to some common default if it's not set
if [ -z "$HBASE_CONF_DIR" ]; then
if [ -d "/etc/hbase/conf" ]; then
echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
HBASE_CONF_DIR="/etc/hbase/conf"
fi
fi因为我们环境默认没有配置HBASE_CONF_DIR,并且存在/etc/hbase/conf,所以就会走到这个逻辑,我们在启动sql-client时也会看到:
1
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set
那又是在哪里将这里的HBASE_CONF_DIR加到classpath中的呢?它是在sql-client.sh中通过java -classpath 参数添加的。
关于 java -classpath 可以参考:https://www.liaoxuefeng.com/wiki/1252599548343744/1260466914339296
CC_CLASSPATH优先级高于INTERNAL_HADOOP_CLASSPATHS ,flink lib属于CC_CLASSPATH,/etc/hbase/conf 属于INTERNAL_HADOOP_CLASSPATHS
/etc/hbase/conf 属于INTERNAL_HADOOP_CLASSPATHS:
flink lib属于CC_CLASSPATH:
因为上面这个函数(constructFlinkClassPath
)的逻辑我不太确定,我们可以在sql-clent.sh中打印: echo $CC_CLASSPATH
读取Hbase配置的逻辑可以查看源码,本文先不分析这块源码,可能会在下一篇文章补充分析部分源码,先只截个图:HBase2DynamicTableFactory
prepareRuntimeConfiguration
(HBaseSinkFunction
和 HBaseRowDataLookupFunction
中都有这个方法,分别是写和读)
解决方法
知道了Hbase配置的优先级,弄明白了hudi包中的hbase-site.xml为啥会影响flink读写hbase,也就知道如何解决这个问题,我们只需要根据优先级设置正确的hbase配置参数就好了。比如只有classpath中存在hbase配置那么我们就需要修改classpath中的hbase配置为正确配置。如果不想修改classpath中的配置文件或者觉得这样做不合适,我们可以设置更好优先级的配置,比如设置环境变量HBASE_CONF_DIR
和 HBASE_HOME
指向正确的hbase配置,另外我们也可以通过在sql参数中配置正确的参数,因为用户参数级别最高,这样配置优点是比较灵活,缺点是需要用户每次都多写一下额外的配置。
解决方法1
删除hudi包里面的hbase-site.xml(hudi-flink1.15-bundle-0.13.0.jar),这样就会去加载我们服务器环境上的正确的hbase-site.xml (/etc/hbase/conf)
删除hbase-site.xml不清楚对hudi有什么影响,目前没有发现~;另外理论上也应该删除hbase-default.xml,因为我们环境上有/etc/hbase/conf这个路径,所以会将/etc/hbase/conf加到classpath中,/etc/hbase/conf/hbase-site.xml的优先级高于同样在classpath中的hbase-default.xml,所以不删除hudi包中的hbase-default.xml也可以正常使用
解决方法2
修改hudi包里面的hbase-site.xml中的kerberos配置:
1 | # 经测试我们环境只需要这两个配置,可以根据自己的环境调整 |
这样保留了hbase-site.xml,避免删除hbase-site.xml造成的潜在影响
解决方法3
通过在建表语句中添加配置
1 | CREATE TABLE flink_hbase_table( |
我们在sql中也是添加了上面的两个配置,sql中的配置优先级最高,所以可以有效解决该问题。
另外我不清楚hbase.master.kerberos.principal在什么时候会用到,所以先备注在这里
还有一个问题就是我的方法2和方法3中的配置中都没有配置keytab: hbase.regionserver.keytab.file = /etc/security/keytabs/hbase.service.keytab,但是它可以正常读写,所以我目前还不清楚它是怎么知道读取哪个keytab文件的还是说连接hbase regionserver 时不需要这个配置项。
不启用kerberos的配置:1
2
3
4
5
6
7
8
9
10
11CREATE TABLE dkl(
id int,
cf ROW<name string,price double,ts bigint, dt string>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-unsecure',
'properties.hbase.security.authentication' = 'simple'
);
解决方法4
配置环境变量HBASE_CONF_DIR
和 HBASE_HOME
中的其中一个即可:1
2
3# 配置环境变量指向正确的hbase配置路径
export HBASE_CONF_DIR = /etc/hbase/conf
export HBASE_HOME = /etc/hbase/conf
因为环境变量优先级要比classpath高,所以就不会受hudi包中的hbase-site.xml影响了。(hudi包属于classpath)
解决方法5
通过修改源码,添加参数支持通过参数配置:1
'hbase.conf.dir'='/opt/dkl/hbase/conf'
有个疑问:就是既然有了方法3和方法4为啥还要改源码呢。有两个原因一个是因为有的flink客户端环境通过设置环境变量的形式不好实现,比如在pod里(对于做平台来说)。第二个原因是我开始并不清楚是哪几个参数影响的,方法3中通过配置用户自定义参数解决了这个问题,这两个参数是后来才知道的。而且对于不同的环境可能影响的参数也不一样,所以不如直接配置整个文件夹的形式简单有效。而且对于修改的这部分源码比较简单打包也很快,所以一开始尝试的这个方法,并且成功了。不过代码不是很完善(因为没有正式使用),这里先记录一下,后面再进行完善。
代码地址:https://github.com/dongkelun/flink/tree/1.15.4-hbase
打包:1
2
3
4
5
6
7## 修改代码,需要先检查代码格式
mvn spotless:apply -pl flink-connectors/flink-connector-hbase-2.2
## 先编译 hbase-base ,因为 hbase-2.2 依赖 hbase-base中的代码
mvn clean install -DskipTests -pl flink-connectors/flink-connector-hbase-base
mvn clean install -DskipTests -pl flink-connectors/flink-connector-hbase-2.2
## 最后将 flink-sql-connector-hbase-2.2 打包,最后打出来的包名为 flink-sql-connector-hbase-2.2-1.15.4.jar
mvn clean package -DskipTests -pl flink-connectors/flink-sql-connector-hbase-2.2
包地址:https://download.csdn.net/download/dkl12/88609805
总结
本文总结了Flink SQL 读写 HBase 的参数配置,解决了在kerberos环境下因 hudi 包 hbase-site.xml 配置冲突引起的异常,学习总结了 Flink SQL 读写 HBase 时加载 HBase 配置的优先级,但是没有详细的分析源码中的逻辑,可能会在后面的文章中补充相关的源码分析~