Estimates partition start indices for post-shuffle partitions based on mapOutputStatistics provided by all pre-shuffle stages.
Registers a ShuffleExchange operator with this coordinator. This method may only be called from the doPrepare method of a ShuffleExchange operator.
A coordinator used to determine how we shuffle data between stages generated by Spark SQL. Right now, the work of this coordinator is to determine the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or more stages.
A coordinator is constructed with three parameters: numExchanges, targetPostShuffleInputSize, and minNumPostShufflePartitions.
 - numExchanges is used to indicate how many ShuffleExchanges will be registered to this coordinator. So, when we start to do any actual work, we have a way to make sure that we have the expected number of ShuffleExchanges.
 - targetPostShuffleInputSize is the targeted size of a post-shuffle partition's input data. With this parameter, we can estimate the number of post-shuffle partitions. This parameter is configured through spark.sql.adaptive.shuffle.targetPostShuffleInputSize.
 - minNumPostShufflePartitions is an optional parameter. If it is defined, this coordinator will try to make sure that there are at least minNumPostShufflePartitions post-shuffle partitions.

The workflow of this coordinator is described as follows:
 - Before the execution of a SparkPlan, a ShuffleExchange operator registers itself with this coordinator if an ExchangeCoordinator is assigned to it. This happens in the doPrepare method.
 - Once we start to execute a physical plan, a ShuffleExchange registered to this coordinator will call postShuffleRDD to get its corresponding post-shuffle ShuffledRowRDD. If this coordinator has already made the decision on how to shuffle data, this ShuffleExchange will immediately get its corresponding post-shuffle ShuffledRowRDD.
 - If this coordinator has not made the decision on how to shuffle data, it will ask the registered ShuffleExchanges to submit their pre-shuffle stages. Then, based on the size statistics of pre-shuffle partitions, this coordinator will determine the number of post-shuffle partitions and, whenever necessary, pack multiple pre-shuffle partitions with continuous indices into a single post-shuffle partition.
 - Finally, this coordinator will create post-shuffle ShuffledRowRDDs for all registered ShuffleExchanges. So, when a ShuffleExchange calls postShuffleRDD, this coordinator can look up the corresponding RDD.

The strategy used to determine the number of post-shuffle partitions is described as follows. To determine the number of post-shuffle partitions, we have a target input size for a post-shuffle partition. Once we have the size statistics of pre-shuffle partitions from the stages corresponding to the registered ShuffleExchanges, we make a pass over those statistics and pack pre-shuffle partitions with continuous indices into a single post-shuffle partition until the size of that post-shuffle partition is equal to or greater than the target size.

For example, suppose we have two stages with the following pre-shuffle partition size statistics:
stage 1: [100 MB, 20 MB, 100 MB, 10 MB, 30 MB]
stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB]
Assuming the target input size is 128 MB, we will have three post-shuffle partitions, which are:
 - post-shuffle partition 0: pre-shuffle partitions 0 and 1 (size 140 MB)
 - post-shuffle partition 1: pre-shuffle partition 2 (size 170 MB)
 - post-shuffle partition 2: pre-shuffle partitions 3 and 4 (size 50 MB)
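The packing strategy described above can be sketched in a few lines. This is a minimal, illustrative re-implementation in Python, not Spark's actual Scala code: the function names (estimate_partition_start_indices, effective_target_size) are made up for this sketch, and the way minNumPostShufflePartitions is honored (capping the target at total input size divided by the minimum partition count) is an assumption based on the parameter description above.

```python
import math

def effective_target_size(total_input_size, advisory_target, min_partitions=None):
    # Assumed behavior: when a minimum partition count is requested, shrink the
    # target so that packing cannot produce far fewer than min_partitions partitions.
    if min_partitions is None:
        return advisory_target
    max_target = math.ceil(total_input_size / min_partitions)
    return min(max_target, advisory_target)

def estimate_partition_start_indices(stage_stats, target_size):
    # stage_stats: one list of pre-shuffle partition sizes per pre-shuffle stage;
    # all stages have the same number of pre-shuffle partitions.
    # Returns the pre-shuffle index at which each post-shuffle partition starts.
    num_pre_shuffle = len(stage_stats[0])
    start_indices = [0]
    current_size = 0
    for i in range(num_pre_shuffle):
        # Close the current post-shuffle partition once it has reached the target.
        if current_size >= target_size:
            start_indices.append(i)
            current_size = 0
        # A post-shuffle partition reads index i from every pre-shuffle stage.
        current_size += sum(stats[i] for stats in stage_stats)
    return start_indices

# The worked example from above (sizes in MB):
stage1 = [100, 20, 100, 10, 30]
stage2 = [10, 10, 70, 5, 5]
print(estimate_partition_start_indices([stage1, stage2], 128))  # → [0, 2, 3]
```

With a target of 128 MB this yields post-shuffle partitions starting at pre-shuffle indices 0, 2, and 3, matching the three-partition grouping in the example.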