Spark Streaming编程指南


1 概述

Spark Streaming是核心Spark API的扩展,可以实现流式处理实时数据流,并且具有良好的扩展性、高吞吐量和容错能力。Spark Streaming支持从多个数据源提取数据,比如Kafka,Flume,Twitter Zero MQ,Kinesis,以及TCP套接字,能够提供一些高级API来表示复杂的处理算法,例如:map,reduce,join和窗口,等等。最终,Spark Streaming支持将处理过的数据推入文件系统、数据库或实时仪表盘显示。事实上,你可以把Spark的机器学习(machine learning)和图计算(graph processing)的算法应用到Spark Streaming的数据流中。

下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。

Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDDs

本文档将向你展示如何用DStream进行Spark Streaming编程。Spark Streaming支持Scala、Java和Python(始于Spark 1.2),本文档的示例包括这三种语言。

注意:对Python来说,有一部分API尚不支持,或者是和Scala、Java不同。本文档中会用高亮形式来注明这部分 Python API。

2 举例

在深入Spark Streaming编程细节之前,我们先来看看一个简单的小栗子以便有个感性认识。假设我们在一个TCP端口上监听一个数据服务器的数据,并对收到的文本数据中的单词计数。以下你所需的全部工作:

首先,我们需要导入Spark Streaming的相关class的一些包,以及一些支持Streaming Context隐式转换的包(这些隐式转换能给DStream之类的class增加一些有用的方法)。Streaming Context 是Spark Streaming的入口。我们将会创建一个本地 Streaming Context对象,包含两个执行线程,并将批次间隔设为1秒。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // 从Spark 1.3之后这行就可以不需要了

// 创建一个local StreamingContext,包含2个工作线程,并将批次间隔设为1秒
// master至少需要2个CPU核,以避免出现任务饿死的情况
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

利用这个上下文对象(StreamingContext),我们可以创建一个DStream,该DStream代表从前面的TCP数据源流入的数据流,同时TCP数据源是由主机名(如:hostnam)和端口(如:9999)来描述的。

// 创建一个连接到hostname:port的DStream,如:localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

这里的 lines 就是从数据server接收到的数据流。其中每一条记录都是一行文本。接下来,我们就需要把这些文本行按空格分割成单词。

// 将每一行分割成多个单词
val words = lines.flatMap(_.split(" "))

flatMap 是一种 “一到多”(one-to-many)的映射算子,它可以将源DStream中每一条记录映射成多条记录,从而产生一个新的DStream对象。在本例中,lines中的每一行都会被flatMap映射为多个单词,从而生成新的words DStream对象。然后,我们就能对这些单词进行计数了。

import org.apache.spark.streaming.StreamingContext._ // Spark 1.3之后不再需要这行
// 对每一批次中的单词进行计数
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// 将该DStream产生的RDD的头十个元素打印到控制台上
wordCounts.print()

words这个DStream对象经过map算子(一到一的映射)转换为一个包含(word, 1)键值对的DStream对象pairs,再对pairs使用reduce算子,得到每个批次中各个单词的出现频率。最后,wordCounts.print() 将会每秒(前面设定的批次间隔)打印一些单词计数到控制台上。

注意,执行以上代码后,Spark Streaming只是将计算逻辑设置好,此时并未真正的开始处理数据。要启动之前的处理逻辑,我们还需要如下调用:

ssc.start()             // 启动流式计算
ssc.awaitTermination()  // 等待直到计算终止

完整的代码可以在Spark Streaming的例子 NetworkWordCount 中找到。

如果你已经有一个Spark包,就可以执行按如下步骤运行这个例子。

首先,你需要运行netcat(Unix-like系统都会有这个小工具),将其作为data server

$ nc -lk 9999

然后,在另一个终端,按如下指令执行这个例子

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

好了,现在你尝试可以在运行netcat的终端里敲几个单词,你会发现这些单词以及相应的计数会出现在启动Spark Streaming例子的终端屏幕上。看上去应该和下面这个示意图类似:

# TERMINAL 1:

# Running Netcat

 

$ nc -lk 9999

 

hello world

 

 

 

 

# TERMINAL 2: RUNNING NetworkWordCount

 

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

——————————————-

