Estimates partition start indices for post-shuffle partitions based on mapOutputStatistics provided by all pre-shuffle stages.
Registers a ShuffleExchange operator with this coordinator. This method may only be called from the doPrepare method of a ShuffleExchange operator.
A coordinator used to determine how data is shuffled between stages generated by Spark SQL. Right now, the work of this coordinator is to determine the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or multiple stages.
A coordinator is constructed with three parameters: numExchanges, targetPostShuffleInputSize, and minNumPostShufflePartitions.
- numExchanges indicates how many ShuffleExchanges will be registered to this coordinator, so that when we start to do any actual work we can check that we have the expected number of ShuffleExchanges.
- targetPostShuffleInputSize is the targeted input data size of a post-shuffle partition. With this parameter, we can estimate the number of post-shuffle partitions. It is configured through spark.sql.adaptive.shuffle.targetPostShuffleInputSize (see the configuration sketch after this list).
- minNumPostShufflePartitions is an optional parameter. If it is defined, this coordinator will try to make sure that there are at least minNumPostShufflePartitions post-shuffle partitions.
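For illustration only, the following sketch shows one way a session might be configured so that shuffles are planned through this coordinator. It is not taken from this documentation: spark.sql.adaptive.enabled is assumed to be the switch that activates this machinery, and the application name, master, and query are arbitrary; only spark.sql.adaptive.shuffle.targetPostShuffleInputSize is named above.

  import org.apache.spark.sql.SparkSession

  object CoordinatorConfigSketch {
    def main(args: Array[String]): Unit = {
      // Assumed setup: enable adaptive execution and set the target post-shuffle
      // input size described above (here 64 MB).
      val spark = SparkSession.builder()
        .appName("exchange-coordinator-config-sketch")
        .master("local[*]")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", 64L * 1024 * 1024)
        .getOrCreate()

      // A shuffle-introducing query: the aggregation below inserts a ShuffleExchange,
      // which is the kind of operator that registers itself with the coordinator.
      spark.range(0, 1000000)
        .selectExpr("id % 100 AS bucket")
        .groupBy("bucket")
        .count()
        .show(5)

      spark.stop()
    }
  }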
The workflow of this coordinator is described as follows:
- Before the execution of a SparkPlan, a ShuffleExchange operator that has this coordinator assigned registers itself with the coordinator. This happens in the doPrepare method.
- Once we start to execute the physical plan, a registered ShuffleExchange will call postShuffleRDD to get its corresponding post-shuffle ShuffledRowRDD. If this coordinator has already made the decision on how to shuffle data, this ShuffleExchange will immediately get its corresponding post-shuffle ShuffledRowRDD.
- If the decision has not been made yet, this coordinator estimates the post-shuffle partitioning from the map output statistics of all registered pre-shuffle stages and creates the post-shuffle ShuffledRowRDDs for all registered ShuffleExchanges. So, when a ShuffleExchange calls postShuffleRDD, this coordinator can look up the corresponding RDD (see the simplified sketch after this list).
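The following is a heavily simplified, hypothetical model of that workflow, written only to illustrate the register-then-ask protocol and the fact that the decision is made once and then cached; SimpleCoordinator, Stats, and the string ids are stand-ins and do not mirror Spark's internal ExchangeCoordinator, ShuffleExchange, or MapOutputStatistics types.

  import scala.collection.mutable

  // Hypothetical stand-in for the size statistics reported by a pre-shuffle stage.
  final case class Stats(bytesByPartition: Array[Long])

  final class SimpleCoordinator(numExchanges: Int, targetSize: Long) {
    private val exchanges = mutable.ArrayBuffer.empty[String]   // registered exchange ids
    private var decided: Option[Array[Int]] = None              // cached partition start indices

    // Mirrors the registration step that happens in doPrepare: it must run before
    // any exchange asks for its post-shuffle partitioning.
    def registerExchange(id: String): Unit = exchanges += id

    // Mirrors the postShuffleRDD step: the first caller triggers the decision,
    // later callers simply reuse the cached result.
    def postShufflePartitioning(statsOfAllStages: => Seq[Stats]): Array[Int] = {
      if (decided.isEmpty) {
        require(exchanges.size == numExchanges, "not all exchanges are registered yet")
        decided = Some(pack(statsOfAllStages, targetSize))
      }
      decided.get
    }

    // Packing of pre-shuffle partitions with continuous indices; see the standalone
    // packing sketch at the end of this section for the full strategy. Here everything
    // is packed into a single post-shuffle partition to keep the model minimal.
    private def pack(stats: Seq[Stats], target: Long): Array[Int] = Array(0)
  }

  // Usage: two exchanges register, then both ask for the post-shuffle partitioning.
  object WorkflowSketch extends App {
    val coordinator = new SimpleCoordinator(numExchanges = 2, targetSize = 128L * 1024 * 1024)
    coordinator.registerExchange("exchange-1")
    coordinator.registerExchange("exchange-2")
    val stats = Seq(Stats(Array(110L, 30L, 170L, 15L, 35L)))
    println(coordinator.postShufflePartitioning(stats).mkString(", "))  // decision made once, then cached
  }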
The strategy used to determine the number of post-shuffle partitions is described as follows. To determine the number of post-shuffle partitions, we have a target input size for a post-shuffle partition. Once we have the size statistics of pre-shuffle partitions from the stages corresponding to the registered ShuffleExchanges, we do a pass over those statistics and pack pre-shuffle partitions with continuous indices into a single post-shuffle partition until the size of a post-shuffle partition is equal to or greater than the target size (the size of a pre-shuffle partition here is the sum of that partition's sizes across all pre-shuffle stages).
For example, say we have two stages with the following pre-shuffle partition size statistics:
stage 1: [100 MB, 20 MB, 100 MB, 10 MB, 30 MB]
stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB]
Assuming the target input size is 128 MB, we will have three post-shuffle partitions, which are:
- post-shuffle partition 0: pre-shuffle partitions 0 and 1 (110 MB + 30 MB = 140 MB)
- post-shuffle partition 1: pre-shuffle partition 2 (170 MB)
- post-shuffle partition 2: pre-shuffle partitions 3 and 4 (15 MB + 35 MB = 50 MB)
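To make the packing concrete, here is a minimal standalone sketch of that strategy under the assumptions stated above; it is not Spark's estimatePartitionStartIndices implementation, just the "pack continuous indices until the target size is reached" rule applied to the example statistics.

  object PackingSketch {
    // Sketch of the strategy described above: sum each pre-shuffle partition's size
    // across all stages, then pack continuous indices into one post-shuffle partition
    // until its size is equal to or greater than the target.
    def estimateStartIndices(bytesByPartitionPerStage: Seq[Array[Long]], target: Long): Array[Int] = {
      val numPartitions = bytesByPartitionPerStage.head.length
      val totals = Array.tabulate(numPartitions)(i => bytesByPartitionPerStage.map(_(i)).sum)
      val starts = scala.collection.mutable.ArrayBuffer(0)
      var currentSize = 0L
      for (i <- 0 until numPartitions) {
        if (currentSize >= target) {  // current post-shuffle partition is already big enough
          starts += i                 // start a new post-shuffle partition at index i
          currentSize = 0L
        }
        currentSize += totals(i)
      }
      starts.toArray
    }

    def main(args: Array[String]): Unit = {
      val mb = 1024L * 1024L
      val stage1 = Array(100L, 20L, 100L, 10L, 30L).map(_ * mb)
      val stage2 = Array(10L, 10L, 70L, 5L, 5L).map(_ * mb)
      val starts = estimateStartIndices(Seq(stage1, stage2), 128L * mb)
      // Prints "0, 2, 3": post-shuffle partitions [0, 1], [2], and [3, 4], matching the
      // three partitions in the example above.
      println(starts.mkString(", "))
    }
  }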