@@ -47,23 +47,12 @@ class PartitionerAwareUnionRDDPartition(
}
}

/**
* Class representing an RDD that can take multiple RDDs partitioned by the same partitioner and
* unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each
* will be unified to a single RDD with p partitions and the same partitioner. The preferred
* location for each partition of the unified RDD will be the most common preferred location
* of the corresponding partitions of the parent RDDs. For example, location of partition 0
* of the unified RDD will be where most of partition 0 of the parent RDDs are located.
*/
private[spark]
class PartitionerAwareUnionRDD[T: ClassTag](
abstract class PartitionerAwareUnionRDDBase[T: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[T]]
) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
require(rdds.nonEmpty)
require(rdds.forall(_.partitioner.isDefined))
require(rdds.flatMap(_.partitioner).toSet.size == 1,
"Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
require(rdds.nonEmpty, "RDDs cannot be empty")

override val partitioner = rdds.head.partitioner

@@ -111,3 +100,49 @@ class PartitionerAwareUnionRDD[T: ClassTag](
rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
}
}

/**
* Class representing an RDD that can take multiple RDDs partitioned by the same partitioner and
* unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each
* will be unified to a single RDD with p partitions and the same partitioner. The preferred
* location for each partition of the unified RDD will be the most common preferred location
* of the corresponding partitions of the parent RDDs. For example, location of partition 0
* of the unified RDD will be where most of partition 0 of the parent RDDs are located.
*/
private[spark]
class PartitionerAwareUnionRDD[T: ClassTag](
sc: SparkContext,
var _rdds: Seq[RDD[T]]
) extends PartitionerAwareUnionRDDBase(sc, _rdds) {
require(_rdds.forall(_.partitioner.isDefined))
require(_rdds.flatMap(_.partitioner).toSet.size == 1,
"Parent RDDs have different partitioners: " + _rdds.flatMap(_.partitioner))
}
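
// --- Illustrative example (not part of this change) ---
// A minimal spark-shell sketch of the behavior described above, assuming a running
// SparkContext `sc`: when every parent RDD shares the same partitioner, SparkContext.union
// builds a partitioner-aware union, so the result keeps p partitions and the partitioner.
val examplePartitioner = new org.apache.spark.HashPartitioner(4)
val rddA = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(examplePartitioner)
val rddB = sc.parallelize(Seq(1 -> "c", 3 -> "d")).partitionBy(examplePartitioner)
val unioned = sc.union(Seq(rddA, rddB))
assert(unioned.partitioner.contains(examplePartitioner))  // partitioner is preserved
assert(unioned.getNumPartitions == 4)                     // 4 partitions, not 4 + 4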

/**
* This is similar to [[PartitionerAwareUnionRDD]], but it does not require the parent RDDs
* to have a defined partitioner, nor to share the same partitioner when one is defined.
* This is because SQL's shuffle RDD, `ShuffledRowRDD`, does not define a partitioner;
* the actual partitioning is implemented in `ShuffleExchangeExec.prepareShuffleDependency`.
*
* Thus, this RDD does not check the partitioners of its parent RDDs. Its correctness relies
* on the fact that the given RDDs are partitioned in the same way, so before using this RDD
* you must ensure that all parent RDDs are partitioned correctly by checking their SQL
* output partitioning.
*/
private[spark]
class SQLPartitioningAwareUnionRDD[T: ClassTag](
sc: SparkContext,
var _rdds: Seq[RDD[T]],
val numPartitions: Int
) extends PartitionerAwareUnionRDDBase(sc, _rdds) {
require(partitioner.isEmpty || partitioner.get.numPartitions == numPartitions,
"Partitioner of parent RDDs does not match the number of partitions: " +
s"expected $numPartitions, but got ${partitioner.map(_.numPartitions).getOrElse("none")}")

override def getPartitions: Array[Partition] = {
(0 until numPartitions).map { index =>
new PartitionerAwareUnionRDDPartition(_rdds, index)
}.toArray
}
}
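
// A hedged internal-usage sketch (comments only): the class is private[spark], so it is
// only constructed from Spark's own code, e.g. by UnionExec further below. With hypothetical
// parents `rddA` and `rddB` that were both hash-partitioned into 4 parts by a SQL exchange:
//   new SQLPartitioningAwareUnionRDD(sc, Seq(rddA, rddB), numPartitions = 4)
// Partition i of the result reads partition i of rddA and partition i of rddB; the
// partitioner checks are skipped, so the caller is responsible for the co-partitioning.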

@@ -6014,6 +6014,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val UNION_OUTPUT_PARTITIONING =
buildConf("spark.sql.unionOutputPartitioning")
.internal()
.doc("When set to true, the output partitioning of UnionExec will be the same as the " +
"input partitioning if its children have same partitioning. Otherwise, it will be a " +
"default partitioning.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)
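// A hedged usage sketch (assumes a SparkSession named `spark`): the flag is internal, but
// it can still be toggled per session to fall back to the previous union behavior:
//   spark.conf.set("spark.sql.unionOutputPartitioning", "false")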
Review comment (Member Author): For safety, added an internal config for it.
Review comment (Member): Thank you!

val LEGACY_PARSE_QUERY_WITHOUT_EOF = buildConf("spark.sql.legacy.parseQueryWithoutEof")
.internal()
.doc(
@@ -25,7 +25,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD, SQLPartitioningAwareUnionRDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
@@ -699,8 +699,80 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
}
}

protected override def doExecute(): RDD[InternalRow] =
sparkContext.union(children.map(_.execute()))
/**
* Returns the output partitionings of all children, with attributes from the other
* children rewritten to the first child's attributes at the same positions, so that the
* partitionings can be compared directly.
*/
private def prepareOutputPartitioning(): Seq[Partitioning] = {
// Create a map of attributes from the other children to the first child.
val firstAttrs = children.head.output
val attributesMap = children.tail.map(_.output).map { otherAttrs =>
otherAttrs.zip(firstAttrs).map { case (attr, firstAttr) =>
attr -> firstAttr
}.toMap
}
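// For example (hypothetical children): if the first child outputs [a, b] and the second
// outputs [x, y], the map for the second child is Map(x -> a, y -> b), so a
// HashPartitioning(Seq(x), n) reported by that child becomes HashPartitioning(Seq(a), n)
// below and can be compared position-wise against the first child's partitioning.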

val partitionings = children.map(_.outputPartitioning)
val firstPartitioning = partitionings.head
val otherPartitionings = partitionings.tail

val convertedOtherPartitionings = otherPartitionings.zipWithIndex.map { case (p, idx) =>
val attributeMap = attributesMap(idx)
p match {
case e: Expression =>
e.transform {
case a: Attribute if attributeMap.contains(a) => attributeMap(a)
}.asInstanceOf[Partitioning]
case _ => p
}
}
Seq(firstPartitioning) ++ convertedOtherPartitionings
}

private def comparePartitioning(left: Partitioning, right: Partitioning): Boolean = {
(left, right) match {
case (SinglePartition, SinglePartition) => true
case (l: HashPartitioningLike, r: HashPartitioningLike) => l == r
// Note: two `RangePartitioning`s are not considered equal even with the same ordering
// and number of partitions, because they might have different partition bounds.
case _ => false
}
}
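// Example of the rule above (hypothetical attribute `k`): two children that both report
// HashPartitioning(Seq(k), 8) after the attribute rewrite compare equal, so the union can
// keep that partitioning; two children produced by separate range repartitions do not,
// since each RangePartitioning may carry different partition bounds.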

override def outputPartitioning: Partitioning = {
if (conf.getConf(SQLConf.UNION_OUTPUT_PARTITIONING)) {
val partitionings = prepareOutputPartitioning()
if (partitionings.forall(comparePartitioning(_, partitionings.head))) {
val partitioner = partitionings.head

// Rewrite the partitioner in terms of this union's own output attributes.
val attributeMap = children.head.output.zip(output).toMap
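// For example (hypothetical attribute k): a HashPartitioning(Seq(k), 8) taken from the
// first child is re-expressed with this union's own output attribute for k, which can
// differ from the child's attribute (for instance in nullability).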
partitioner match {
case e: Expression =>
e.transform {
case a: Attribute if attributeMap.contains(a) => attributeMap(a)
}.asInstanceOf[Partitioning]
case _ => partitioner
}
} else {
super.outputPartitioning
}
} else {
super.outputPartitioning
}
}

protected override def doExecute(): RDD[InternalRow] = {
if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
sparkContext.union(children.map(_.execute()))
} else {
// This union has a known partitioning, i.e., its children are semantically partitioned
// in the same way, so the union can preserve that partitioning by using a custom
// partitioning-aware union RDD.
val nonEmptyRdds = children.map(_.execute()).filter(_.partitions.nonEmpty)
new SQLPartitioningAwareUnionRDD(sparkContext, nonEmptyRdds, outputPartitioning.numPartitions)
}
}
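
// --- Illustrative example (not part of this change) ---
// A hedged spark-shell sketch (assumes a SparkSession `spark` and the config
// spark.sql.unionOutputPartitioning left at its default): repartitioning both inputs by
// the same key lets the aggregation after the union reuse that partitioning, so no extra
// Exchange is expected above the Union in the physical plan.
import org.apache.spark.sql.functions.col
import spark.implicits._
val dfA = Seq((1, "a"), (2, "b")).toDF("k", "v").repartition(8, col("k"))
val dfB = Seq((1, "c"), (3, "d")).toDF("k", "v").repartition(8, col("k"))
dfA.union(dfB).groupBy("k").count().explain()  // no extra Exchange above the Union is expected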

override def supportsColumnar: Boolean = children.forall(_.supportsColumnar)
