Spark 编程指南


1 概述

总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并行操作。

Spark最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD),RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作。RDD通常是通过,HDFS(或者其他Hadoop支持的文件系统)上的文件,或者驱动器中的Scala集合对象,来创建或转换得到;其次,用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行操作里复用之;最后,RDD具备容错性,可以从节点失败中自动恢复数据。

Spark第二个重要抽象概念是共享变量,共享变量是一种可以在并行操作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候,我们需要在任务之间,或者任务和驱动器之间共享一些变量。Spark提供了两种类型的共享变量:广播变量累加器,广播变量可以用来在各个节点上缓存数据,而累加器则是用来执行跨节点的“累加”操作,例如:计数和求和。

本文将会使用Spark所支持的所有语言来展示Spark的特性。如果你能启动Spark的交互式shell动手实验一下,效果会更好(对scala请使用bin/spark-shell,而对于python,请使用bin/pyspark)。

2 链接Spark

Spark 1.6.0 使用了Scala 2.10。用Scala写应用的话,你需要使用一个兼容的Scala版本(如:2.10.X)

同时,如果你需要在maven中依赖Spark,可以用如下maven工件标识:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0

另外,如果你需要访问特定版本的HDFS,那么你可能需要增加相应版本的hadoop-client依赖项,其maven工件标识如下:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,你需要如下,在你的代码里导入一些Spark class:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0之前,你需要显示的 import org.apache.spark.SparkContext._ 来启用这些重要的隐式转换)

3 初始化Spark

Spark应用程序需要做的第一件事就是创建一个 SparkContext 对象,SparkContext对象决定了Spark如何访问集群。而要新建一个SparkContext对象,你还得需要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。

每个JVM进程中,只能有一个活跃(active)的SparkContext对象。如果你非要再新建一个,那首先必须将之前那个活跃的SparkContext 对象stop()掉。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName参数值是你的应用展示在集群UI上的应用名称。master参数值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。实际上,一般不应该将master参数值硬编码到代码中,而是应该用spark-submit脚本的参数来设置。然而,如果是本地测试或单元测试中,你可以直接在代码里给master参数写死一个”local”值。

使用shell

在Spark shell中,默认已经为你新建了一个SparkContext对象,变量名为sc。所以spark-shell里不能自建SparkContext对象。你可以通过–master参数设置要连接到哪个集群,而且可以给–jars参数传一个逗号分隔的jar包列表,以便将这些jar包加到classpath中。你还可以通过–packages设置逗号分隔的maven工件列表,以便增加额外的依赖项。同样,还可以通过–repositories参数增加maven repository地址。下面是一个示例,在本地4个CPU core上运行的实例:

$ ./bin/spark-shell –master local[4]

或者,将code.jar添加到classpath下:

$ ./bin/spark-shell --master local[4] --jars code.jar

通过maven标识添加依赖:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

spark-shell –help可以查看完整的选项列表。实际上,spark-shell是在后台调用spark-submit来实现其功能的(spark-submit script.)