Spark:checkpoint介绍

news/2024/11/29 13:36:43/

checkpoint,是Spark提供的一个比较高级的功能。
有时候我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且整个任务运行的时间也特别长,比如通常要运行1~2个小时。

在这种情况下,就比较适合使用checkpoint功能了。
因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。

所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用。

如何使用checkPoint

首先要调用SparkContextsetCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;
然后,对RDD调用checkpoint()方法。
最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之前设置的文件系统中。

这是checkpoint使用的基本步骤,很简单,那我们下面先从理论层面分析一下当我们设置好checkpoint之后,Spark底层都做了哪些事情

RDD之checkpoint流程

看这个图
在这里插入图片描述

  1. SparkContext设置checkpoint目录,用于存放checkpoint的数据;
    对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized
  2. 待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为CheckpointingInProgress
  3. 启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录
  4. 将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;
    并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;
    最后还会设置其父RDD为新创建的CheckpointRDD

checkpoint与持久化的区别

  • lineage是否发生改变
    linage(血缘关系)说的就是RDD之间的依赖关系
    持久化,只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的; Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了
  • 丢失数据的可能性
    持久化的数据丢失的可能性较大,如果采用 persist 把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。
    Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢?

因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。
如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。
那在这能不能使用基于内存的持久化呢? 当然是可以的,不过没那个必要

checkPoint的使用

下面来演示一下:将一个RDD的数据持久化到HDFS上面

Scala版本

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}object CheckpointOpScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CheckpointOpScala")//.setMaster("local")val sc = new SparkContext(conf)if(args.length==0){System.exit(100)}val outputPath = args(0)//1:设置checkpoint目录sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat").persist(StorageLevel.DISK_ONLY)//执行持久化//2:对rdd执行checkpoint操作dataRDD.checkpoint()dataRDD.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).saveAsTextFile(outputPath)sc.stop()}
}

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class CheckpointOpJava {public static void main(String[] args) {//创建JavaSparkContextSparkConf conf = new SparkConf();conf.setAppName("CheckpointOpJava").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);if(args.length==0){System.exit(100);}String outputPath = args[0];//1:设置checkpoint目录sc.setCheckpointDir("hdfs://bigdata01:9000/chk001");JavaRDD<String> dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat");//2:对rdd执行checkpoint操作dataRDD.checkpoint();dataRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}).mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word,1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) throws Exception {return i1 + i2;}}).saveAsTextFile(outputPath);sc.stop();}
}

执行测试代码,进到YARN的8088界面查看
点击Tracking UI进入spark的ui界面
看第一个界面jobs
在这里插入图片描述

在这可以看出来产生了2个job,
第一个job是我们正常的任务执行,执行了39s,一共产生了28个task任务
第二个job是checkpoint启动的job,执行了35s,一共产生了14个task任务

看第二个界面Stages,这里面的3个Stage是前面2个job产生的
在这里插入图片描述
其中,第二个Job,就是checkpoint启动的任务,查看它的stage的信息
在这里插入图片描述

这个job只会产生一个stage,因为我们只针对textFile的结果设置了checkpoint
在这里插入图片描述

这个stage执行消耗了35s,说明这份数据是重新通过textFile读取过来的。

针对Storage这块,显示的其实就是持久化的数据,如果对RDD做了持久化,那么在任务执行过程中能看到,任务执行结束就看不到了。
在这里插入图片描述


http://www.ppmy.cn/news/573092.html

相关文章

检查点(Checkpoint)的本质

1.检查点&#xff08;Checkpoint&#xff09;的本质 许多文档把Checkpint描述得非常复杂&#xff0c;为我们正确理解检查点带来了障碍&#xff0c;结果现在检查点变成了一个非常复杂的问题。实际上&#xff0c;检查点只是一个数据库事件&#xff0c;它存在的根本意义在于减少崩…

一文搞懂 checkpoint 全过程

前言 前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关。这次我们就具体看一下 checkpoint 是如何发生的。 正文 跟 checkpoint 相关的起点在 buildGraph Deprecatedpublic static Execution…

1.checkpoint防火墙安装以及高可靠性配置

以色列厂商&#xff0c;主营安全产品&#xff0c;软件厂商&#xff0c;和其他做服务器的厂商合作&#xff0c;按照checkpoint要求生产设备&#xff0c;checkpoint是x86&#xff08;64bit&#xff09;架构&#xff0c;虚拟机上的网卡代表的是checkpont的接口&#xff0c;连续21年…

PostgreSQL检查点(checkpoint)详解

checkpoint简单点说就是一个数据库事件&#xff0c;用来保证数据一致性和完整性。 当我们在数据库中执行checkpoint时&#xff0c;就会将其之前的脏数据刷到磁盘&#xff0c;从而实现数据缩短数据库崩溃恢复时间的目的。 例如当我们更新任何数据的时候&#xff0c;对应于包含该…

Flink之checkpoint和savepoint的区别

1.什么是Checkpoint Checkpoint&#xff1a;一种由 Flink 自动执行的快照&#xff0c;其目的是能够从故障中恢复。Checkpoint 使 Flink 的状态具有良好的容错性&#xff0c;通过 checkpoint 机制&#xff0c;Flink 可以对作业的状态和计算位置进行恢复。 Checkpoint 对于用户层…

Flink Checkpoint 详解

Flink Checkpoint 详解 一、checkpoint简介二、checkpoint原理三、精确一次四、状态后端五、配置推荐 一、checkpoint简介 Checkpoint是Flink实现容错机制最核心的功能&#xff0c;是Flink可靠性的基石&#xff0c;它能够根据配置周期性地基于Stream中各个Operator的状态来生成…

PyTorch 之 Checkpoint 机制解析

点击上方“计算机视觉工坊”&#xff0c;选择“星标” 干货第一时间送达 作者丨Lart 编辑丨极市平台 导读 PyTorch 提供了一种非常方便的节省显存的方式&#xff0c;就是 Checkpoint 机制。这篇文章的目的在于更透彻的了解其内在的机制。 Checkpoint 机制 该技术的核心是一种使…

pytorch模型的保存和加载、checkpoint

pytorch模型的保存和加载、checkpoint 其实之前笔者写代码的时候用到模型的保存和加载&#xff0c;需要用的时候就去度娘搜一下大致代码&#xff0c;现在有时间就来整理下整个pytorch模型的保存和加载&#xff0c;开始学习~ pytorch的模型和参数是分开的&#xff0c;可以分别…