Spark分区并行度决定机制
大家都知道Spark job中最小执行单位为task,合理设置Spark job每个stage的task数是决定性能好坏的重要因素之一,但是Spark自己确定最佳并行度的能力有限,这就要求我们在了解其中内在机制的前提下,去各种测试、计算等来最终确定最佳参数配比。
Spark任务在执行时会将RDD划分为不同的stage,一个stage中task的数量跟最后一个RDD的分区数量相同。之前已经介绍过,stage划分的关键是宽依赖,而宽依赖往往伴随着shuffle操作。对于一个stage接收另一个stage的输入,这种操作通常都会有一个参数numPartitions来显示指定分区数。最典型的就是一些ByKey算子,比如groupByKey(numPartitions: Int),但是这个分区数需要多次测试来确定合适的值。首先确定父RDD中的分区数(通过rdd.partitions().size()可以确定RDD的分区数),然后在此基础上增加分区数,多次调试直至在确定的资源任务能够平稳、安全的运行。
对于没有父RDD的RDD,比如通过加载HDFS上的数据生成的RDD,它的分区数由InputFormat切分机制决定。通常就是一个HDFS block块对应一个分区,对于不可切分文件则一个文件对应一个分区。
对于通过SparkContext的parallelize方法或者makeRDD生成的RDD分区数可以直接在方法中指定,如果未指定,则参考spark.default.parallelism的参数配置。下面是默认情况下确定defaultParallelism的源码:
1 | override def defaultParallelism(): Int = { |
通常,RDD的分区数与其所依赖的RDD的分区数相同,除非shuffle。但有几个特殊的算子:
1.coalesce和repartition算子
笔者先放两张关于该coalesce算子分别在RDD和DataSet中的源码图:(DataSet是Spark SQL中的分布式数据集,后边说到Spark时再细讲)
通过coalesce源码分析,无论是在RDD中还是DataSet,默认情况下coalesce不会产生shuffle,此时通过coalesce创建的RDD分区数小于等于父RDD的分区数。
笔者这里就不放repartition算子的源码了,分析起来也比较简单,图中我有所提示。但笔者建议,如下两种情况,请使用repartition算子:
1)增加分区数repartition触发shuffle,shuffle的情况下可以增加分区数。
coalesce默认不触发shuffle,即使调用该算子增加分区数,实际情况是分区数仍然是当前的分区数。
2)极端情况减少分区数,比如将分区数减少为1调整分区数为1,此时数据处理上游stage并行度降,很影响性能。此时repartition的优势即不改变原来stage的并行度就体现出来了,在大数据量下,更为明显。但需要注意,因为repartition会触发shuffle,而要衡量好shuffle产生的代价和因为用repartition增加并行度带来的效益。
2.union算子
还是直接看源码:
通过分析源码,RDD在调用union算子时,最终生成的RDD分区数分两种情况:
1)union的RDD分区器已定义并且它们的分区器相同
多个父RDD具有相同的分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的
2)不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和
4.cartesian算子
通过上述coalesce、repartition、union算子介绍和源码分析,很容易分析cartesian算子的源码。通过cartesian得到RDD分区数是其父RDD分区数的乘积。
spark.default.parallelism谈Spark谈并行度
上图是spark官网关于spark.default.parallelism参数说明:
- 对于reduceByKey和join这些分布式shuffle算子操作,取决于它的父RDD中分区数的最大值
- 对于没有父RDD的的算子,比如parallelize,依赖于集群管理器:
- 本地模式:取决于本地机器的核数
- 如果集群管理器是Mesos,则为8
- 其他的:对比所有executor上总核数与2比较,哪个大是哪个
- 本地模式:取决于本地机器的核数
当然上面这些都是默认值,如果我们自己设置了分区数,情况就会有所变化,直接看源码【查看org.apache.spark.Partitioner源码defaultPartitioner方法】
你会发现,如果你使用reducebykey、groupByKey等这些带shuffle的算子,建议不要通过上述方法让程序内部去推测。完全可以通过传入一个确定的分区数或者自己实现一个分区器来做处理。当然这个确定的分区数也不是贸贸然设定的,需要结合你的业务场景根据实际情况来确定多少合适。比如shuffle时流经的数据量,这个就要结合分区数和shuffle总数据量来做适当调整,处理不好的结果极有可能导致数据倾斜等问题…
在Spark SQL中,任务并行度参数则要参考spark.sql.shuffle.partitions,笔者这里先放一张图,详细的后面讲到Spark SQL时再细说:
看下图在Spark流式计算中,通常将SparkStreaming和Kafka整合,这里又分两种情况:
1.Receiver方式生成的微批RDD即BlockRDD,分区数就是block数
receiver模式的并行度由spark.streaming.blockInterval决定,默认是200ms。
receiver模式接收block.batch数据后会封装到RDD中,这里的block对应RDD中的partition。
batchInterval一定的情况下:
- 减少spark.streaming.Interval参数值,会增大DStream中的partition个数。
- 建议spark.streaming.Interval最低不能低于50ms。
2.Direct方式生成的微批RDD即kafkaRDD,分区数和kafka分区数一一对应
Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
DirectKafkaInputDStream定期生成的RDD的类型是KafkaRDD。
我们首先看看 KafkaRDD是如何划分分区的:
它会根据从初始化时接收的offset信息参数,生成KafkaRDDPartition分区;每个分区对应着Kafka的一个topic partition 的一段数据,这段数据的信息OffsetRange表示, 它保存了数据的位置。
通过源码分析可知:Partition的计算方法是为topic的每一个partition创建一个OffsetRange,所有的OffsetRange生成一个KafkaRDD。
如何增大RDD的分区数,让每个partition处理的数据量增大?
通过源码分析,可通过调小Kafka消息中Topic的分区数目;想要增加RDD的并行度,可通过调大Kafka消息中Topic的分区数目。