Spark 共享变量


1 概述

一般而言,当我们给Spark算子(如 map 或 reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,并且这些函数所引用的变量都是各个节点上的独立副本。这些变量都会以副本的形式复制到各个机器节点上,如果更新这些变量副本的话,这些更新并不会传回到驱动器(driver)程序。通常来说,支持跨任务的可读写共享变量是比较低效的。不过,Spark还是提供了两种比较通用的共享变量:广播变量和累加器。

2 广播变量

广播变量提供了一种只读的共享变量,它是把在每个机器节点上保存一个缓存,而不是每个任务保存一份副本。通常可以用来在每个节点上保存一个较大的输入数据集,这要比常规的变量副本更高效(一般的变量是每个任务一个副本,一个节点上可能有多个任务)。Spark还会尝试使用高效的广播算法来分发广播变量,以减少通信开销。

Spark的操作有时会有多个阶段(stage),不同阶段之间的分割线就是混洗操作。Spark会自动广播各个阶段用到的公共数据。这些方式广播的数据都是序列化过的,并且在运行各个任务前需要反序列化。这也意味着,显示地创建广播变量,只有在跨多个阶段(stage)的任务需要同样的数据 或者 缓存数据的序列化和反序列化格式很重要的情况下 才是必须的。

广播变量可以通过一个变量v来创建,只需调用 SparkContext.broadcast(v)即可。这个广播变量是对变量v的一个包装,要访问其值,可以调用广播变量的 value 方法。代码示例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

广播变量创建之后,集群中任何函数都不应该再使用原始变量v,这样才能保证v不会被多次复制到同一个节点上。另外,对象v在广播后不应该再被更新,这样才能保证所有节点上拿到同样的值(例如,更新后,广播变量又被同步到另一新节点,新节点有可能得到的值和其他节点不一样)。

3 累加器

累加器是一种只支持满足结合律的“累加”操作的变量,因此它可以很高效地支持并行计算。利用累加器可以实现计数(类似MapReduce中的计数器)或者求和。Spark原生支持了数字类型的累加器,开发者也可以自定义新的累加器。如果创建累加器的时候给了一个名字,那么这个名字会展示在Spark UI上,这对于了解程序运行处于哪个阶段非常有帮助(注意:Python尚不支持该功能)。

创捷累加器时需要赋一个初始值v,调用 SparkContext.accumulator(v) 可以创建一个累加器。后续集群中运行的任务可以使用 add 方法 或者 += 操作符 (仅Scala和Python支持)来进行累加操作。不过,任务本身并不能读取累加器的值,只有驱动器程序可以用 value 方法访问累加器的值。

以下代码展示了如何使用累加器对一个元素数组求和:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

以上代码使用了Spark内建支持的Int型累加器,开发者也可以通过子类化 AccumulatorParam 来自定义累加器。累加器接口(AccumulatorParam )主要有两个方法:1. zero:这个方法为累加器提供一个“零值”,2.addInPlace 将收到的两个参数值进行累加。例如,假设我们需要为Vector提供一个累加机制,那么可能的实现方式如下:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

如果使用Scala,Spark还支持几种更通用的接口:

1.Accumulable,这个接口可以支持所累加的数据类型与结果类型不同(如:构建一个收集元素的list);

2.SparkContext.accumulableCollection 方法可以支持常用的Scala集合类型。

对于在action算子中更新的累加器,Spark保证每个任务对累加器的更新只会被应用一次,例如,某些任务如果重启过,则不会再次更新累加器。而如果在transformation算子中更新累加器,那么用户需要注意,一旦某个任务因为失败被重新执行,那么其对累加器的更新可能会实施多次。

累加器并不会改变Spark懒惰求值的运算模型。如果在RDD算子中更新累加器,那么其值只会在RDD做action算子计算的时候被更新一次。因此,在transformation算子(如:map)中更新累加器,其值并不能保证一定被更新。以下代码片段说明了这一特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// 这里,accum任然是0,因为没有action算子,所以map也不会进行实际的计算

4 部署到集群

应用提交指南(application submission guide)中描述了如何向集群提交应用。换句话说,就是你需要把你的应用打包成 JAR文件(Java/Scala)或者一系列 .py 或 .zip 文件(Python),然后再用 bin/spark-submit 脚本将其提交给Spark所支持的集群管理器。

5 从Java/Scala中启动Spark作业

org.apache.spark.launcher 包提供了简明的Java API,可以将Spark作业作为子进程启动。

6 单元测试

Spark对所有常见的单元测试框架提供友好的支持。你只需要在测试中创建一个SparkContext对象,然后吧master URL设为local,运行测试操作,最后调用 SparkContext.stop() 来停止测试。注意,一定要在 finally 代码块或者单元测试框架的 tearDown方法里调用SparkContext.stop(),因为Spark不支持同一程序中有多个SparkContext对象同时运行。

7 从1.0之前版本迁移过来

Spark 1.0 冻结了Spark Core 1.x 系列的核心API,只要是没有标记为 “experimental” 或者 “developer API”的API,在未来的版本中会一直支持。对于Scala用户来说,唯一的变化就是分组相关的算子,如:groupByKey, cogroup, join,这些算子的返回类型由 (Key, Seq[Value]) 变为 (Key, Iterable[Value])。

8 下一步

你可以去Spark的官网上看看示例程序(example Spark programs)。另外,Spark代码目录下也自带了不少例子,见 examples 目录(Scala,JavaPythonR)。你可以把示例中的类名传给 bin/run-example 脚本来运行这些例子;例如:

./bin/run-example SparkPi

如果需要运行Python示例,则需要使用 spark-submit 脚本:

./bin/spark-submit examples/src/main/python/pi.py

对R语言,同样也需要使用 spark-submit:

./bin/spark-submit examples/src/main/r/dataframe.R

配置(configuration)和调优(tuning)指南提供了不少最佳实践的信息,可以帮助你优化程序,特别是这些信息可以帮助你确保数据以一种高效的格式保存在内存里。集群模式概览(cluster mode overview)这篇文章描述了分布式操作中相关的组件,以及Spark所支持的各种集群管理器。