@@ -453,8 +453,9 @@ abstract class RDD[T: ClassTag](
       val distributePartition = (index: Int, items: Iterator[T]) => {
         var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
         // TODO Enable insert a local sort before shuffle to make input data sequence
-        // deterministic, thus the config
-        // "spark.shuffle.recomputeAllPartitionsOnRepartitionFailure" can be disabled.
+        // deterministic, to avoid retrying all partitions on FetchFailure. However, performing
+        // a local sort before the shuffle may increase the execution time of repartition()
+        // significantly (for some large inputs it can cost 3x ~ 5x the time).
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
@@ -464,11 +465,11 @@ abstract class RDD[T: ClassTag](
       }: Iterator[(Int, T)]
 
       // include a shuffle step so that our upstream tasks are still distributed
-      val recomputeOnFailure =
-        conf.getBoolean("spark.shuffle.recomputeAllPartitionsOnRepartitionFailure", true)
       new CoalescedRDD(
         new ShuffledRDD[Int, T, T](
-          mapPartitionsWithIndex(distributePartition, recomputeOnFailure),
+          mapPartitionsWithIndexInternal(
+            distributePartition,
+            retryOnAllPartitionsOnFailure = true),
           new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
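
The round-robin assignment above is the crux of SPARK-23207: each task seeds a Random with its partition index and deals records out in arrival order, so a retried task that sees the same records in a different order will send them to different output partitions. The sketch below is a Spark-free illustration (the `assign` helper is not Spark code, and it inlines the modulo that Spark delegates to HashPartitioner); it shows two runs over the same records in different orders producing different assignments:

import scala.util.Random
import scala.util.hashing.byteswap32

object RoundRobinSketch {
  // Illustrative mirror of distributePartition: seeded start, then round-robin.
  def assign[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = (position + 1) % numPartitions
      (position, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val run1 = assign(0, Iterator("a", "b", "c", "d"), 2).toList
    // A retry of the same task that happens to see the input in a different order:
    val run2 = assign(0, Iterator("b", "a", "c", "d"), 2).toList
    println(run1) // "a" and "b" land in different partitions than in run2, so
    println(run2) // recomputing only some output partitions loses or duplicates records.
  }
}
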
@@ -815,14 +816,19 @@ abstract class RDD[T: ClassTag](
    * @param preservesPartitioning indicates whether the input function preserves the partitioner,
    * which should be `false` unless this is a pair RDD and the input function doesn't modify
    * the keys.
+   *
+   * @param retryOnAllPartitionsOnFailure indicates whether to recompute all partitions on
+   * failure recovery, which should be `false` unless the output is repartitioned.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+      preservesPartitioning: Boolean = false,
+      retryOnAllPartitionsOnFailure: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
-      preservesPartitioning)
+      preservesPartitioning,
+      retryOnAllPartitionsOnFailure)
   }
 
   /**
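
The hunk above forwards the new flag into MapPartitionsRDD's constructor. How the flag travels from there to the scheduler is not shown in this diff; the following is a hedged sketch of one plausible wiring, in which MapPartitionsRDD overrides the recomputeAllPartitionsOnFailure() hook defined at the bottom of this file. The fourth constructor parameter matches this diff, but the override is an assumption, not the verbatim Spark source:

package org.apache.spark.rdd

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}

// Sketch only: the override below is an assumption about how the flag
// reaches the scheduler; it is not taken from this commit.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    retryOnAllPartitionsOnFailure: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  // Surface the flag via the hook documented at the end of RDD.scala.
  override private[spark] def recomputeAllPartitionsOnFailure(): Boolean =
    retryOnAllPartitionsOnFailure
}
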
@@ -843,21 +849,15 @@ abstract class RDD[T: ClassTag](
    *
    * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
-   *
-   * `recomputeOnFailure` indicates whether to recompute on all the partitions on failure recovery,
-   * which should be `false` unless the output is not sorted or not sortable, and the output is
-   * repartitioned.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false,
-      recomputeOnFailure: Boolean = false): RDD[U] = withScope {
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
     val cleanedF = sc.clean(f)
     new MapPartitionsRDD(
       this,
       (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
-      preservesPartitioning,
-      recomputeOnFailure)
+      preservesPartitioning)
   }
 
   /**
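
With the extra parameter removed, the public overload returns to its familiar two-argument shape. A quick, self-contained usage example against this stable API (the object name, app name, and data are illustrative):

import org.apache.spark.sql.SparkSession

object MapPartitionsWithIndexExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("mpwi").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 4)

    // Tag each element with the index of the partition that produced it. This is
    // not a pair RDD, so the default preservesPartitioning = false is correct.
    val tagged = rdd.mapPartitionsWithIndex(
      (index, iter) => iter.map(x => s"partition $index -> $x"))

    tagged.collect().foreach(println)
    spark.stop()
  }
}
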
@@ -1853,14 +1853,18 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Whether or not the RDD is required to recompute all partitions on failure. Repartition on an
-   * RDD performs in a round-robin manner, thus there may be data correctness issue if only a
-   * sub-set of partitions are recomputed on failure and the input data sequence is not
+   * Whether or not the RDD is required to recompute all partitions on FetchFailure. Repartition
+   * on an RDD proceeds in a round-robin manner, so there may be a data correctness issue if only
+   * a subset of partitions is recomputed on FetchFailure and the input data sequence is not
    * deterministic. Please refer to SPARK-23207 and SPARK-23243 for related discussion.
    *
-   * Require to recompute all partitions on failure if repartition operation is called on this RDD
-   * and the result sequence of this RDD is not deterministic (or the data type of the output of
-   * this RDD is not sortable).
+   * Ideally we would not need to recompute all partitions on FetchFailure when the result
+   * sequence of an RDD is deterministic, but sources outside Spark's control can make the
+   * sequence non-deterministic (e.g. reads from an external data source, or different
+   * spill-and-merge patterns under memory pressure). We cannot afford the performance hit of
+   * inserting a local sort before the shuffle (3x ~ 5x the time of repartition()), and the
+   * data type of an RDD may not even be sortable. We therefore compromise and simply
+   * recompute all partitions on FetchFailure whenever repartition is called on an RDD.
    */
   private[spark] def recomputeAllPartitionsOnFailure(): Boolean = false
 }
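
The local sort dismissed above is easy to sketch: sorting each partition before the round-robin assignment makes the output independent of arrival order, but it buffers and sorts every partition and requires an Ordering, which is exactly the 3x ~ 5x overhead and the sortability constraint the comment cites. A minimal illustration (the helper name is hypothetical, and the fixed start position stands in for Spark's seeded Random):

object LocalSortSketch {
  // Deterministic variant of distributePartition: sort locally, then round-robin.
  def sortedDistribute[T: Ordering](index: Int, items: Iterator[T],
      numPartitions: Int): Iterator[(Int, T)] = {
    var position = index % numPartitions // fixed start for brevity
    items.toSeq.sorted.iterator.map { t =>
      position = (position + 1) % numPartitions
      (position, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val run1 = sortedDistribute(0, Iterator("b", "a", "c"), 2).toList
    val run2 = sortedDistribute(0, Iterator("a", "c", "b"), 2).toList
    println(run1 == run2) // true: assignment no longer depends on arrival order
  }
}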