1 性能调优
要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情:
- 提高集群资源利用率,减少单批次处理耗时。
- 设置合适的批次大小,以便使数据处理速度能跟上数据接收速度。
2 减少批次处理时间
有不少优化手段都可以减少Spark对每个批次的处理时间。
1、数据接收并发度
跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。
如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数(configuration parameter)spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了。
另一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操作会在处理前,将数据散开重新分发到集群中多个节点上。
2、数据处理并发度
在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(。
3、数据序列化
调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化:
- 输入数据: 默认地,接收器收到的数据是以 StorageLevel.MEMORY_AND_DISK_SER_2 的存储级别存储到执行器(executor)内存中的。也就是说,收到的数据会被序列化以减少GC开销,同时保存两个副本以容错。同时,数据会优先保存在内存里,当内存不足时才吐出到磁盘上。很明显,这个过程中会有数据序列化的开销 – 接收器首先将收到的数据反序列化,然后再以spark所配置指定的格式来序列化数据。
- Streaming算子所生产的持久化的RDDs: Streaming计算所生成的RDD可能会持久化到内存中。例如,基于窗口的算子会将数据持久化到内存,因为窗口数据可能会多次处理。所不同的是,spark core默认用 StorageLevel.MEMORY_ONLY 级别持久化RDD数据,而spark streaming默认使用StorageLevel.MEMORY_ONLY_SER 级别持久化接收到的数据,以便尽量减少GC开销。
不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪。
在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。
4、任务启动开销
如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销:
- 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。
- 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。
这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。
3 设置合适的批次间隔
要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。
根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。
要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用StreamingListener接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值)
4 内存调优
Spark的应用内存占用和GC优化已经在TuningGuide中进行了详细的讨论。让你看看那份文件吧,墙裂。在这个部分中,我们将简单地讨论一些专门为SparkStreaming设计的优化参数。
SparkStreaming应用程序在集群中占用的内存数量严重依赖于特定的传输操作。举例来说,如果要使用一个窗口操作前10分钟的数据,这样,集群至少需要10分钟的数据才能在内存中保存;另一个示例是updateStateByKey,若key较多,则相应地保存了许多key的state,这将消耗大量内存。而且,如果您的应用程序仅仅执行一个简单的“映射-筛选-存储”(map-filter-store)操作,那么它只需要少量的内存。
一般而言,streaming接收器接收到的数据以StorageLevel.MEMORY_AND_DISK_SER_2这一存储级别存入spark,也就是说,如果无法装入内存,数据将被吐到磁盘。发送到磁盘的数据将显著降低streaming应用的性能,所以根据应用处理的数据量,建议为streaming提供足够的内存。最好的办法是在小范围扩大存储器,观察评估结果,然后放大、评估。
另外一种内存优化方向是垃圾收集。由于streaming应用倾向于要求低延时,因此肯定不希望出现大量耗时或耗时的JVM垃圾收集暂停。
以下是一些能够帮助你减少内存占用和GC开销的参数或手段:
- DStream持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗。
- 清除老数据(Clearing old data): 缺省情况下,DStream的transformation操作符生成的所有输入数据都会自动清除。SparkStreaming将基于转换操作符清除旧数据。举例来说,你使用一个窗口操作来处理最近10分钟的数据,SparkStreaming至少保留10分钟的数据,而主动删除之前所有的数据。自然,您可以设置streamingContext.remember来保留较长的数据时间(例如:您可能需要交互查询较老的数据)。
- CMS垃圾回收器(CMS Garbage Collector): 为最大限度地缩短GC暂停的时间,我们推荐CMS垃圾回收器(concurrentmark-and-sweepGC)。尽管CMSGC会略微降低系统的整体吞吐量,但是我们还是推荐使用它,因为CMSGC可以将批量处理的时间保持在相对稳定的水平。最后,您需要确保在驱动器(通过spark-submit中的–driver-java-options设置)和执行程序(使用spark.executor.extraJavaOptions配置参数)设置。
- 其他提示: 如果还想进一步减少GC开销,以下是更进一步的可以尝试的手段:
- 配合Tachyon使用堆外内存来持久化RDD。
- 使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。