    Spark Basics

    Apache Spark is a Scala library for writing distributed processing applications on the JVM. Spark is fundamentally built on the map-reduce model of computation, with optional persistence of intermediate stages. Spark provides a nice functional-looking API that allows us to treat distributed collections much like we would treat normal collections. However, it is important to develop a mental model of how Spark operates in order to effectively write and debug Spark applications.

    Spark Driver

    When you write and deploy a Spark program it runs on a single JVM termed the "Driver". One of the first things a Spark program will do is create a SparkContext, which is both your interface to the cluster and the job scheduler for task execution.

    import org.apache.spark.{SparkConf, SparkContext}
    
    // Create (or reuse) a SparkContext configured for local execution,
    // registering GeoTrellis types with Kryo for efficient serialization.
    implicit val sparkContext =
      SparkContext.getOrCreate(
        new SparkConf(loadDefaults = true)
          .setMaster("local[*]")
          .setAppName("Demo")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "geotrellis.spark.store.kryo.KryoRegistrator"))
    

    RDD: Resilient Distributed Dataset

    A Spark application represents computation as operations on RDD objects, which are owned by the SparkContext. An RDD is a lazy, partitioned, distributed collection; it "contains" the partitions of its data.

    [Figure: Spark RDD]
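
    The "partitioned" and "lazy" parts can be seen in a tiny sketch (the val name is illustrative): creating an RDD from a local collection only records how the data will be split, without computing anything.

    // Nothing is computed here: the RDD only records how to produce its data
    // and how many partitions to split it into.
    val numbers = sparkContext.parallelize(1 to 100, numSlices = 8)
    println(numbers.getNumPartitions) // 8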

    Actions and Transformations

    The instantiation of the RDD partitions and their data happens on remote executors during task execution. The RDD class instance on the driver captures only the data dependencies and a reference to the work to be performed on that data.

    import scala.math.random
    import org.apache.spark.rdd.RDD
    
    val slices = 32
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    
    val countRdd: RDD[Int] = sparkContext.parallelize(1 until n, slices)
    
    val mapRdd: RDD[Int] = countRdd.map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }
    
    val count: Int = mapRdd.reduce { (a, b) => a + b }
    
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    

    Because each RDD has a data dependency on its parents, every RDD is represented as a Directed Acyclic Graph (DAG). There are two things you can do with such a DAG: continue building it or evaluate it.

    [Figure: SparkPi DAG]

    • Transformation: adds a level to the DAG, producing another RDD
    • Action: evaluates the DAG and schedules it as tasks on the cluster

    In this example the call to countRdd.map is a transformation and the call to mapRdd.reduce is an action.
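
    To see the DAG that has been built so far without evaluating it, a quick sketch is to print the RDD lineage on the driver with toDebugString:

    // Prints the lineage of mapRdd without triggering any computation.
    // The exact format depends on the Spark version, but it shows a
    // MapPartitionsRDD depending on a ParallelCollectionRDD.
    println(mapRdd.toDebugString)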

    One tempting action is .collect. Collecting causes all of the executors to send their records to the driver, where they can be "collected" into a single Array. Since the point of Spark is to work with datasets larger than memory, this action often results in an Out of Memory error on the driver. Caution should be taken when using it in production code.
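
    If you only need to inspect a few records or a summary, a bounded action is usually safer. A small sketch using the standard take and count actions instead of collect:

    // Pull only a bounded sample of records to the driver.
    val firstTen: Array[Int] = mapRdd.take(10)

    // Aggregate on the executors and return only a single number.
    val total: Long = mapRdd.count()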

    Forking DAGs

    It is tempting to think that each RDD object represents a specific state of the data as it traverses the DAG, and thus that they can be used like variables to build on the computation. For instance, suppose we wanted to validate the count of mapRdd on the next line like so:

    val samples = mapRdd.count()
    

    The DAG would look like:

    [Figure: SparkPi Forked DAG]

    One might imagine that the results of mapRdd will be reused by both actions, but this is not true. Executing the program results in two actions: reduce followed by count. Each one triggers a DAG evaluation which resolves all of its dependencies, so the work in mapRdd will be done twice.

    Stages

    Spark executes the DAG in stages, of which there are two types: Map and Shuffle.

    Map Stage

    [Figure: Spark Map Stage]

    At the very lowest level each partition of an RDD is evaluated by applying a function to an iterator of records: MapPartitionsRDD.compute

    When multiple map transformations follow each other and all the information required to produce an output partition is present in the parent partition, these transformations can be pipelined, applying the full function stack to each record.
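
    As a rough sketch of what pipelining means, two chained narrow transformations behave like a single pass over each partition; the mapPartitions form below only illustrates the equivalence, it is not what Spark literally generates:

    val doubled: RDD[Int]  = countRdd.map(_ * 2)
    val filtered: RDD[Int] = doubled.filter(_ % 3 == 0)

    // Conceptually equivalent: both functions are applied to each record
    // in a single pass over every partition, with no intermediate collection.
    val pipelined: RDD[Int] =
      countRdd.mapPartitions { iter => iter.map(_ * 2).filter(_ % 3 == 0) }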

    Shuffle Stage

    [Figure: Spark Shuffle Stage]

    When the data required for each output record may be spread across several partitions of the parent RDD, a shuffle stage is required. This is the case with the .groupByKey and .reduceByKey operations.

    During the shuffle stage the records of each parent partition are grouped by target partition number; those chunks are written to disk and sent over the network to the executors responsible for evaluating the corresponding partitions in the next stage.

    Often during a shuffle every record tries to go where it is not, which turns the network into a bottleneck for the Spark application. Additionally, since the full dataset is always written to disk as part of a shuffle, it places additional strain on storage.

    In reality shuffles are required to do useful work, but their number should be minimized for the sake of performance. It is therefore helpful to be aware of which operations will require a shuffle in your application.
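
    As a sketch of where the boundary falls, in the pair-RDD pipeline below the map is pipelined within its partition while reduceByKey forces a shuffle; reduceByKey is generally preferable to groupByKey because it combines values on the map side before shuffling:

    // The map stays within each partition; reduceByKey shuffles so that
    // all values for a given key end up on the same executor.
    val pairs: RDD[(Int, Int)]  = countRdd.map { i => (i % 10, 1) }
    val counts: RDD[(Int, Int)] = pairs.reduceByKey(_ + _)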

    Caching

    The duplicate evaluation of mapRdd described in Forking DAGs can be avoided by calling either mapRdd.cache or mapRdd.persist(StorageLevel.MEMORY_ONLY). The .persist method marks the RDD with a flag that will cause its partitions to be cached on the executors. Persisting an RDD will short-circuit its evaluation and re-use previous results, if available.
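
    Sketching this against the SparkPi example above (the val names are illustrative), we would persist mapRdd before the two actions and release it once it is no longer needed:

    import org.apache.spark.storage.StorageLevel

    // Mark mapRdd to be cached in executor memory the first time it is
    // evaluated; the reduce and count actions will then share its results.
    mapRdd.persist(StorageLevel.MEMORY_ONLY)

    val piHits     = mapRdd.reduce(_ + _)
    val numSamples = mapRdd.count()

    // Release the cached partitions.
    mapRdd.unpersist()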

    Caching should not be used blindly. In many situations it is faster to re-read the data from its external source and duplicate the transformations; after all, caching has its own overhead. Memory-only RDD persistence competes with the memory required to perform the transformations, and on-disk persistence involves a whole new storage subsystem in your job.

    Typically only results that are used multiple times and are the product of several shuffles are worth caching.
