Spark Streaming容错语义


本节中,我们将讨论Spark Streaming应用在出现失败时的具体行为。

1 背景

要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。

  • RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的血统信息,其中每个算子都以可容错的数据集作为输入数据。
  • 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的血统信息以及相应的原始输入数据集重新计算出来。
  • 假定所有RDD transformation算子计算过程都是确定性的,那么通过这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关。

Spark主要处理某些容错文件系统的数据,比如HDFS或S3。结果,所有由这些可容错数据源生成的RDD都可以容错。但是,SparkStreaming并非如此,因为大多数时候Streaming需要从网络远端接收数据,这回造成Streaming的数据源不可靠(特别是对于使用了fileStream的应用)。要实现RDD相同的容错属性,需要在多个不同worker节点上使用Spark执行器实现数据接收(默认复制因数为2)。因此一旦出现故障,系统需要恢复两种数据:

  • 接收并保存了副本的数据 – 数据不会因为单个worker节点故障而丢失,因为有副本!
  • 接收但尚未保存副本数据 – 因为数据并没有副本,所以一旦故障,只能从数据源重新获取。

此外,还有两种可能的故障类型需要考虑:

  • Worker节点故障 – 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。
  • Driver节点故障 – 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失。

有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。

2 定义

流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障):

  • 至多一次(At most once): 每条记录要么被处理一次,要么就没有处理。
  • 至少一次(At least once): 每条记录至少被处理过一次(一次或多次)。这种保证能确保没有数据丢失,比“至多一次”要强。但有可能出现数据重复。
  • 精确一次(Exactly once): 每条记录都精确地只被处理一次 – 也就是说,既没有数据丢失,也不会出现数据重复。这是三种保证中最强的一种。

3 基础语义

任何流式处理系统一般都会包含以下三个数据处理步骤:

  • 数据接收(Receiving the data): 从数据源拉取数据。
  • 数据转换(Transforming the data): 将接收到的数据进行转换(使用DStream和RDD transformation算子)。
  • 数据推送(Pushing out the data): 将转换后最终数据推送到外部文件系统,数据库或其他展示系统。

如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:

  • 数据接收: 不同数据源提供的保证不同,下一节再详细讨论。
  • 数据转换: 所有的数据都会被“精确一次”处理,这要归功于RDD提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。
  • 数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。

4 接收数据语义

不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。

1、从文件接收数据

如果所有输入数据来自可容错的文件系统,比如HDFS,那么SparkStreaming就可以恢复和处理任何失败的数据。这样就保证了语义的准确性,即在任何错误发生时,所有数据都只被精确地处理一次,不多也不少。

2、基于接收器接收数据

对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:

  • 可靠接收器 – 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据。
  • 不可靠接收器 – 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。

对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。

为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。

下表总结了故障情况下的各种语义:

部署场景

Worker 故障

Driver 故障

Spark 1.1及以前版本 或者

Spark 1.2及以后版本,且未开启WAL

若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据;

若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义

若使用不可靠接收器,则缓存数据和已保存数据都可能丢失;

若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证

Spark 1.2及以后版本,并启用WAL

若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证

若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证

 

 

 

3、从Kafka Direct API接收数据

从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。

5 输出算子的语义

输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式:

  • 幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。
  • 事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式:
    • 用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。

基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交
  }
}