前言
总结一下Spark Checkpoint 用法及源码,包括RDD和Spark Streaming两种。
1、RDD
more >>作用:持久化数据-将RDD的数据保存到可靠文件系统中(一般是HDFS),以便数据恢复
容错、复用:在Spark程序中,如果某个RDD因为业务逻辑比较复杂,导致Transformation很多,计算很耗时,且该RDD后面用到很多次,那么有可能因为各种原因导致数据丢失、程序失败,之后Spark会重新尝试执行程序(默认三次),就需要重新从头开始执行,而如果执行了Checkpoint,那么就会直接从HDFS文件读取相关数据信息,不需要从头开始执行,节省很多时间。
cache和persist:有了cache和persist,还需要Checkpoint么?需要,因为cache到内存,虽然最快,但是也最不可靠,即使persist到磁盘也不如HDFS文件系统可靠,且能cache到内存最好在内存,因为这样很快,然后和Checkpoint一起用,就能既保证了速度也保证了可靠性。1.1 用法
代码示例
1
2
3
4
5
6
7
8
9 //首先设置Checkpoint路径,这会在hdfs建立文件夹
sc.setCheckpointDir("hdfs://ambari.master.com:8020/spark/dkl/checkpoint/checkpointdemo")
//强烈建议先把rdd持久化到内存、否则保存到文件时会触发一个新的job重新计算
val rdd = sc.parallelize(1 to 1000, 10).cache
//调用checkpoint,标记该RDD要Checkpoint,将checkpointData设为ReliableRDDCheckpointData
//transformation操作,直到action操作才会执行,所以必须在一个action算子之前执行
rdd.checkpoint()
//直到action算子触发job,才会将rdd持久化到对应的hdfs文件中
println(rdd.sum)