Preface
This post summarizes Spark checkpoint usage and its source code, covering both RDD and Spark Streaming.
1、RDD
Purpose: persist the RDD's data to a reliable file system (usually HDFS) so that it can be recovered later.
Fault tolerance and reuse: in a Spark program, if an RDD has complex business logic with many transformations, its computation is expensive, and it is used many times afterwards, then data loss or program failure for whatever reason means Spark retries the program (three times by default) and everything has to be recomputed from the start. If the RDD has been checkpointed, the data is read back directly from the HDFS files instead, with no need to start over, which saves a lot of time.
cache and persist: with cache and persist available, do we still need checkpoint? Yes. Caching in memory is the fastest option but also the least reliable, and even persisting to disk is not as reliable as an HDFS file system. Caching in memory is still preferable when the data fits, because it is fast; combined with checkpoint you get both speed and reliability.

1.1 Usage
Code example:
```scala
// First set the checkpoint directory; this creates a folder on HDFS
sc.setCheckpointDir("hdfs://ambari.master.com:8020/spark/dkl/checkpoint/checkpointdemo")
// Strongly recommended: persist the RDD in memory first, otherwise saving it to files
// will trigger a new job that recomputes the whole RDD
val rdd = sc.parallelize(1 to 1000, 10).cache
// Calling checkpoint marks this RDD for checkpointing and sets its checkpointData to
// ReliableRDDCheckpointData. Like a transformation, nothing runs until an action does,
// so checkpoint() must be called before an action operator
rdd.checkpoint()
// Only when an action triggers a job is the RDD actually persisted to the HDFS files
println(rdd.sum)
```
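After the action completes you can confirm that the checkpoint was actually materialized. A minimal check using the rdd from the example above (the exact printed path depends on your checkpoint directory):

```scala
// Once the first action has run, isCheckpointed returns true and
// getCheckpointFile holds the HDFS directory storing this RDD's checkpoint files
println(rdd.isCheckpointed)    // true
println(rdd.getCheckpointFile) // Some(hdfs://.../rdd-<id>)
```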
1.2 Source code
1.2.1 setCheckpointDir
Creates the corresponding directory (fs.mkdirs(path)).
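A condensed sketch of the core of SparkContext.setCheckpointDir (simplified here; the real method, for example, also warns when a local directory is used on a cluster): it creates a unique subdirectory under the given path and remembers it as checkpointDir.

```scala
def setCheckpointDir(directory: String): Unit = {
  checkpointDir = Option(directory).map { dir =>
    // Create a unique subdirectory for this application's checkpoint data
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
```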
1.2.2 checkpoint
Calling checkpoint marks this RDD for checkpointing and sets checkpointData to ReliableRDDCheckpointData. Like a transformation, it does nothing until an action runs, so it must be called before an action operator.
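For reference, a lightly paraphrased sketch of the body of RDD.checkpoint: it only records the marker, nothing is written at this point.

```scala
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    // Just mark this RDD for checkpointing; the data is written after the first action
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
```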
1.2.3 action算子
Every action operator calls sc.runJob, and at the end runJob calls rdd.doCheckpoint().
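The relevant part of SparkContext.runJob, trimmed to a sketch (logging and call-site handling omitted): once the DAGScheduler has finished the job, doCheckpoint is invoked on the RDD.

```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  // ... run the job through the DAGScheduler and wait for it to finish ...
  dagScheduler.runJob(rdd, clean(func), partitions, getCallSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  // After the job has completed, materialize any pending checkpoint on this RDD
  rdd.doCheckpoint()
}
```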
1.2.4 doCheckpoint
rdd.doCheckpoint() => checkpointData.get.checkpoint()
```scala
/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
```
checkpointData.get.checkpoint() => val newRDD = doCheckpoint()
cpState is set to CheckpointingInProgress; initially cpState = Initialized (protected var cpState = Initialized).
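The state handling lives in RDDCheckpointData.checkpoint(); a lightly paraphrased sketch of its logic:

```scala
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  // doCheckpoint() is implemented by the subclass, e.g. ReliableRDDCheckpointData
  val newRDD = doCheckpoint()

  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}
```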
- The checkpoint method above set checkpointData to ReliableRDDCheckpointData, so the doCheckpoint method of ReliableRDDCheckpointData is the one that gets called.
It runs immediately after the first action on the RDD completes and writes the RDD's contents to a reliable file system (ReliableCheckpointRDD.writeRDDToCheckpointDirectory).
doCheckpoint():
```scala
/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
```
1.2.5 writeRDDToCheckpointDirectory
You can see that a job is triggered here.
writePartitionToCheckpointFile writes each partition's data to a checkpoint file, and writePartitionerToCheckpointDir writes the partitioner to the checkpoint directory.
```scala
/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  ...
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }
  ...
}
```
1.2.6 rdd.markCheckpointed()
In checkpointData.get.checkpoint() you can see that at the end cpState is first set to Checkpointed and then rdd.markCheckpointed() is executed.
It switches this RDD's dependencies to the new RDD created from the checkpoint files and forgets the old dependencies and partitions.
```scala
/**
 * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
 * created from the checkpoint file, and forget its old dependencies and partitions.
 */
private[spark] def markCheckpointed(): Unit = {
  clearDependencies()
  partitions_ = null
  deps = null // Forget the constructor argument for dependencies too
}
```
1.2.7 Reading the checkpoint
When an action operator runs, it triggers a job: sc.runJob submits the job with the function (the action operator) passed as a parameter, then stages are divided and submitted, tasks are submitted and executed, and finally org.apache.spark.scheduler.ResultTask.runTask calls rdd.iterator to fetch the RDD partition data.
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  func(context, rdd.iterator(partition, context))
}
```
Let's look at rdd.iterator:
```scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) { // the RDD has been cached or persisted
    getOrCompute(split, context)
  } else { // no cache or persist
    computeOrReadCheckpoint(split, context)
  }
}
```
When there is no cache, computeOrReadCheckpoint is called:

```scala
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) { // the RDD has already been checkpointed
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
```
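When the RDD has been checkpointed and materialized, firstParent is the ReliableCheckpointRDD that replaced the original dependencies (see markCheckpointed above), and its compute method reads the partition back from the checkpoint file. Roughly, as a simplified sketch:

```scala
// Inside ReliableCheckpointRDD: read the checkpoint file that belongs to this partition
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
```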