Time: 1357008430000 ms

——————————————-

(hello,1)

(world,1)

下面,我们在之前的小栗子基础上,继续深入了解一下Spark Streaming的一些基本概念。

3 链接依赖项

和Spark类似,Spark Streaming也能在Maven库中找到。如果你需要编写Spark Streaming程序,你就需要将以下依赖加入到你的SBT或Maven工程依赖中。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

还有,对于从Kafka、Flume以及Kinesis这类数据源提取数据的流式应用来说,还需要额外增加相应的依赖项,下表列出了各种数据源对应的额外依赖项:

数据源

Maven工件

Kafka

spark-streaming-kafka_2.10

Flume

spark-streaming-flume_2.10

Kinesis

spark-streaming-kinesis-asl_2.10 [Amazon Software License]

Twitter

spark-streaming-twitter_2.10

ZeroMQ

spark-streaming-zeromq_2.10

MQTT

spark-streaming-mqtt_2.10

 

 

4 初始化StreamingContext

要初始化任何一个Spark Streaming程序,都需要在入口代码中创建一个StreamingContext对象。

StreamingContext对象需要一个SparkConf对象作为其构造参数。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

上面代码中的 appName 是你给该应用起的名字,这个名字会展示在Spark集群的web UI上。而 master 是Spark, Mesos or YARN cluster URL,如果支持本地测试,你也可以用”local[*]”为其赋值。通常在实际工作中,你不应该将master参数硬编码到代码里,而是应用通过spark-submit的参数来传递master的值(launch the application with spark-submit )。不过对本地测试来说,”local[*]”足够了(该值传给master后,Spark Streaming将在本地进程中,启动n个线程运行,n与本地系统CPU core数相同)。注意,StreamingContext在内部会创建一个  SparkContext 对象(SparkContext是所有Spark应用的入口,在StreamingContext对象中可以这样访问:ssc.sparkContext)。

StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定。详见Spark性能调优( Performance Tuning)。

StreamingContext对象也可以通过已有的SparkContext对象来创建,示例如下:

import org.apache.spark.streaming._

val sc = ...                // 已有的SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

context对象创建后,你还需要如下步骤:

  • 创建DStream对象,并定义好输入数据源。
  • 基于数据源DStream定义好计算逻辑和输出。
  • 调用streamingContext.start() 启动接收并处理数据。
  • 调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
  • 你可以主动调用 streamingContext.stop() 来手动停止处理流程。

需要关注的重点:

  • 一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。
  • 一旦streamingContext被stop掉,就不能restart。
  • 单个JVM虚机同一时间只能包含一个active的StreamingContext。
  • StreamingContext.stop() 也会把关联的SparkContext对象stop掉,如果不想把SparkContext对象也stop掉,可以将StreamingContext.stop的可选参数 stopSparkContext 设为false。
  • 一个SparkContext对象可以和多个StreamingContext对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可。

5 离散数据流 (DStreams)

离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集。每个RDD都包含了特定时间间隔内的一批数据,如下图所示:

任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。其过程如下图所示:

底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。后续会详细讨论这些高级算子。

6 输入DStream和接收器

输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器相关联,而接收器则是专门从数据源拉取数据到内存中的对象。

Spark Streaming主要提供两种内建的流式数据源:

  • 基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
  • 高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。

本节中,我们将会从每种数据源中挑几个继续深入讨论。

注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象(详见后续的性能调优这一小节)。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。所以,在运行Spark Streaming应用的时候,需要注意分配足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。

要点

  • 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
  • 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。

7 基础数据源

前面的小栗子中,我们已经看到,使用ssc.socketTextStream(…) 可以从一个TCP连接中接收文本数据。而除了TCP套接字外,StreamingContext API 还支持从文件或者Akka actor中拉取数据。

文件数据流(File Streams): 可以从任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系统,创建方式如下:

  streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming将监视该dataDirectory目录,并处理该目录下任何新建的文件(目前还不支持嵌套目录)。注意:

    • 各个文件数据格式必须一致。
    • dataDirectory中的文件必须通过moving或者renaming来创建。
    • 一旦文件move进dataDirectory之后,就不能再改动。所以如果这个文件后续还有写入,这些新写入的数据不会被读取。

