前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
首先安装好Ceph,可以参考我前面的文章Ceph分布式集群安装配置
版本
Flink: 1.10.1
hadoop: hdp版本 3.1.1.3.1.0.0-78
jar包
flink-s3-fs-hadoop-1.10.1.jar,从maven仓库下载即可,下载地址:https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop,找到对应的版本下载即可
然后在$FLINK_HOME/plugins
目录下创建文件夹s3-fs-hadoop
,将下载的flink-s3-fs-hadoop-1.10.1.jar拷贝到该目录下
Flink Shell 读写
我这个版本的Flink是有scala-shell终端的,别的版本可能没有,路径 bin/start-scala-shell.sh
配置flink-conf.yaml
添加配置(Shell我们采用模式yarn-per-job):1
2
3
4
5execution.target: yarn-per-job
s3.access.key: access_key
s3.secret.key: secret_key
s3.endpoint: ip:7480
s3.connection.ssl.enabled: false
s3cmd创建测试文件
和上篇文章一样,先创建测试文件和测试Bucket
创建用于测试读的文件
创建Bucket1
2s3cmd mb s3://txt
Bucket 's3://txt/' created
本地生成测试txt1
2
3
4
5vi test.txt
1
2
3
4
将test.txt上传到s3://txt1
2
3s3cmd put test.txt s3://txt
upload: 'test.txt' -> 's3://txt/test.txt' [1 of 1]
8 of 8 100% in 0s 45.82 B/s done
创建用于测试写的Bucket
1 | s3cmd mb s3://test-s3-write |
启动 Flink-Scala-Shell
1 | bin/start-scala-shell.sh yarn |
测试代码
1 | // 测试读 |
s3cmd验证
1 | s3cmd ls s3://test-s3-write/ |
程序Jar包提交验证
本来想在IDEA本地远程读写S3验证,但是我没有找到本地扩展Flink plugins的方法,无奈先放弃,采用flink run 提交jar包的形式验证
pom依赖
1 | <dependency> |
代码
1 | package com.dkl.s3.flink; |
完整代码
完整代码已上传到GitHub,有需要的同学可自行下载:https://github.com/dongkelun/S3_Demo/tree/master/Flink_S3_Demo
提交
这里采用提交到standalone的模式,因为yarn-per-job的输出日志不太好查找,而虽然yarn-session也能看,但是由于我们开发环境用了Knox代理,而查看yarn-session日志时服务有异常,所以采用standalone模式,方便查看输出日志以便验证
这里需要首先启动standalone集群,并且去掉配置文件里的execution.target1
bin/start-cluster.sh
然后提交jar包运行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16bin/flink run -c com.dkl.s3.flink.Flink_S3_Demo ~/Flink_S3_Demo-1.0.jar
Job has been submitted with JobID d7cee212111d76ffe3d4ed5905a484ec
Program execution finished
Job with JobID d7cee212111d76ffe3d4ed5905a484ec has finished.
Job Runtime: 2637 ms
Job has been submitted with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2
Program execution finished
Job with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2 has finished.
Job Runtime: 1859 ms
Job has been submitted with JobID cfeecfa9226a3ae1d97728e80fa0dbff
Program execution finished
Job with JobID cfeecfa9226a3ae1d97728e80fa0dbff has finished.
Job Runtime: 277 ms
结果验证
Web UI地址(默认):ip:8081
1 | s3cmd ls s3://test-s3-write/ |
yarn-per-job 提交方式
flink-conf.yaml 里添加配置
1 | execution.target: yarn-per-job |
yarn-session
1 | ## 首先启动yarn-session |
异常解决
因为是第一次写Flink代码,会遇到一些初级问题,这里记录一下
异常1
1 | Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: indata-10-110-105-164.indata.com/10.110.105.164:45401 |
这里根据异常信息还是比较难定位的,原因是因为文件已经存在,我们需要指定模式为overwrite,信息太不明显了~
1 | import org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE |
异常2
1 | org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4) |
这个异常也是网上找了很多资料都没有解决,最后发现是因为我的环境上起了不止一个flink standalone,应该是因为有多个版本的flink
1 | jps |
首先把所有的Flink相关的进程kill,然后再启动standalone集群就可以了
其他问题
其他问题就是上面提到过的输出日志查看,yarn模式捣鼓了半天也没有找到输出日志(但是通过写成功的文件知道程序是运行成功的),最后发现是因为knox代理异常导致的,所以采用standalone解决,关于每个模式的提交方法我记录在上面了。
参考
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/filesystems/s3/