Preface
I have recently been reading the Spark task-scheduling source code. At first I was not clear about how CoarseGrainedExecutorBackend gets started, so I kept digging with that question in mind and eventually figured out how CoarseGrainedExecutorBackend is launched, which also deepened my understanding of the task-scheduling code. Taking that question as the thread, this post summarizes the main flow of starting CoarseGrainedExecutorBackend in standalone mode; it does not go into a deep, detailed analysis of every piece of the source code.
SparkContext Initialization
Everything starts with the initialization of SparkContext; when we write a Spark program, creating a SparkContext is usually the first step. During initialization the following runs:

```scala
// Create and start the scheduler. The `master` here comes from SparkSubmit's main method.
// In standalone mode this returns (StandaloneSchedulerBackend, TaskSchedulerImpl).
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
```
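To make the entry point concrete, here is a minimal driver-side sketch (the master address master-host:7077 is a placeholder; in practice the master is usually supplied via spark-submit --master, which is the SparkSubmit path mentioned above). Passing a spark:// URL is exactly what sends createTaskScheduler down the standalone branch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal driver program; createTaskScheduler runs inside the SparkContext constructor.
object StandaloneDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("CoarseGrainedExecutorBackendDemo")
      .setMaster("spark://master-host:7077") // placeholder standalone master address
    val sc = new SparkContext(conf)          // scheduler backend + task scheduler are created here
    try {
      // Executors are requested at startup; any action then runs tasks on them.
      println(sc.parallelize(1 to 10).count())
    } finally {
      sc.stop()
    }
  }
}
```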
createTaskScheduler decides what to return by pattern-matching on the master URL:

```scala
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._
  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  // Ensure that default executor's resources satisfies one or more tasks requirement.
  // This function is for cluster managers that don't set the executor cores config, for
  // others its checked in ResourceProfile.
  def checkResourcesPerTask(executorCores: Int): Unit = {
    val taskCores = sc.conf.get(CPUS_PER_TASK)
    if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) {
      validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
    }
    val defaultProf = sc.resourceProfileManager.defaultResourceProfile
    ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
  }

  master match {
    case "local" =>
      checkResourcesPerTask(1)
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    ......

    // standalone mode
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
      checkResourcesPerTask(coresPerWorker.toInt)
      // Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
      val memoryPerWorkerInt = memoryPerWorker.toInt
      if (sc.executorMemory > memoryPerWorkerInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
            memoryPerWorkerInt, sc.executorMemory))
      }
      // For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
      // to false because this mode is intended to be used for testing and in this case all the
      // executors are running on the same host. So if host local reading was enabled here then
      // testing of the remote fetching would be secondary as setting this config explicitly to
      // false would be required in most of the unit test (despite the fact that remote fetching
      // is much more frequent in production).
      sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)
      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    ......
```
TaskScheduler currently has only one implementation class, TaskSchedulerImpl.
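As a small illustration of the branch selection above, the sketch below defines a spark:// regex locally (mirroring what SparkMasterRegex.SPARK_REGEX matches) and shows how a comma-separated HA master list is split into one spark:// URL per master. It is a standalone demo, not Spark's own code:

```scala
// Standalone sketch of the master-URL matching described above.
object MasterUrlDemo {
  // Locally defined regex with the same shape as SparkMasterRegex.SPARK_REGEX.
  private val SPARK_REGEX = """spark://(.*)""".r

  def resolve(master: String): Seq[String] = master match {
    // Standalone: strip the prefix, split an HA list, and put the prefix back on each entry.
    case SPARK_REGEX(sparkUrl) => sparkUrl.split(",").map("spark://" + _).toSeq
    case "local"               => Seq("local")
    case other                 => throw new IllegalArgumentException(s"Unsupported master: $other")
  }

  def main(args: Array[String]): Unit = {
    // HA setup with two masters: Seq(spark://m1:7077, spark://m2:7077)
    println(resolve("spark://m1:7077,m2:7077"))
  }
}
```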
After that, the following is executed:

```scala
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
```
new StandaloneSchedulerBackend()
When val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) is executed, the parent class CoarseGrainedSchedulerBackend first runs:

```scala
val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())

protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
```
DriverEndpoint's onStart method:

```scala
override def onStart(): Unit = {
  // Periodically revive offers to allow delay scheduling to work
  // The interval at which the scheduler revives worker resource offers so that tasks can run.
  val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
  // Every second (by default), send a ReviveOffers message to self, handled by the receive method.
  reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
    Option(self).foreach(_.send(ReviveOffers))
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
```
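The following is a simplified, Spark-free model of that onStart pattern: a single-threaded scheduler periodically puts a ReviveOffers message into the endpoint's own inbox, and a receive loop drains it. The inbox queue and message object here are illustrative stand-ins, not Spark's classes:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Toy model of "send ReviveOffers to self every second"; not Spark's RPC machinery.
object ReviveLoopDemo {
  case object ReviveOffers

  def main(args: Array[String]): Unit = {
    val inbox = new LinkedBlockingQueue[Any]()
    val reviveThread = Executors.newSingleThreadScheduledExecutor()
    val reviveIntervalMs = 1000L // default used when SCHEDULER_REVIVE_INTERVAL is not set

    // Equivalent of self.send(ReviveOffers) every reviveIntervalMs milliseconds.
    reviveThread.scheduleAtFixedRate(
      () => inbox.put(ReviveOffers), 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

    // Equivalent of the receive partial function handling ReviveOffers by calling makeOffers().
    for (_ <- 1 to 3) {
      inbox.take() match {
        case ReviveOffers => println("makeOffers() would run here")
        case other        => println(s"unhandled: $other")
      }
    }
    reviveThread.shutdownNow()
  }
}
```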
The receive method:

```scala
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data, resources) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          val rpId = executorInfo.resourceProfileId
          val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
          val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
          executorInfo.freeCores += taskCpus
          resources.foreach { case (k, v) =>
            executorInfo.resourcesInfo.get(k).foreach { r =>
              r.release(v.addresses)
            }
          }
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  case ReviveOffers =>
    makeOffers()

  ......
```
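receive is just a PartialFunction[Any, Unit]. A quick sketch of how such handlers behave: handlers can be chained with orElse, and anything that matches no case falls through to the fallback. The message types below are simplified stand-ins for Spark's case classes:

```scala
// Toy illustration of a PartialFunction-based message handler, as used by receive above.
object ReceiveDemo {
  sealed trait Message
  case class StatusUpdate(executorId: String, taskId: Long, finished: Boolean) extends Message
  case object ReviveOffers extends Message

  // The "receive" handler: only defined for the messages it knows about.
  val receive: PartialFunction[Any, Unit] = {
    case StatusUpdate(execId, taskId, finished) =>
      println(s"task $taskId on $execId finished=$finished")
    case ReviveOffers =>
      println("makeOffers()")
  }

  // Fallback for everything else, chained with orElse.
  val fallback: PartialFunction[Any, Unit] = {
    case other => println(s"unhandled message: $other")
  }

  def main(args: Array[String]): Unit = {
    val handler = receive.orElse(fallback)
    handler(StatusUpdate("exec-1", 42L, finished = true))
    handler(ReviveOffers)
    handler("something else") // falls through to the fallback
  }
}
```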
In other words, makeOffers is being called over and over:

```scala
// Make fake resource offers on all executors
// Logically, this makes every executor a provider of compute resources.
// makeOffers() is the key method for submitting a TaskSet for execution; DriverEndpoint calls it
// once per second, and any tasks produced by actions such as runJob are then submitted to the
// nodes for execution. Since we are still in initialization, just remember that this method is
// triggered continuously; we will trace into it again when runJob() is executed.
private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // executorDataMap: HashMap[String, ExecutorData] maps executorId to ExecutorData.
    // Its entries are added when the CoarseGrainedExecutorBackend RpcEndpoint starts up: its
    // onStart method sends a RegisterExecutor message to DriverEndpoint.
    // The launch of CoarseGrainedExecutorBackend itself is triggered by StandaloneSchedulerBackend.start.
    // Filter out executors under killing: keep only the executors that are currently active.
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        // Create a WorkerOffer for every active executor
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          }, executorData.resourceProfileId)
    }.toIndexedSeq
    // Called by the cluster manager to offer resources on its workers. Tasks are offered to the
    // TaskSets in priority order, round-robin across the offers, so that load is balanced.
    // TaskSchedulerImpl.resourceOffers produces a 2-D structure of task assignments,
    // Seq[ArrayBuffer[TaskDescription]] (one buffer per offer, sized by o.cores), which is
    // filled in by resourceOfferSingleTaskSet.
    scheduler.resourceOffers(workOffers, true)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}
```
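Here is a toy model of the core of makeOffers: keep only the executors that are still active, turn each one's free cores into an offer, and hand the offers to the scheduler. ExecutorInfo and Offer below are simplified stand-ins for ExecutorData and WorkerOffer:

```scala
// Toy model of building resource offers from the executor map; not Spark's actual types.
object MakeOffersDemo {
  case class ExecutorInfo(host: String, freeCores: Int, active: Boolean)
  case class Offer(executorId: String, host: String, cores: Int)

  val executorDataMap = Map(
    "exec-1" -> ExecutorInfo("node-a", freeCores = 4, active = true),
    "exec-2" -> ExecutorInfo("node-b", freeCores = 0, active = true),
    "exec-3" -> ExecutorInfo("node-c", freeCores = 2, active = false) // e.g. being killed
  )

  def makeOffers(): Seq[Offer] =
    executorDataMap
      .filter { case (_, info) => info.active }                        // like isExecutorActive
      .map { case (id, info) => Offer(id, info.host, info.freeCores) } // like WorkerOffer
      .toIndexedSeq

  def main(args: Array[String]): Unit =
    // The scheduler would now decide which tasks fit into these offers (resourceOffers).
    makeOffers().foreach(println)
}
```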
TaskSchedulerImpl.start()
Back in SparkContext, _taskScheduler.start() runs TaskSchedulerImpl.start() (`override def start(): Unit = { ... }`), which calls backend.start(), i.e. StandaloneSchedulerBackend.start() (`override def start(): Unit = { ... }`).
The key point of the start method above is:

```scala
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
```
The command is then put into the ApplicationDescription:

```scala
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit,
  resourceReqsPerExecutor = executorResourceReqs)
// Create the AppClient with these launch parameters; you can see this is what eventually gets
// org.apache.spark.executor.CoarseGrainedExecutorBackend started: the information is sent to the
// Worker, which uses the JDK's ProcessBuilder.start() to launch CoarseGrainedExecutorBackend.
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
```
client.start()
client.start() (`def start(): Unit = { ... }`) registers a ClientEndpoint with the RpcEnv.
ClientEndpoint.onStart()
ClientEndpoint's onStart() (`override def onStart(): Unit = { ... }`) calls registerWithMaster().
registerWithMaster()
registerWithMaster() calls tryRegisterAllMasters() and schedules retries until one of the masters acknowledges the registration.
tryRegisterAllMasters()
tryRegisterAllMasters() asynchronously looks up the endpoint named Master.ENDPOINT_NAME on every configured master address and sends it a RegisterApplication message.
The Master.ENDPOINT_NAME here is:

```scala
val ENDPOINT_NAME = "Master"
```
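A simplified sketch of the idea behind tryRegisterAllMasters (not Spark's actual RPC code): in an HA deployment the client does not know which master is alive, so it fires one asynchronous registration attempt at every configured master address. sendRegisterApplication and the thread pool below are stand-ins for the real endpoint lookup and the RegisterApplication message:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Toy illustration of "try to register with every configured master in parallel".
object RegisterDemo {
  private val pool = Executors.newFixedThreadPool(4)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Stand-in for resolving the remote endpoint named Master.ENDPOINT_NAME ("Master")
  // and sending it RegisterApplication(appDescription, self).
  def sendRegisterApplication(masterUrl: String): Unit =
    println(s"sending RegisterApplication to the Master endpoint at $masterUrl")

  // One asynchronous attempt per configured master; the first live master answers.
  def tryRegisterAllMasters(masterUrls: Seq[String]): Seq[Future[Unit]] =
    masterUrls.map(url => Future(sendRegisterApplication(url)))

  def main(args: Array[String]): Unit = {
    tryRegisterAllMasters(Seq("spark://m1:7077", "spark://m2:7077"))
    Thread.sleep(200) // let the demo futures run before shutting the pool down
    pool.shutdown()
  }
}
```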
Master
`private[deploy] class Master( ... )`
As you can see, Master is also an RpcEndpoint.
Master.receive()
Master.receive() (`override def receive: PartialFunction[Any, Unit] = { ... }`) handles the RegisterApplication message and then calls schedule().
Master.schedule()
schedule() allocates the currently available resources to waiting applications and calls startExecutorsOnWorkers().
Master.startExecutorsOnWorkers()
startExecutorsOnWorkers() picks, for each waiting application, the workers that still have enough resources and calls allocateWorkerResourceToExecutors() on them.
Master.allocateWorkerResourceToExecutors()
allocateWorkerResourceToExecutors() decides how many executors to place on the worker and how many cores each gets, then calls launchExecutor() for each of them.
launchExecutor()
launchExecutor() (`private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { ... }`) sends a LaunchExecutor message to the chosen Worker.
Worker.receive()
Worker.receive() (`override def receive: PartialFunction[Any, Unit] = synchronized { ... }`) handles LaunchExecutor by creating an ExecutorRunner (the manager below) and starting it.
manager.start() (ExecutorRunner)
ExecutorRunner.start() (`private[worker] def start(): Unit = { ... }`) spawns a worker thread that runs fetchAndRunExecutor().
ExecutorRunner.fetchAndRunExecutor()
As mentioned above, appDesc.command carries the class name org.apache.spark.executor.CoarseGrainedExecutorBackend. fetchAndRunExecutor() builds the corresponding launch command, and builder.start() executes it as a Linux command, which launches CoarseGrainedExecutorBackend.
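A minimal sketch of the mechanism fetchAndRunExecutor relies on: assemble a java command line whose main class is org.apache.spark.executor.CoarseGrainedExecutorBackend and launch it with java.lang.ProcessBuilder. The classpath, memory setting and missing executor arguments below are placeholders, so this does not produce a working executor; in Spark the real command line is built from appDesc.command:

```scala
import java.io.File
import scala.jdk.CollectionConverters._

// Sketch of launching a JVM main class via ProcessBuilder, the way ExecutorRunner does.
object LaunchExecutorDemo {
  def main(args: Array[String]): Unit = {
    val javaBin = new File(new File(sys.props("java.home"), "bin"), "java").getAbsolutePath
    val command = Seq(
      javaBin,
      "-cp", sys.props("java.class.path"), // placeholder classpath
      "-Xmx1g",                            // placeholder executor memory
      "org.apache.spark.executor.CoarseGrainedExecutorBackend"
      // a real launch also passes the executor arguments (driver URL, executor id, cores, ...)
    )

    val builder = new ProcessBuilder(command.asJava)
    builder.redirectErrorStream(true)      // merge stdout and stderr, similar to executor log handling
    val process = builder.start()          // this is the builder.start() mentioned above
    println(s"launched executor JVM, alive = ${process.isAlive}")
    process.destroy()                      // just a demo: don't leave the process running
  }
}
```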
CoarseGrainedExecutorBackend.main()
Finally CoarseGrainedExecutorBackend is up and its main method runs: main() (`def main(args: Array[String]): Unit = { ... }`) parses the command-line arguments via parseArguments() (`def parseArguments(args: Array[String], classNameForEntry: String): Arguments = { ... }`) and then calls run() (`def run( ... )`) to start the executor.
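As a rough illustration of what parseArguments does conceptually, the sketch below walks a "--key value" argument list and collects a few of the fields the backend needs before run() is called. The Arguments fields and flag values here are a simplified subset chosen for the demo, not the real, complete list:

```scala
// Toy "--key value" argument parser in the spirit of parseArguments; not Spark's implementation.
object ParseArgsDemo {
  case class Arguments(driverUrl: String = "", executorId: String = "", cores: Int = 0)

  @annotation.tailrec
  def parse(args: List[String], acc: Arguments): Arguments = args match {
    case "--driver-url" :: value :: tail  => parse(tail, acc.copy(driverUrl = value))
    case "--executor-id" :: value :: tail => parse(tail, acc.copy(executorId = value))
    case "--cores" :: value :: tail       => parse(tail, acc.copy(cores = value.toInt))
    case Nil                              => acc
    case unknown :: _                     => sys.error(s"Unrecognized argument: $unknown")
  }

  def main(args: Array[String]): Unit = {
    val parsed = parse(
      List("--driver-url", "spark://CoarseGrainedScheduler@driver-host:4040", // example value
           "--executor-id", "1", "--cores", "2"),
      Arguments())
    // run(parsed) would then create the executor's RpcEnv and register with the driver.
    println(parsed)
  }
}
```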
References
https://blog.csdn.net/luyllyl/category_7506344.html