对于简单的文本文件,更简单的方式是调用 streamingContext.textFileStream(dataDirectory)。

另外,文件数据流不是基于接收器的,所以不需要为其单独分配一个CPU core。

  • Python API fileStream目前暂时不可用,Python目前只支持textFileStream。
  • 基于自定义Actor的数据流(Streams based on Custom Actors): DStream可以由Akka actor创建得到,只需调用 streamingContext.actorStream(actorProps, actor-name)。详见自定义接收器(Custom Receiver Guide)。actorStream暂时不支持Python API。
  • RDD队列数据流(Queue of RDDs as a Stream): 如果需要测试Spark Streaming应用,你可以创建一个基于一批RDD的DStream对象,只需调用 streamingContext.queueStream(queueOfRDDs)。RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据。

关于套接字、文件以及Akka actor数据流更详细信息。

8 高级数据源

Python API 自 Spark 1.6.1 起,Kafka、Kinesis、Flume和MQTT这些数据源将支持Python。

使用这类数据源需要依赖一些额外的代码库,有些依赖还挺复杂的(如:Kafka、Flume)。因此为了减少依赖项版本冲突问题,各个数据源DStream的相关功能被分割到不同的代码包中,只有用到的时候才需要链接打包进来。例如,如果你需要使用Twitter的tweets作为数据源,你需要以下步骤:

  • Linking: 将spark-streaming-twitter_2.10工件加入到SBT/Maven项目依赖中。
  • Programming: 导入TwitterUtils class,然后调用 TwitterUtils.createStream 创建一个DStream,具体代码见下放。
  • Deploying: 生成一个uber Jar包,并包含其所有依赖项(包括 spark-streaming-twitter_2.10及其自身的依赖树),再部署这个Jar包。
import org.apache.spark.streaming.twitter._

TwitterUtils.createStream(ssc, None)

注意,高级数据源在spark-shell中不可用,因此不能用spark-shell来测试基于高级数据源的应用。如果真有需要的话,你需要自行下载相应数据源的Maven工件及其依赖项,并将这些Jar包部署到spark-shell的classpath中。

下面列举了一些高级数据源:

  • Kafka: Spark Streaming 1.6.1 可兼容 Kafka 0.8.2.1。
  • Flume: Spark Streaming 1.6.1 可兼容 Flume 1.6.0 。
  • Kinesis: Spark Streaming 1.6.1 可兼容 Kinesis Client Library 1.2.1。
  • Twitter: Spark Streaming TwitterUtils 使用Twitter4j 通过 Twitter’s Streaming API 拉取公开tweets数据流。你可以获取所有的公开数据流,当然也可以基于某些关键词进行过滤。

9 自定义数据源

Python API 自定义数据源目前还不支持Python。

输入DStream也可以用自定义的方式创建。你需要做的只是实现一个自定义的接收器(receiver),以便从自定义的数据源接收数据,然后将数据推入Spark中。

10 接收器可靠性

从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:

  • 可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
  • 不可靠接收器(Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。

11 缓存/持久化

和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。

对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。

注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。

12 检查点

一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据:

  • 元数据检查点(Metadata checkpointing) – 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括:
    • Configuration – 创建Streaming应用程序的配置信息。
    • DStream operations – 定义流式处理逻辑的DStream操作信息。
    • Incomplete batches – 已经排队但未处理完的批次信息。
  • 数据检查点(Data checkpointing) – 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。

总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。

1、何时启用检查点

如果有以下情况出现,你就必须启用检查点了:

  • 使用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。
  • 支持驱动器故障中恢复(Recovering from failures of the driver running the application) – 这时候需要元数据检查点以便恢复流式处理的进度信息。

注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。

2、如何配置检查点

检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:

  • 如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()。
  • 如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象。

不过这个行为可以用StreamingContext.getOrCreate来实现,示例如下:

// 首次创建StreamingContext并定义好数据流处理逻辑
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // 新建一个StreamingContext对象
    val lines = ssc.socketTextStream(...) // 创建DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // 设置好检查点目录
    ssc
}

// 创建新的StreamingContext对象,或者从检查点构造一个
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// 无论是否是首次启动都需要设置的工作在这里
context. ...

