前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun

前言
最初是想学习一下Spark提交流程的源码,比如 Spark On Yarn 、Standalone。之前只是通过网上总结的文章大概了解整体的提交流程,但是每个文章描述的又不太一样,弄不清楚到底哪个说的准确,比如Client 和 Cluster 模式的区别,Driver到底是干啥的,是如何定义的,为了彻底弄清楚这些疑问,所以决定学习一下相关的源码。因为不管是服务启动还是应用程序启动,都是通过脚本提交的,所以我们先从分析脚本开始。
版本
Spark 3.2.3
Spark 脚本
先看一下Spark 主要的脚本有哪些:spark-submit、spark-sql、spark-shell、spark-class、start-all.sh、stop-all.sh、start-master.sh、start-workers.sh 等。
spark-sql
1 | if [ -z "${SPARK_HOME}" ]; then |
通过 spark-submit 提交类 org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
spark-shell
1 | # Shell script for starting the Spark Shell REPL |
这里的主要逻辑也是用 spark-submit 提交类 org.apache.spark.repl.Main
spark-submit
根据上面的分析:spark-sql 和 spark-shell 两个交互式命令行脚本都是通过 spark-submit –class ClassName 来实现的。
1 | if [ -z "${SPARK_HOME}" ]; then |
逻辑比较清晰:通过 spark-class 提交 org.apache.spark.deploy.SparkSubmit
具体到 spark-sql 和 spark-shell 分别为:1
2/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell
start-all.sh
功能:启动 standalone 所有服务。相关配置可参考 Spark Standalone 集群配置
1 | # Start all spark daemons. |
主要逻辑:start-master.sh 启动 master、start-workers.sh 启动所有worker
start-master.sh
1 | # Starts the master on the machine this script is executed on. |
主要逻辑:由 spark-daemon.sh 启动类 org.apache.spark.deploy.master.Master
具体的参数:/opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 –host indata-10-110-8-199.indata.com –port 7077 –webui-port 8080
start-workers.sh
1 | if [ -z "${SPARK_HOME}" ]; then |
配置host和端口,然后调用 workers.sh 参数是 cd "${SPARK_HOME}"
\; "${SPARK_HOME}
/sbin/start-worker.sh” “spark://$SPARK_MASTER_HOST
:$SPARK_MASTER_PORT
“
具体的参数:cd /opt/dkl/spark-3.2.3-bin-hadoop3.2 ; /opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/start-worker.sh spark://indata-10-110-8-199.indata.com:7077
workers.sh
1 | # Run a shell command on all worker hosts. |
主要逻辑:
- 先获取 HOSTLIST,优先级
$SPARK_WORKERS
、$SPARK_SLAVES
、${SPARK_CONF_DIR}
/workers、${SPARK_CONF_DIR}
/slaves,一般我们在 conf/workers (Spark3 默认) 或者 conf/slaves (Spark2 默认) 里配置 worker的 ip 或者hostname,如果没有配置,则默认 localhost - 获取 SPARK_SSH_OPTS ,默认 “-o StrictHostKeyChecking=no” ,如果有特殊需求,如端口号不是默认的 22,则可以在 spark-env.sh 中添加 export SPARK_SSH_OPTS=”-p 6233 -o StrictHostKeyChecking=no”
- 遍历 HOSTLIST , ssh 到每个host节点,执行上面 start-workers.sh 中的参数 cd “
${SPARK_HOME}
“ \; “${SPARK_HOME}
/sbin/start-worker.sh” “spark://$SPARK_MASTER_HOST
:$SPARK_MASTER_PORT
“。备注:$@:传递给脚本或函数的所有参数
start-worker.sh
1 | # Starts a worker on the machine this script is executed on. |
主要逻辑:
- 获取 MASTER ,这里为 spark://indata-10-110-8-199.indata.com:7077
- 判断 SPARK_WORKER_INSTANCES 是否为空,默认为空,也就是默认一个 Worker 实例
- 调用 start_instance 1 “
$@
“ ,主要逻辑计算每个示例的 PORT_NUM 和 WEBUI_PORT ,最后执行 “${SPARK_HOME}
/sbin”/spark-daemon.sh start$CLASS
$WORKER_NUM
–webui-port “$WEBUI_PORT
“$PORT_FLAG
$PORT_NUM
$MASTER
“$@
“
具体的参数:/opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 –webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
spark-daemon.sh
根据上面的分析:master 和 worker 都是通过 spark-daemon.sh 来启动的。
1 | # Runs a Spark command as a daemon. |
主要逻辑:
- 判断有没有参数 –config ,如果有,则配置 SPARK_CONF_DIR 等于参数值,默认 ${SPARK_HOME}/conf
- 获取 option ,可选值 start|stop|submit|status ,start-master 和 start-worker 对应的都为 start
- 获取 command , 这里的值对应具体的 class , start-master 对应 org.apache.spark.deploy.master.Master ,start-worker 对应 org.apache.spark.deploy.worker.Worker
- 获取 instance , 这里的值均为 1 ,实例数,为了给后面的 log、pid用。
- 配置 log、pid 的文件路径和文件名 。log 默认路径 ${SPARK_HOME}/logs , pid 默认路径 /tmp
- 设置 SPARK_NICENESS :守护进程的调度优先级,如果 SPARK_NICENESS 为空,则设置默认值为0 。
- 匹配 option ,如果为 submit ,执行 run_command submit “
$@
“ ;如果为 start, 执行 run_command class “$@
“ ; …… ,这里只看 start - run_command 逻辑:
- 先获取 mode ,这里 mode 为 class ,
- 创建 pid 文件夹, 判断 pid 文件是否存在,如果存在,则获取pid值并判断是否存在对应的java进程,如果存在,则提示服务已经运行,先停止它
- 如果不存在,则匹配 mode 值,如果是class ,则执行 execute_command nice -n “
$SPARK_NICENESS
“ “${SPARK_HOME}
“/bin/spark-class “$command
“ “$@
“ - execute_command 是这里自定义的函数, nice -n “$SPARK_NICENESS” 是设置进程优先级,范围通常从 -20(最高优先级)到 +19(最低优先级)。默认的 nice 值是 0。可参考 https://www.cnblogs.com/yinguojin/p/18600924
- execute_command 逻辑 :
- -z 判断 ${SPARK_NO_DAEMONIZE+set} 是否为空,这里为空
- 执行 nohup – “
$@
“ >>$log
2>&1 < /dev/null & ,这里主要逻辑是执行$@
并将日志输出到对应的日志文件中,$@
: 所有脚本参数的内容, 这里为 : nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.master.Master –host indata-10-110-8-199.indata.com –port 7077 –webui-port 8080 和 nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.worker.Worker –webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
实际是通过 nice -n 0 设置进程优先级,然后通过 spark-class 启动对应的 Master 和 Worker 类 - 将上面返回的进程号赋给 newpid ,然后将 newpid 写到对应的 pid 文件中。($! :Shell最后运行的后台Process的PID)
- for 循环 1到10,每次判断 newpid 对应的java 进程是否启动成功,如果启动成功则终止循环,否则sleep 0.5 ,继续下次循环,也就是轮询最多5秒,以启动java进程
- 启动成功后,sleep 2秒,然后判断对应的java进程是否还存在,如果不存在,则提示启动失败,并打印对应的日志
spark-class
通过上面的分析可知:spark-sql、spark-shell、Master 和 Worker 的启动最终都是通过 spark-class 启动的,具体分别为:1
2
3
4/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell
nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.master.Master --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080
nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
1 | if [ -z "${SPARK_HOME}" ]; then |
主要逻辑:
- 找Java环境变量,如果有,则拼接 ${JAVA_HOME}/bin/java
执行 build_command “
$@
“ ,并打印输出结果。对应命令:”$RUNNER
“ -Xmx128m$SPARK_LAUNCHER_OPTS
-cp “$LAUNCH_CLASSPATH
“ org.apache.spark.launcher.Main “$@
“ 具体为:- spark-sql 对应命令 :/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit –class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
- spark-shell 对应命令 :/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit –class org.apache.spark.repl.Main –name Spark shell
- start-master 对应命令 :/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master –host indata-10-110-8-199.indata.com –port 7077 –webui-port 8080
- start-worer 对应命令 : /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.worker.Worker –webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
- org.apache.spark.launcher.Main 主要逻辑 : 根据传入的参数,拼接命令并打印
1
2
3
4
5
6System.out.println('\0');
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
最后打印代码如上,先打印’\0’并换行,然后打印拼接的命令,每个命令后跟’\0’,不换行。最后打印的命令分别为:
spark-sql :
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.SparkSubmit–classorg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriverspark-internal
一共两行,第一行是空字符串 \0’表示字符串结束符,第二行是具体拼接的命令,每一个命令后跟 ‘\0’ ,因为是空所以没有空格分隔开,第二行最后没有跟换行。后面的命令第一行一样,所以只记录第二行- spark-shell :
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Dscala.usejavacp=true-Xmx1gorg.apache.spark.deploy.SparkSubmit–classorg.apache.spark.repl.Main–nameSpark shellspark-shell - start-master :
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.master.Master–hostindata-10-110-8-199.indata.com–port7077–webui-port8080 - start-worker :
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.worker.Worker–webui-port8081spark://indata-10-110-8-199.indata.com:7077 build_command 最后执行 printf “%d\0”
$?
,这和脚本含义是打印$?
后面再跟 \0 ,$?
是一个特殊的变量,用于获取上一个命令的退出状态码- 0:命令成功执行
- 0以外的数字:命令执行失败。
- 1:通用错误(General error), 发生了一个通用的错误,但没有提供具体的错误信息。
- 2:误用shell内置命令(Misuse of shell built-ins)
- 126:命令不可执行(Command invoked cannot execute)
- 127:未找到命令(Command not found)
所以build_command最终输出:以spark-sql为例:
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*
-Xmx1gorg.apache.spark.deploy.SparkSubmit–classorg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriverspark-internal0
\0 输出到日志文件中,以vi命令看,会显示^@,下面是在日志文件中vi查看的效果:^@
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java^@
-cp^@
/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*``^@
-Xmx1g^@
org.apache.spark.deploy.SparkSubmit^@
–class^@
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver^@
spark-internal^@
0^@
这样更能方便的理解 build_command 的输出是啥样的,方便后面的脚本分析,然后我们将 ^@ 换成空格:
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit –class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal 0- 最后解释一下 printf “%d\0” $? %d 代表数字,该行脚本的含义是将上一个命令的退出状态码打印并后跟一个\0,该命令没有换行,另外测试发现 %d 如果后面没有跟具体数字则默认值为0,可以通过 printf “\0” > test.log , 然后 vi test.log 查看 \0 在文件中会显示 ^@ ,但是cat test.log 则会显示空字符串。
- 后面 while IFS= read -d “
$DELIM
“ -r ARG; 是通过 read 命令读取 build_command “$@
“ 输出的结果,Bash read 命令可以参考 https://blog.csdn.net/lingeio/article/details/96587362 。 主要逻辑是读取第2步中输出的结果,解析对应的命令并放到 CMD 数组中,首先解析完第一行,然后将 CMD_START_FLAG 设置为 true开始拼接 CMD ,后以 ‘’ 为分隔符分割具体命令,放到CMD数组中。 - 组装好CMD数组后,先取CMD数组长度,获取CMD最后一个值作为 LAUNCHER_EXIT_CODE,即为在 build_command 中 执行 “
$RUNNER
“ -Xmx128m$SPARK_LAUNCHER_OPTS
-cp “$LAUNCH_CLASSPATH
“ org.apache.spark.launcher.Main “$@
“ 的命令退出状态码。 - 检查 LAUNCHER_EXIT_CODE 是否正常,如果不正常,则进行相应处理并退出,如果正常,则执行 org.apache.spark.launcher.Main 返回的命令 。
小结
通过本文上面的简单分析可知,无论是 spark-sql 、 spark-shell 这种交互式命令行,还是 Master 和 Worker 等Stanalone服务的启动,最终都是通过 spark-class 启动的。而 spark-class 的逻辑则是先通过 java -cp 执行类 org.apache.spark.launcher.Main,然后将拼接好的启动命令打印输出,最终在 spark-class 中解析输出的命令并执行,最终也都是通过 java -cp 执行具体的类的,分别如下:
- spark-sql : /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit –class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal
- spark-shell : /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Dscala.usejavacp=true -Xmx1g org.apache.spark.deploy.SparkSubmit –class org.apache.spark.repl.Main –name Spark shell spark-shell
- start-master : /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.master.Master –host indata-10-110-8-199.indata.com –port 7077 –webui-port 8080
- start-worker : /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/ -Xmx1g org.apache.spark.deploy.worker.Worker –webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
当然我们提交程序代码jar也是一样的,比如 spark-submit –master local –class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.12-3.2.3.jar , 对应到 spark-class 的提交命令为 :
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/ -Xmx1g org.apache.spark.deploy.SparkSubmit –master local –class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.12-3.2.3.jar
进一步总结发现,关于服务类的启动都是直接通过 java -cp 提交具体的类,其他的交互式命令行、jar 则是先通过 java -cp 提交 org.apache.spark.deploy.SparkSubmit ,最终具体执行的类则通过 –class 作为参数提交。那么下次我们先分析 org.apache.spark.deploy.SparkSubmit,看看最终真正的 class 是怎么提交运行的。