// 启动StreamingContext对象
context.start()
context.awaitTermination()

如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好DStream数据流。

除了使用getOrCreate之外,开发者还需要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。

另外需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。

13 部署应用

本节中将主要讨论一下如何部署Spark Streaming应用。

1、前提条件

要运行一个Spark Streaming 应用,你首先需要具备以下条件:

  • 集群以及集群管理器 – 这是一般Spark应用的基本要求。
  • 给Spark应用打个JAR包 – 你需要将你的应用打成一个JAR包。如果使用spark-submit 提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源(advanced sources – 如:Kafka、Flume、Twitter等),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。
  • 为执行器(executor)预留足够的内存 – 执行器必须配置预留好足够的内存,因为接受到的数据都得存在内存里。注意,如果某些窗口长度达到10分钟,那也就是说你的系统必须知道保留10分钟的数据在内存里。可见,到底预留多少内存是取决于你的应用处理逻辑的。
  • 配置检查点 – 如果你的流式应用需要检查点,那么你需要配置一个Hadoop API兼容的可容错存储目录作为检查点目录,流式应用的信息会写入这个目录,故障恢复时会用到这个目录下的数据。详见前面的检查点小节。
  • 配置驱动程序自动重启 – 流式应用自动恢复的前提就是,部署基础设施能够监控驱动器进程,并且能够在其故障时,自动重启之。不同的集群管理器有不同的工具来实现这一功能:
    • Spark独立部署 – Spark独立部署集群可以支持将Spark应用的驱动器提交到集群的某个worker节点上运行。同时,Spark的集群管理器可以对该驱动器进程进行监控,一旦驱动器退出且返回非0值,或者因worker节点原始失败,Spark集群管理器将自动重启这个驱动器。。
    • YARN – YARN支持和独立部署类似的重启机制。详细请参考YARN的文档。
    • Mesos – Mesos上需要用Marathon来实现这一功能。
  • 配置WAL(write ahead log)- 从Spark 1.2起,我们引入了write ahead log来提高容错性。如果启用这个功能,则所有接收到的数据都会以write ahead log形式写入配置好的检查点目录中。这样就能确保数据零丢失(容错语义有详细的讨论)。用户只需将 spark.streaming.receiver.writeAheadLog 设为true。不过,这同样可能会导致接收器的吞吐量下降。不过你可以启动多个接收器并行接收数据,从而提升整体的吞吐量(more receivers in parallel)。另外,建议在启用WAL后禁用掉接收数据多副本功能,因为WAL其实已经是存储在一个多副本存储系统中了。你只需要把存储级别设为 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系统)存储WAL,一定要记得启用这两个标识:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。
  • 设置好最大接收速率 – 如果集群可用资源不足以跟上接收数据的速度,那么可以在接收器设置一下最大接收速率,即:每秒接收记录的条数。相关的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 还需要设置 spark.streaming.kafka.maxRatePerPartition。从Spark 1.5起,我们引入了backpressure的概念来动态地根据集群处理速度,评估并调整该接收速率。用户只需将 spark.streaming.backpressure.enabled设为true即可启用该功能。
2、升级应用代码

升级Spark Streaming应用程序代码,可以使用以下两种方式:

  • 新的Streaming程序和老的并行跑一段时间,新程序完成初始化以后,再关闭老的。注意,这种方式适用于能同时发送数据到多个目标的数据源(即:数据源同时将数据发给新老两个Streaming应用程序)。
  • 老程序能够优雅地退出(参考  StreamingContext.stop(...) or JavaStreamingContext.stop(...) ),即:确保所收到的数据都已经处理完毕后再退出。然后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这个空档时间,数据需要在数据源处缓存。目前还不能支持从检查点重启,因为检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种情况下,只能要么指定一个新的检查点目录,要么删除老的检查点目录。

14 应用监控

除了Spark自身的监控能力(monitoring capabilities)之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。

web UI上有两个度量特别重要:

  • 批次处理耗时(Processing Time) – 处理单个批次耗时
  • 批次调度延时(Scheduling Delay) -各批次在队列中等待时间(等待上一个批次处理完)

如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。

Spark Streaming程序的处理进度可以用StreamingListener接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。