From bc7f179355ddb41ba56d2d24602c246c71d51436 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 28 Jul 2023 09:46:38 +0800 Subject: [PATCH 01/14] [SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning --- .../scala/org/apache/spark/Partitioner.scala | 12 + .../plans/physical/partitioning.scala | 76 +++++ .../apache/spark/sql/internal/SQLConf.scala | 13 + .../exchange/EnsureRequirements.scala | 269 ++++++++++-------- .../exchange/ShuffleExchangeExec.scala | 7 + .../KeyGroupedPartitioningSuite.scala | 71 +++++ 6 files changed, 337 insertions(+), 111 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 5dffba2ee8e08..a5a87bbc73200 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext override def getPartition(key: Any): Int = key.asInstanceOf[Int] } +/** + * A [[org.apache.spark.Partitioner]] that partitions all records use partition value map + */ +private[spark] class PartitionValueMapPartitioner( + valueMap: mutable.Map[Seq[Any], Int], + override val numPartitions: Int) extends Partitioner { + override def getPartition(key: Any): Int = { + val keys = key.asInstanceOf[Seq[Any]] + valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, numPartitions)) + } +} + /** * A [[org.apache.spark.Partitioner]] that partitions all records into a single partition. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index bd8ba54ddd736..3fbbbd41fb068 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -512,6 +512,35 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning { } } +/** + * Represents a partitioning use partition value map to partition. 
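+ * `expressions` are the partition expressions, `valueMap` maps each distinct partition value
+ * (the `Seq[Any]` obtained by evaluating those expressions on a row) to a target partition id,
+ * and `numPartitions` is the total number of partitions.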
+ */ +case class PartitionValueMapPartitioning( + expressions: Seq[Expression], + valueMap: Map[Seq[Any], Int], + numPartitions: Int) extends Partitioning { + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case h: StatefulOpClusteredDistribution => + expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { + case (l, r) => l.semanticEquals(r) + } + case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) => + if (requireAllClusterKeys) { + c.areAllClusterKeysMatched(expressions) + } else { + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + } + case _ => false + } + } + } + + override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = + PartitionValueMapShuffleSpec(this, distribution) +} + /** * This is used in the scenario where an operator has multiple children (e.g., join) and one or more * of which have their own requirement regarding whether its data can be considered as @@ -731,6 +760,53 @@ case class KeyGroupedShuffleSpec( override def canCreatePartitioning: Boolean = false } +case class PartitionValueMapShuffleSpec( + partitioning: PartitionValueMapPartitioning, + distribution: ClusteredDistribution) extends ShuffleSpec { + + /** + * A sequence where each element is a set of positions of the partition expression to the cluster + * keys. For instance, if cluster keys are [a, b, b] and partition expressions are + * [bucket(4, a), years(b)], the result will be [(0), (1, 2)]. + * + * Therefore the mapping here is very similar to that from `HashShuffleSpec`. + */ + lazy val keyPositions: Seq[mutable.BitSet] = { + val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet] + distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) => + distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos) + } + partitioning.expressions.map(k => distKeyToPos.getOrElse(k.canonicalized, mutable.BitSet.empty)) + } + + override def numPartitions: Int = partitioning.numPartitions + + override def isCompatibleWith(other: ShuffleSpec): Boolean = other match { + // Here we check: + // 1. both distributions have the same number of clustering keys + // 2. both partitioning have the same number of partitions + // 3. both partitioning have the same number of expressions + // 4. both partitioning have the same partition value map + // 5. each pair of partitioning expression from both sides has overlapping positions in their + // corresponding distributions. 
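+    // For illustration (hypothetical keys): if both sides cluster on [a, b] and both partition
+    // by [a], then `keyPositions` is [{0}] on each side; the positions overlap, so check 5 holds.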
+ case otherSpec @ PartitionValueMapShuffleSpec(otherPartitioning, otherDistribution) => + distribution.clustering.length == otherDistribution.clustering.length && + numPartitions == other.numPartitions && + partitioning.valueMap.equals(otherSpec.partitioning.valueMap) && + partitioning.expressions.length == otherPartitioning.expressions.length && { + val otherKeyPositions = otherSpec.keyPositions + keyPositions.zip(otherKeyPositions).forall { case (left, right) => + left.intersect(right).nonEmpty + } + } + case ShuffleSpecCollection(specs) => + specs.exists(isCompatibleWith) + case _ => false + } + + override def canCreatePartitioning: Boolean = false +} + case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec { override def isCompatibleWith(other: ShuffleSpec): Boolean = { specs.exists(_.isCompatibleWith(other)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fa08bd7ed3bd6..6341d1f82817a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1496,6 +1496,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.shuffleOneSide.enabled") + .doc("During a storage-partitioned join, whether to allow to shuffle only one side." + + "When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " + + "only shuffle the other side. This optimization will reduce the amount of data that " + + s"needs to be shuffle. This config requires ${V2_BUCKETING_ENABLED.key} to be enabled") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") .doc("The maximum number of buckets allowed.") .version("2.4.0") @@ -4853,6 +4863,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingPartiallyClusteredDistributionEnabled: Boolean = getConf(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED) + def v2BucketingShuffleOneSideEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 42c880e7c6262..b7657d844cb47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -368,150 +368,197 @@ case class EnsureRequirements( var newLeft = left var newRight = right - val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) => + val specsOpts = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) => if (!d.isInstanceOf[ClusteredDistribution]) return None val cd = d.asInstanceOf[ClusteredDistribution] val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd) - if (specOpt.isEmpty) return None - specOpt.get + specOpt + } + val specsAllExist = specsOpts.forall(_.nonEmpty) + if ((!conf.v2BucketingShuffleOneSideEnabled && !specsAllExist) + || specsOpts.count(_.isEmpty) > 1) { + return None } - - val leftSpec = specs.head - val rightSpec = 
specs(1) var isCompatible = false - if (!conf.v2BucketingPushPartValuesEnabled) { - isCompatible = leftSpec.isCompatibleWith(rightSpec) - } else { - logInfo("Pushing common partition values for storage-partitioned join") - isCompatible = leftSpec.areKeysCompatible(rightSpec) - - // Partition expressions are compatible. Regardless of whether partition values - // match from both sides of children, we can calculate a superset of partition values and - // push-down to respective data sources so they can adjust their output partitioning by - // filling missing partition keys with empty partitions. As result, we can still avoid - // shuffle. - // - // For instance, if two sides of a join have partition expressions - // `day(a)` and `day(b)` respectively - // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but - // with different partition values: - // `day(a)`: [0, 1] - // `day(b)`: [1, 2, 3] - // Following the case 2 above, we don't have to shuffle both sides, but instead can - // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data - // sources. - if (isCompatible) { - val leftPartValues = leftSpec.partitioning.partitionValues - val rightPartValues = rightSpec.partitioning.partitionValues - - logInfo( - s""" - |Left side # of partitions: ${leftPartValues.size} - |Right side # of partitions: ${rightPartValues.size} - |""".stripMargin) - - // As partition keys are compatible, we can pick either left or right as partition - // expressions - val partitionExprs = leftSpec.partitioning.expressions - - var mergedPartValues = InternalRowComparableWrapper + + if (specsAllExist) { + val leftSpec = specsOpts.head.get + val rightSpec = specsOpts(1).get + + if (!conf.v2BucketingPushPartValuesEnabled) { + isCompatible = leftSpec.isCompatibleWith(rightSpec) + } else { + logInfo("Pushing common partition values for storage-partitioned join") + isCompatible = leftSpec.areKeysCompatible(rightSpec) + + // Partition expressions are compatible. Regardless of whether partition values + // match from both sides of children, we can calculate a superset of partition values and + // push-down to respective data sources so they can adjust their output partitioning by + // filling missing partition keys with empty partitions. As result, we can still avoid + // shuffle. + // + // For instance, if two sides of a join have partition expressions + // `day(a)` and `day(b)` respectively + // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but + // with different partition values: + // `day(a)`: [0, 1] + // `day(b)`: [1, 2, 3] + // Following the case 2 above, we don't have to shuffle both sides, but instead can + // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data + // sources. 
+ if (isCompatible) { + val leftPartValues = leftSpec.partitioning.partitionValues + val rightPartValues = rightSpec.partitioning.partitionValues + + logInfo( + s""" + |Left side # of partitions: ${leftPartValues.size} + |Right side # of partitions: ${rightPartValues.size} + |""".stripMargin) + + // As partition keys are compatible, we can pick either left or right as partition + // expressions + val partitionExprs = leftSpec.partitioning.expressions + + var mergedPartValues = InternalRowComparableWrapper .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) .map(v => (v, 1)) - logInfo(s"After merging, there are ${mergedPartValues.size} partitions") + logInfo(s"After merging, there are ${mergedPartValues.size} partitions") - var replicateLeftSide = false - var replicateRightSide = false - var applyPartialClustering = false + var replicateLeftSide = false + var replicateRightSide = false + var applyPartialClustering = false - // This means we allow partitions that are not clustered on their values, - // that is, multiple partitions with the same partition value. In the - // following, we calculate how many partitions that each distinct partition - // value has, and pushdown the information to scans, so they can adjust their - // final input partitions respectively. - if (conf.v2BucketingPartiallyClusteredDistributionEnabled) { - logInfo("Calculating partially clustered distribution for " + + // This means we allow partitions that are not clustered on their values, + // that is, multiple partitions with the same partition value. In the + // following, we calculate how many partitions that each distinct partition + // value has, and pushdown the information to scans, so they can adjust their + // final input partitions respectively. + if (conf.v2BucketingPartiallyClusteredDistributionEnabled) { + logInfo("Calculating partially clustered distribution for " + "storage-partitioned join") - // Similar to `OptimizeSkewedJoin`, we need to check join type and decide - // whether partially clustered distribution can be applied. For instance, the - // optimization cannot be applied to a left outer join, where the left hand - // side is chosen as the side to replicate partitions according to stats. - // Otherwise, query result could be incorrect. - val canReplicateLeft = canReplicateLeftSide(joinType) - val canReplicateRight = canReplicateRightSide(joinType) + // Similar to `OptimizeSkewedJoin`, we need to check join type and decide + // whether partially clustered distribution can be applied. For instance, the + // optimization cannot be applied to a left outer join, where the left hand + // side is chosen as the side to replicate partitions according to stats. + // Otherwise, query result could be incorrect. 
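+          // (For reference, `canReplicateLeftSide` below returns true only for Inner, Cross and
+          // RightOuter joins; `canReplicateRightSide` performs the analogous check for the
+          // right-hand side.)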
+ val canReplicateLeft = canReplicateLeftSide(joinType) + val canReplicateRight = canReplicateRightSide(joinType) - if (!canReplicateLeft && !canReplicateRight) { - logInfo("Skipping partially clustered distribution as it cannot be applied for " + + if (!canReplicateLeft && !canReplicateRight) { + logInfo("Skipping partially clustered distribution as it cannot be applied for " + s"join type '$joinType'") - } else { - val leftLink = left.logicalLink - val rightLink = right.logicalLink + } else { + val leftLink = left.logicalLink + val rightLink = right.logicalLink - replicateLeftSide = if ( - leftLink.isDefined && rightLink.isDefined && + replicateLeftSide = if ( + leftLink.isDefined && rightLink.isDefined && leftLink.get.stats.sizeInBytes > 1 && rightLink.get.stats.sizeInBytes > 1) { - logInfo( - s""" - |Using plan statistics to determine which side of join to fully - |cluster partition values: - |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes} - |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes} - |""".stripMargin) - leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes - } else { - // As a simple heuristic, we pick the side with fewer number of partitions - // to apply the grouping & replication of partitions - logInfo("Using number of partitions to determine which side of join " + + logInfo( + s""" + |Using plan statistics to determine which side of join to fully + |cluster partition values: + |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes} + |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes} + |""".stripMargin) + leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes + } else { + // As a simple heuristic, we pick the side with fewer number of partitions + // to apply the grouping & replication of partitions + logInfo("Using number of partitions to determine which side of join " + "to fully cluster partition values") - leftPartValues.size < rightPartValues.size - } + leftPartValues.size < rightPartValues.size + } - replicateRightSide = !replicateLeftSide + replicateRightSide = !replicateLeftSide - // Similar to skewed join, we need to check the join type to see whether replication - // of partitions can be applied. For instance, replication should not be allowed for - // the left-hand side of a right outer join. - if (replicateLeftSide && !canReplicateLeft) { - logInfo("Left-hand side is picked but cannot be applied to join type " + + // Similar to skewed join, we need to check the join type to see whether replication + // of partitions can be applied. For instance, replication should not be allowed for + // the left-hand side of a right outer join. + if (replicateLeftSide && !canReplicateLeft) { + logInfo("Left-hand side is picked but cannot be applied to join type " + s"'$joinType'. Skipping partially clustered distribution.") - replicateLeftSide = false - } else if (replicateRightSide && !canReplicateRight) { - logInfo("Right-hand side is picked but cannot be applied to join type " + + replicateLeftSide = false + } else if (replicateRightSide && !canReplicateRight) { + logInfo("Right-hand side is picked but cannot be applied to join type " + s"'$joinType'. 
Skipping partially clustered distribution.") - replicateRightSide = false - } else { - val partValues = if (replicateLeftSide) rightPartValues else leftPartValues - val numExpectedPartitions = partValues - .map(InternalRowComparableWrapper(_, partitionExprs)) - .groupBy(identity) - .mapValues(_.size) - - mergedPartValues = mergedPartValues.map { case (partVal, numParts) => - (partVal, numExpectedPartitions.getOrElse( - InternalRowComparableWrapper(partVal, partitionExprs), numParts)) - } - - logInfo("After applying partially clustered distribution, there are " + + replicateRightSide = false + } else { + val partValues = if (replicateLeftSide) rightPartValues else leftPartValues + val numExpectedPartitions = partValues + .map(InternalRowComparableWrapper(_, partitionExprs)) + .groupBy(identity) + .mapValues(_.size) + + mergedPartValues = mergedPartValues.map { case (partVal, numParts) => + (partVal, numExpectedPartitions.getOrElse( + InternalRowComparableWrapper(partVal, partitionExprs), numParts)) + } + + logInfo("After applying partially clustered distribution, there are " + s"${mergedPartValues.map(_._2).sum} partitions.") - applyPartialClustering = true + applyPartialClustering = true + } } } - } - // Now we need to push-down the common partition key to the scan in each child - newLeft = populatePartitionValues( - left, mergedPartValues, applyPartialClustering, replicateLeftSide) - newRight = populatePartitionValues( - right, mergedPartValues, applyPartialClustering, replicateRightSide) + // Now we need to push-down the common partition key to the scan in each child + newLeft = populatePartitionValues( + left, mergedPartValues, applyPartialClustering, replicateLeftSide) + newRight = populatePartitionValues( + right, mergedPartValues, applyPartialClustering, replicateRightSide) + } + } + } else if (conf.v2BucketingShuffleOneSideEnabled) { + val rightShuffled = specsOpts.head.nonEmpty + // Only should replace the shuffle exchange + if (rightShuffled && right.isInstanceOf[ShuffleExchangeExec]) { + val newRightOpt = reAdaptShuffle(specsOpts.head.get, + right.asInstanceOf[ShuffleExchangeExec]) + if (newRightOpt.isDefined) { + isCompatible = true + newRight = newRightOpt.get + } + } else if (!rightShuffled && left.isInstanceOf[ShuffleExchangeExec]) { + val newLeftOpt = reAdaptShuffle(specsOpts(1).get, left.asInstanceOf[ShuffleExchangeExec]) + if (newLeftOpt.isDefined) { + isCompatible = true + newLeft = newLeftOpt.get + } } } if (isCompatible) Some(Seq(newLeft, newRight)) else None } + private def reAdaptShuffle( + keyGroupedShuffleSpec: KeyGroupedShuffleSpec, + plan: ShuffleExchangeExec): Option[SparkPlan] = { + + // Only support partition expressions are AttributeReference for now + if (!keyGroupedShuffleSpec.partitioning.expressions + .forall(_.isInstanceOf[AttributeReference])) { + return None + } + val partitionValueMap = mutable.Map[Seq[Any], Int]() + keyGroupedShuffleSpec.partitioning.partitionValues.zipWithIndex.foreach(partAndIndex => { + partitionValueMap(partAndIndex._1.toSeq(keyGroupedShuffleSpec.partitioning.expressions + .map(_.dataType))) = partAndIndex._2 + }) + val expressions = plan.outputPartitioning match { + case HashPartitioning(expressions, _) => expressions + case _ => return None + } + + Some(plan.copy(outputPartitioning = PartitionValueMapPartitioning(expressions = expressions, + valueMap = partitionValueMap.toMap, numPartitions = keyGroupedShuffleSpec.numPartitions))) + } + // Similar to `OptimizeSkewedJoin.canSplitRightSide` private def 
canReplicateLeftSide(joinType: JoinType): Boolean = { joinType == Inner || joinType == Cross || joinType == RightOuter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 91f2099ce2d53..ea61c5e0af9e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.exchange import java.util.function.Supplier +import scala.collection.mutable import scala.concurrent.Future import org.apache.spark._ @@ -29,6 +30,7 @@ import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProces import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical._ @@ -299,6 +301,9 @@ object ShuffleExchangeExec { ascending = true, samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) case SinglePartition => new ConstantPartitioner + case PartitionValueMapPartitioning(_, valueMap, n) => + new PartitionValueMapPartitioner(valueMap = mutable.Map(valueMap.toSeq: _*), + numPartitions = n) case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. 
} @@ -325,6 +330,8 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) row => projection(row) case SinglePartition => identity + case PartitionValueMapPartitioning(expressions, _, _) => + row => bindReferences(expressions, outputAttributes).map(_.eval(row)) case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index be5e1b524e565..cfcec39d5202b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1037,4 +1037,75 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } } + + test("SPARK-41471: shuffle one side: only one side reports partitioning") { + val items_partitions = Array(identity("id")) + createTable(items, items_schema, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchases_schema, Array.empty) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp))") + + Seq(true, false).foreach { shuffleOneSide => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (shuffleOneSide) { + assert(shuffles.size == 1, "only shuffle one side not report partitioning") + } else { + assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + + " is not enabled") + } + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + } + } + } + + + test("SPARK-41471: shuffle one side: partitioning with transform") { + val items_partitions = Array(years("arrive_time")) + createTable(items, items_schema, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2021-02-01' as timestamp))") + + createTable(purchases, purchases_schema, Array.empty) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2021-02-01' as timestamp))") + + Seq(true, false).foreach { shuffleOneSide => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + + "ON i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (shuffleOneSide) { + assert(shuffles.size == 2, "partitioning with transform not work now") + } else { + assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + + " is 
not enabled") + } + + checkAnswer(df, Seq( + Row(1, "aa", 40.0, 42.0), + Row(3, "bb", 10.0, 42.0), + Row(4, "cc", 15.5, 19.5))) + } + } + } + } From 260d054bb5508435c1489f32bc009d18b8832e60 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 28 Jul 2023 14:40:07 +0800 Subject: [PATCH 02/14] add new test case --- .../KeyGroupedPartitioningSuite.scala | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index cfcec39d5202b..1471f9dca9a67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1071,6 +1071,38 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } + test("SPARK-41471: shuffle one side: only one side reports partitioning with two identity") { + val items_partitions = Array(identity("id"), identity("arrive_time")) + createTable(items, items_schema, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchases_schema, Array.empty) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp))") + + Seq(true, false).foreach { shuffleOneSide => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + + "ON i.id = p.item_id and i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (shuffleOneSide) { + assert(shuffles.size == 1, "only shuffle one side not report partitioning") + } else { + assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + + " is not enabled") + } + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0))) + } + } + } test("SPARK-41471: shuffle one side: partitioning with transform") { val items_partitions = Array(years("arrive_time")) @@ -1107,5 +1139,4 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } } - } From b2b3a10340e8d20d8224c1d745486c74ee001a7a Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 8 Aug 2023 13:51:50 +0800 Subject: [PATCH 03/14] update --- .../scala/org/apache/spark/Partitioner.scala | 2 +- .../plans/physical/partitioning.scala | 80 +---- .../exchange/EnsureRequirements.scala | 297 ++++++++---------- .../exchange/ShuffleExchangeExec.scala | 13 +- .../KeyGroupedPartitioningSuite.scala | 1 - 5 files changed, 139 insertions(+), 254 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index a5a87bbc73200..a509dd7d80050 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -140,7 +140,7 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext /** * A [[org.apache.spark.Partitioner]] that partitions all records use partition value map */ -private[spark] class 
PartitionValueMapPartitioner( +private[spark] class KeyGroupedPartitioner( valueMap: mutable.Map[Seq[Any], Int], override val numPartitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 47a81f553f24e..62c4440cefc75 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -519,35 +519,6 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning { } } -/** - * Represents a partitioning use partition value map to partition. - */ -case class PartitionValueMapPartitioning( - expressions: Seq[Expression], - valueMap: Map[Seq[Any], Int], - numPartitions: Int) extends Partitioning { - override def satisfies0(required: Distribution): Boolean = { - super.satisfies0(required) || { - required match { - case h: StatefulOpClusteredDistribution => - expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { - case (l, r) => l.semanticEquals(r) - } - case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) => - if (requireAllClusterKeys) { - c.areAllClusterKeysMatched(expressions) - } else { - expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) - } - case _ => false - } - } - } - - override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = - PartitionValueMapShuffleSpec(this, distribution) -} - /** * This is used in the scenario where an operator has multiple children (e.g., join) and one or more * of which have their own requirement regarding whether its data can be considered as @@ -764,54 +735,13 @@ case class KeyGroupedShuffleSpec( case _ => false } - override def canCreatePartitioning: Boolean = false -} + override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleOneSideEnabled && + // Only support partition expressions are AttributeReference for now + partitioning.expressions.forall(_.isInstanceOf[AttributeReference]) -case class PartitionValueMapShuffleSpec( - partitioning: PartitionValueMapPartitioning, - distribution: ClusteredDistribution) extends ShuffleSpec { - - /** - * A sequence where each element is a set of positions of the partition expression to the cluster - * keys. For instance, if cluster keys are [a, b, b] and partition expressions are - * [bucket(4, a), years(b)], the result will be [(0), (1, 2)]. - * - * Therefore the mapping here is very similar to that from `HashShuffleSpec`. - */ - lazy val keyPositions: Seq[mutable.BitSet] = { - val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet] - distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) => - distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos) - } - partitioning.expressions.map(k => distKeyToPos.getOrElse(k.canonicalized, mutable.BitSet.empty)) - } - - override def numPartitions: Int = partitioning.numPartitions - - override def isCompatibleWith(other: ShuffleSpec): Boolean = other match { - // Here we check: - // 1. both distributions have the same number of clustering keys - // 2. both partitioning have the same number of partitions - // 3. both partitioning have the same number of expressions - // 4. 
both partitioning have the same partition value map - // 5. each pair of partitioning expression from both sides has overlapping positions in their - // corresponding distributions. - case otherSpec @ PartitionValueMapShuffleSpec(otherPartitioning, otherDistribution) => - distribution.clustering.length == otherDistribution.clustering.length && - numPartitions == other.numPartitions && - partitioning.valueMap.equals(otherSpec.partitioning.valueMap) && - partitioning.expressions.length == otherPartitioning.expressions.length && { - val otherKeyPositions = otherSpec.keyPositions - keyPositions.zip(otherKeyPositions).forall { case (left, right) => - left.intersect(right).nonEmpty - } - } - case ShuffleSpecCollection(specs) => - specs.exists(isCompatibleWith) - case _ => false + override def createPartitioning(clustering: Seq[Expression]): Partitioning = { + KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues) } - - override def canCreatePartitioning: Boolean = false } case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index b7657d844cb47..9c66c5723a372 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -368,197 +368,150 @@ case class EnsureRequirements( var newLeft = left var newRight = right - val specsOpts = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) => + val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) => if (!d.isInstanceOf[ClusteredDistribution]) return None val cd = d.asInstanceOf[ClusteredDistribution] val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd) - specOpt - } - val specsAllExist = specsOpts.forall(_.nonEmpty) - if ((!conf.v2BucketingShuffleOneSideEnabled && !specsAllExist) - || specsOpts.count(_.isEmpty) > 1) { - return None + if (specOpt.isEmpty) return None + specOpt.get } - var isCompatible = false + val leftSpec = specs.head + val rightSpec = specs(1) - if (specsAllExist) { - val leftSpec = specsOpts.head.get - val rightSpec = specsOpts(1).get + var isCompatible = false + if (!conf.v2BucketingPushPartValuesEnabled) { + isCompatible = leftSpec.isCompatibleWith(rightSpec) + } else { + logInfo("Pushing common partition values for storage-partitioned join") + isCompatible = leftSpec.areKeysCompatible(rightSpec) + + // Partition expressions are compatible. Regardless of whether partition values + // match from both sides of children, we can calculate a superset of partition values and + // push-down to respective data sources so they can adjust their output partitioning by + // filling missing partition keys with empty partitions. As result, we can still avoid + // shuffle. + // + // For instance, if two sides of a join have partition expressions + // `day(a)` and `day(b)` respectively + // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but + // with different partition values: + // `day(a)`: [0, 1] + // `day(b)`: [1, 2, 3] + // Following the case 2 above, we don't have to shuffle both sides, but instead can + // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data + // sources. 
+ if (isCompatible) { + val leftPartValues = leftSpec.partitioning.partitionValues + val rightPartValues = rightSpec.partitioning.partitionValues + + logInfo( + s""" + |Left side # of partitions: ${leftPartValues.size} + |Right side # of partitions: ${rightPartValues.size} + |""".stripMargin) + + // As partition keys are compatible, we can pick either left or right as partition + // expressions + val partitionExprs = leftSpec.partitioning.expressions + + var mergedPartValues = InternalRowComparableWrapper + .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) + .map(v => (v, 1)) + + logInfo(s"After merging, there are ${mergedPartValues.size} partitions") + + var replicateLeftSide = false + var replicateRightSide = false + var applyPartialClustering = false + + // This means we allow partitions that are not clustered on their values, + // that is, multiple partitions with the same partition value. In the + // following, we calculate how many partitions that each distinct partition + // value has, and pushdown the information to scans, so they can adjust their + // final input partitions respectively. + if (conf.v2BucketingPartiallyClusteredDistributionEnabled) { + logInfo("Calculating partially clustered distribution for " + + "storage-partitioned join") + + // Similar to `OptimizeSkewedJoin`, we need to check join type and decide + // whether partially clustered distribution can be applied. For instance, the + // optimization cannot be applied to a left outer join, where the left hand + // side is chosen as the side to replicate partitions according to stats. + // Otherwise, query result could be incorrect. + val canReplicateLeft = canReplicateLeftSide(joinType) + val canReplicateRight = canReplicateRightSide(joinType) + + if (!canReplicateLeft && !canReplicateRight) { + logInfo("Skipping partially clustered distribution as it cannot be applied for " + + s"join type '$joinType'") + } else { + val leftLink = left.logicalLink + val rightLink = right.logicalLink + + replicateLeftSide = if ( + leftLink.isDefined && rightLink.isDefined && + leftLink.get.stats.sizeInBytes > 1 && + rightLink.get.stats.sizeInBytes > 1) { + logInfo( + s""" + |Using plan statistics to determine which side of join to fully + |cluster partition values: + |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes} + |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes} + |""".stripMargin) + leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes + } else { + // As a simple heuristic, we pick the side with fewer number of partitions + // to apply the grouping & replication of partitions + logInfo("Using number of partitions to determine which side of join " + + "to fully cluster partition values") + leftPartValues.size < rightPartValues.size + } - if (!conf.v2BucketingPushPartValuesEnabled) { - isCompatible = leftSpec.isCompatibleWith(rightSpec) - } else { - logInfo("Pushing common partition values for storage-partitioned join") - isCompatible = leftSpec.areKeysCompatible(rightSpec) - - // Partition expressions are compatible. Regardless of whether partition values - // match from both sides of children, we can calculate a superset of partition values and - // push-down to respective data sources so they can adjust their output partitioning by - // filling missing partition keys with empty partitions. As result, we can still avoid - // shuffle. 
- // - // For instance, if two sides of a join have partition expressions - // `day(a)` and `day(b)` respectively - // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a = t2.b`), but - // with different partition values: - // `day(a)`: [0, 1] - // `day(b)`: [1, 2, 3] - // Following the case 2 above, we don't have to shuffle both sides, but instead can - // just push the common set of partition values: `[0, 1, 2, 3]` down to the two data - // sources. - if (isCompatible) { - val leftPartValues = leftSpec.partitioning.partitionValues - val rightPartValues = rightSpec.partitioning.partitionValues - - logInfo( - s""" - |Left side # of partitions: ${leftPartValues.size} - |Right side # of partitions: ${rightPartValues.size} - |""".stripMargin) - - // As partition keys are compatible, we can pick either left or right as partition - // expressions - val partitionExprs = leftSpec.partitioning.expressions - - var mergedPartValues = InternalRowComparableWrapper - .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) - .map(v => (v, 1)) - - logInfo(s"After merging, there are ${mergedPartValues.size} partitions") - - var replicateLeftSide = false - var replicateRightSide = false - var applyPartialClustering = false - - // This means we allow partitions that are not clustered on their values, - // that is, multiple partitions with the same partition value. In the - // following, we calculate how many partitions that each distinct partition - // value has, and pushdown the information to scans, so they can adjust their - // final input partitions respectively. - if (conf.v2BucketingPartiallyClusteredDistributionEnabled) { - logInfo("Calculating partially clustered distribution for " + - "storage-partitioned join") - - // Similar to `OptimizeSkewedJoin`, we need to check join type and decide - // whether partially clustered distribution can be applied. For instance, the - // optimization cannot be applied to a left outer join, where the left hand - // side is chosen as the side to replicate partitions according to stats. - // Otherwise, query result could be incorrect. - val canReplicateLeft = canReplicateLeftSide(joinType) - val canReplicateRight = canReplicateRightSide(joinType) - - if (!canReplicateLeft && !canReplicateRight) { - logInfo("Skipping partially clustered distribution as it cannot be applied for " + - s"join type '$joinType'") + replicateRightSide = !replicateLeftSide + + // Similar to skewed join, we need to check the join type to see whether replication + // of partitions can be applied. For instance, replication should not be allowed for + // the left-hand side of a right outer join. + if (replicateLeftSide && !canReplicateLeft) { + logInfo("Left-hand side is picked but cannot be applied to join type " + + s"'$joinType'. Skipping partially clustered distribution.") + replicateLeftSide = false + } else if (replicateRightSide && !canReplicateRight) { + logInfo("Right-hand side is picked but cannot be applied to join type " + + s"'$joinType'. 
Skipping partially clustered distribution.") + replicateRightSide = false } else { - val leftLink = left.logicalLink - val rightLink = right.logicalLink - - replicateLeftSide = if ( - leftLink.isDefined && rightLink.isDefined && - leftLink.get.stats.sizeInBytes > 1 && - rightLink.get.stats.sizeInBytes > 1) { - logInfo( - s""" - |Using plan statistics to determine which side of join to fully - |cluster partition values: - |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes} - |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes} - |""".stripMargin) - leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes - } else { - // As a simple heuristic, we pick the side with fewer number of partitions - // to apply the grouping & replication of partitions - logInfo("Using number of partitions to determine which side of join " + - "to fully cluster partition values") - leftPartValues.size < rightPartValues.size + val partValues = if (replicateLeftSide) rightPartValues else leftPartValues + val numExpectedPartitions = partValues + .map(InternalRowComparableWrapper(_, partitionExprs)) + .groupBy(identity) + .mapValues(_.size) + + mergedPartValues = mergedPartValues.map { case (partVal, numParts) => + (partVal, numExpectedPartitions.getOrElse( + InternalRowComparableWrapper(partVal, partitionExprs), numParts)) } - replicateRightSide = !replicateLeftSide - - // Similar to skewed join, we need to check the join type to see whether replication - // of partitions can be applied. For instance, replication should not be allowed for - // the left-hand side of a right outer join. - if (replicateLeftSide && !canReplicateLeft) { - logInfo("Left-hand side is picked but cannot be applied to join type " + - s"'$joinType'. Skipping partially clustered distribution.") - replicateLeftSide = false - } else if (replicateRightSide && !canReplicateRight) { - logInfo("Right-hand side is picked but cannot be applied to join type " + - s"'$joinType'. 
Skipping partially clustered distribution.") - replicateRightSide = false - } else { - val partValues = if (replicateLeftSide) rightPartValues else leftPartValues - val numExpectedPartitions = partValues - .map(InternalRowComparableWrapper(_, partitionExprs)) - .groupBy(identity) - .mapValues(_.size) - - mergedPartValues = mergedPartValues.map { case (partVal, numParts) => - (partVal, numExpectedPartitions.getOrElse( - InternalRowComparableWrapper(partVal, partitionExprs), numParts)) - } - - logInfo("After applying partially clustered distribution, there are " + - s"${mergedPartValues.map(_._2).sum} partitions.") - applyPartialClustering = true - } + logInfo("After applying partially clustered distribution, there are " + + s"${mergedPartValues.map(_._2).sum} partitions.") + applyPartialClustering = true } } - - // Now we need to push-down the common partition key to the scan in each child - newLeft = populatePartitionValues( - left, mergedPartValues, applyPartialClustering, replicateLeftSide) - newRight = populatePartitionValues( - right, mergedPartValues, applyPartialClustering, replicateRightSide) - } - } - } else if (conf.v2BucketingShuffleOneSideEnabled) { - val rightShuffled = specsOpts.head.nonEmpty - // Only should replace the shuffle exchange - if (rightShuffled && right.isInstanceOf[ShuffleExchangeExec]) { - val newRightOpt = reAdaptShuffle(specsOpts.head.get, - right.asInstanceOf[ShuffleExchangeExec]) - if (newRightOpt.isDefined) { - isCompatible = true - newRight = newRightOpt.get - } - } else if (!rightShuffled && left.isInstanceOf[ShuffleExchangeExec]) { - val newLeftOpt = reAdaptShuffle(specsOpts(1).get, left.asInstanceOf[ShuffleExchangeExec]) - if (newLeftOpt.isDefined) { - isCompatible = true - newLeft = newLeftOpt.get } + + // Now we need to push-down the common partition key to the scan in each child + newLeft = populatePartitionValues( + left, mergedPartValues, applyPartialClustering, replicateLeftSide) + newRight = populatePartitionValues( + right, mergedPartValues, applyPartialClustering, replicateRightSide) } } if (isCompatible) Some(Seq(newLeft, newRight)) else None } - private def reAdaptShuffle( - keyGroupedShuffleSpec: KeyGroupedShuffleSpec, - plan: ShuffleExchangeExec): Option[SparkPlan] = { - - // Only support partition expressions are AttributeReference for now - if (!keyGroupedShuffleSpec.partitioning.expressions - .forall(_.isInstanceOf[AttributeReference])) { - return None - } - val partitionValueMap = mutable.Map[Seq[Any], Int]() - keyGroupedShuffleSpec.partitioning.partitionValues.zipWithIndex.foreach(partAndIndex => { - partitionValueMap(partAndIndex._1.toSeq(keyGroupedShuffleSpec.partitioning.expressions - .map(_.dataType))) = partAndIndex._2 - }) - val expressions = plan.outputPartitioning match { - case HashPartitioning(expressions, _) => expressions - case _ => return None - } - - Some(plan.copy(outputPartitioning = PartitionValueMapPartitioning(expressions = expressions, - valueMap = partitionValueMap.toMap, numPartitions = keyGroupedShuffleSpec.numPartitions))) - } - // Similar to `OptimizeSkewedJoin.canSplitRightSide` private def canReplicateLeftSide(joinType: JoinType): Boolean = { joinType == Inner || joinType == Cross || joinType == RightOuter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index ea61c5e0af9e0..caa4ff963f61d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.exchange import java.util.function.Supplier -import scala.collection.mutable +import scala.collection.{mutable, Seq} import scala.concurrent.Future import org.apache.spark._ @@ -301,9 +301,12 @@ object ShuffleExchangeExec { ascending = true, samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) case SinglePartition => new ConstantPartitioner - case PartitionValueMapPartitioning(_, valueMap, n) => - new PartitionValueMapPartitioner(valueMap = mutable.Map(valueMap.toSeq: _*), - numPartitions = n) + case KeyGroupedPartitioning(expressions, n, partitionValues) => + val partitionValueMap = mutable.Map[Seq[Any], Int]() + partitionValues.zipWithIndex.foreach(partAndIndex => { + partitionValueMap(partAndIndex._1.toSeq(expressions.map(_.dataType))) = partAndIndex._2 + }) + new KeyGroupedPartitioner(partitionValueMap, n) case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } @@ -330,7 +333,7 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) row => projection(row) case SinglePartition => identity - case PartitionValueMapPartitioning(expressions, _, _) => + case KeyGroupedPartitioning(expressions, _, _) => row => bindReferences(expressions, outputAttributes).map(_.eval(row)) case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 840008e202e11..9f78a94f9e02c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1138,7 +1138,6 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 42.0), Row(4, "cc", 15.5, 19.5))) - } } } From cb82be48f926456b6f10fd2cb1cb991e8551148f Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 14:58:55 +0800 Subject: [PATCH 04/14] revert format --- .../exchange/EnsureRequirements.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 9c66c5723a372..42c880e7c6262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -416,8 +416,8 @@ case class EnsureRequirements( val partitionExprs = leftSpec.partitioning.expressions var mergedPartValues = InternalRowComparableWrapper - .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) - .map(v => (v, 1)) + .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) + .map(v => (v, 1)) logInfo(s"After merging, there are ${mergedPartValues.size} partitions") @@ -432,7 +432,7 @@ case class EnsureRequirements( // final input partitions respectively. 
if (conf.v2BucketingPartiallyClusteredDistributionEnabled) { logInfo("Calculating partially clustered distribution for " + - "storage-partitioned join") + "storage-partitioned join") // Similar to `OptimizeSkewedJoin`, we need to check join type and decide // whether partially clustered distribution can be applied. For instance, the @@ -444,15 +444,15 @@ case class EnsureRequirements( if (!canReplicateLeft && !canReplicateRight) { logInfo("Skipping partially clustered distribution as it cannot be applied for " + - s"join type '$joinType'") + s"join type '$joinType'") } else { val leftLink = left.logicalLink val rightLink = right.logicalLink replicateLeftSide = if ( leftLink.isDefined && rightLink.isDefined && - leftLink.get.stats.sizeInBytes > 1 && - rightLink.get.stats.sizeInBytes > 1) { + leftLink.get.stats.sizeInBytes > 1 && + rightLink.get.stats.sizeInBytes > 1) { logInfo( s""" |Using plan statistics to determine which side of join to fully @@ -465,7 +465,7 @@ case class EnsureRequirements( // As a simple heuristic, we pick the side with fewer number of partitions // to apply the grouping & replication of partitions logInfo("Using number of partitions to determine which side of join " + - "to fully cluster partition values") + "to fully cluster partition values") leftPartValues.size < rightPartValues.size } @@ -476,11 +476,11 @@ case class EnsureRequirements( // the left-hand side of a right outer join. if (replicateLeftSide && !canReplicateLeft) { logInfo("Left-hand side is picked but cannot be applied to join type " + - s"'$joinType'. Skipping partially clustered distribution.") + s"'$joinType'. Skipping partially clustered distribution.") replicateLeftSide = false } else if (replicateRightSide && !canReplicateRight) { logInfo("Right-hand side is picked but cannot be applied to join type " + - s"'$joinType'. Skipping partially clustered distribution.") + s"'$joinType'. 
Skipping partially clustered distribution.") replicateRightSide = false } else { val partValues = if (replicateLeftSide) rightPartValues else leftPartValues @@ -495,7 +495,7 @@ case class EnsureRequirements( } logInfo("After applying partially clustered distribution, there are " + - s"${mergedPartValues.map(_._2).sum} partitions.") + s"${mergedPartValues.map(_._2).sum} partitions.") applyPartialClustering = true } } From 7c6cc230c241ad19ef0d7564d11b15b33e863cbf Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 16:33:44 +0800 Subject: [PATCH 05/14] format --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index caa4ff963f61d..72167e8db256c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.exchange import java.util.function.Supplier -import scala.collection.{mutable, Seq} +import scala.collection.mutable import scala.concurrent.Future import org.apache.spark._ From 50632c7b515a7556ef78eee087a70c1d539f20fc Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 13:25:25 +0800 Subject: [PATCH 06/14] update --- .../scala/org/apache/spark/Partitioner.scala | 5 +- .../KeyGroupedPartitioningSuite.scala | 49 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index a509dd7d80050..b7a9fcfa2e186 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -138,7 +138,10 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext } /** - * A [[org.apache.spark.Partitioner]] that partitions all records use partition value map + * A [[org.apache.spark.Partitioner]] that partitions all records use partition value map. + * The valueMap is a map that contains tuples of (partition value, partition id). It generated + * by [[org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning]], used to partition + * another side to make sure records with same partition value are in same partition. 
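+ *
+ * A minimal sketch of the lookup behaviour (hypothetical values, not taken from a real plan):
+ * {{{
+ *   val valueMap = mutable.Map[Seq[Any], Int](Seq(1) -> 0, Seq(2) -> 1)
+ *   val partitioner = new KeyGroupedPartitioner(valueMap, numPartitions = 4)
+ *   partitioner.getPartition(Seq(1))   // 0, looked up from the map
+ *   partitioner.getPartition(Seq(9))   // unseen value: falls back to nonNegativeMod(hashCode, 4)
+ * }}}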
*/ private[spark] class KeyGroupedPartitioner( valueMap: mutable.Map[Seq[Any], Int], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 9f78a94f9e02c..0e6a1bfbf7cf0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1073,6 +1073,55 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } + test("SPARK-41471: shuffle one side: shuffle side has more partition value") { + val items_partitions = Array(identity("id")) + createTable(items, items_schema, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchases_schema, Array.empty) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp)), " + + "(5, 26.0, cast('2023-01-01' as timestamp)), " + + "(6, 50.0, cast('2023-02-01' as timestamp))") + + Seq(true, false).foreach { shuffleOneSide => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + Seq("JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN").foreach { joinType => + val df = sql(s"SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i $joinType testcat.ns.$purchases p " + + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (shuffleOneSide) { + assert(shuffles.size == 1, "only shuffle one side not report partitioning") + } else { + assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one " + + "side is not enabled") + } + joinType match { + case "JOIN" => + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + case "LEFT OUTER JOIN" => + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5), + Row(4, "cc", 15.5, null))) + case "RIGHT OUTER JOIN" => + checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0), + Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + case "FULL OUTER JOIN" => + checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0), + Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5), + Row(4, "cc", 15.5, null))) + } + } + } + } + } + test("SPARK-41471: shuffle one side: only one side reports partitioning with two identity") { val items_partitions = Array(identity("id"), identity("arrive_time")) createTable(items, items_schema, items_partitions) From 5a4a6dcb5e38ad662886ad6a947abbb7f4ade86d Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 22:27:29 +0800 Subject: [PATCH 07/14] update --- .../exchange/EnsureRequirementsSuite.scala | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 09da1e1e7b013..be7ab8ef466f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -18,15 +18,17 @@ package org.apache.spark.sql.execution.exchange import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum +import org.apache.spark.sql.catalyst.optimizer.BuildRight import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, _} import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.execution.{DummySparkPlan, SortExec} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.python.FlatMapCoGroupsInPandasExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf @@ -1109,6 +1111,32 @@ class EnsureRequirementsSuite extends SharedSparkSession { } } + test(s"SPARK-41471: shuffle right side when" + + s" spark.sql.sources.v2.bucketing.shuffleOneSide.enabled is true") { + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> "true") { + + val a1 = AttributeReference("a1", IntegerType)() + + val partitionValue = Seq(50, 51, 52).map(v => InternalRow.fromSeq(Seq(v))) + val plan1 = DummySparkPlan(outputPartitioning = KeyGroupedPartitioning( + identity(a1) :: Nil, 4, partitionValue)) + val plan2 = DummySparkPlan(outputPartitioning = SinglePartition) + + val smjExec = ShuffledHashJoinExec( + a1 :: Nil, a1 :: Nil, Inner, BuildRight, None, plan1, plan2) + EnsureRequirements.apply(smjExec) match { + case ShuffledHashJoinExec(_, _, _, _, _, + DummySparkPlan(_, _, left: KeyGroupedPartitioning, _, _), + ShuffleExchangeExec(KeyGroupedPartitioning(attrs, 4, pv), + DummySparkPlan(_, _, SinglePartition, _, _), _, _), _) => + assert(left.expressions == a1 :: Nil) + assert(attrs == a1 :: Nil) + assert(partitionValue == pv) + case other => fail(other.toString) + } + } + } + test("SPARK-42168: FlatMapCoGroupInPandas and Window function with differing key order") { val lKey = AttributeReference("key", IntegerType)() val lKey2 = AttributeReference("key2", IntegerType)() From 31eca2d94640dc0829e3e0a639b695319fc9320f Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 12 Aug 2023 09:41:58 +0800 Subject: [PATCH 08/14] change config name --- .../plans/physical/partitioning.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 8 +++---- .../KeyGroupedPartitioningSuite.scala | 24 +++++++++---------- .../exchange/EnsureRequirementsSuite.scala | 4 ++-- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 62c4440cefc75..ce557422a087a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -735,7 +735,7 @@ case class KeyGroupedShuffleSpec( case _ => false } - override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleOneSideEnabled && + override def 
canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled && // Only support partition expressions are AttributeReference for now partitioning.expressions.forall(_.isInstanceOf[AttributeReference]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5dd02bef094ef..d1b68c213178e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1500,8 +1500,8 @@ object SQLConf { .booleanConf .createWithDefault(false) - val V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED = - buildConf("spark.sql.sources.v2.bucketing.shuffleOneSide.enabled") + val V2_BUCKETING_SHUFFLE_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled") .doc("During a storage-partitioned join, whether to allow to shuffle only one side." + "When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " + "only shuffle the other side. This optimization will reduce the amount of data that " + @@ -4887,8 +4887,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingPartiallyClusteredDistributionEnabled: Boolean = getConf(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED) - def v2BucketingShuffleOneSideEnabled: Boolean = - getConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED) + def v2BucketingShuffleEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED) def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 0e6a1bfbf7cf0..7034e1d9e6007 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1054,14 +1054,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp))") - Seq(true, false).foreach { shuffleOneSide => - withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + Seq(true, false).foreach { shuffle => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (shuffleOneSide) { + if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") } else { assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + @@ -1089,15 +1089,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(5, 26.0, cast('2023-01-01' as timestamp)), " + "(6, 50.0, cast('2023-02-01' as timestamp))") - Seq(true, false).foreach { shuffleOneSide => - withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + Seq(true, false).foreach { shuffle => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { Seq("JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN").foreach { 
joinType => val df = sql(s"SELECT id, name, i.price as purchase_price, p.price as sale_price " + s"FROM testcat.ns.$items i $joinType testcat.ns.$purchases p " + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (shuffleOneSide) { + if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") } else { assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one " + @@ -1136,14 +1136,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp))") - Seq(true, false).foreach { shuffleOneSide => - withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + Seq(true, false).foreach { shuffle => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + "ON i.id = p.item_id and i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (shuffleOneSide) { + if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") } else { assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + @@ -1169,14 +1169,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2021-02-01' as timestamp))") - Seq(true, false).foreach { shuffleOneSide => - withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> shuffleOneSide.toString) { + Seq(true, false).foreach { shuffle => + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + "ON i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (shuffleOneSide) { + if (shuffle) { assert(shuffles.size == 2, "partitioning with transform not work now") } else { assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index be7ab8ef466f7..0b595c8c1755a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -1112,8 +1112,8 @@ class EnsureRequirementsSuite extends SharedSparkSession { } test(s"SPARK-41471: shuffle right side when" + - s" spark.sql.sources.v2.bucketing.shuffleOneSide.enabled is true") { - withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ONE_SIDE_ENABLED.key -> "true") { + s" spark.sql.sources.v2.bucketing.shuffle.enabled is true") { + withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") { val a1 = AttributeReference("a1", IntegerType)() From 82488fbe49fc69bb3d4806148dd6618ddb0cc187 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 15 Aug 2023 14:10:25 +0800 Subject: [PATCH 09/14] update --- .../sql/execution/exchange/ShuffleExchangeExec.scala | 9 ++++----- 1 file 
changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 72167e8db256c..4b5e63a4e582c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -302,11 +302,10 @@ object ShuffleExchangeExec { samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) case SinglePartition => new ConstantPartitioner case KeyGroupedPartitioning(expressions, n, partitionValues) => - val partitionValueMap = mutable.Map[Seq[Any], Int]() - partitionValues.zipWithIndex.foreach(partAndIndex => { - partitionValueMap(partAndIndex._1.toSeq(expressions.map(_.dataType))) = partAndIndex._2 - }) - new KeyGroupedPartitioner(partitionValueMap, n) + val valueMap = partitionValues.zipWithIndex.map { + case (partition, index) => (partition.toSeq(expressions.map(_.dataType)), index) + }.toMap + new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n) case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } From a1db61d202225ea90e626851c29918f024234885 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 18 Aug 2023 10:03:51 +0800 Subject: [PATCH 10/14] fix bug --- .../datasources/v2/BatchScanExec.scala | 11 ++++++- .../exchange/ShuffleExchangeExec.scala | 4 +-- .../KeyGroupedPartitioningSuite.scala | 30 +++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index eba3c71f871e3..d1c69358966ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -182,7 +182,16 @@ case class BatchScanExec( // Now fill missing partition keys with empty partitions val partitionMapping = nestGroupedPartitions.toMap - finalPartitions = spjParams.commonPartitionValues.get.flatMap { + + // SPARK-41471: We keep to order of partition keys in `commonPartitionValues` to + // make sure the order of partitions is deterministic in different case. + val partitionDataTypes = p.expressions.map(_.dataType) + val partitionOrdering: Ordering[(InternalRow, Int)] = { + RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1) + } + val sortedCommonPartitionValues = spjParams.commonPartitionValues.get + .sorted(partitionOrdering) + finalPartitions = sortedCommonPartitionValues.flatMap { case (partValue, numSplits) => // Use empty partition for those partition values that are not present. 
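// Illustrative sketch (editorial note, not part of this patch): the `partitionOrdering` introduced
// a few lines above uses `Ordering.on` to lift a row ordering to the (partition value, numSplits)
// tuples being sorted. Assuming a single IntegerType partition column, conceptually:
//   val rowOrdering: Ordering[InternalRow] =
//     RowOrdering.createNaturalAscendingOrdering(Seq(IntegerType))
//   val tupleOrdering: Ordering[(InternalRow, Int)] = rowOrdering.on(_._1)
//   // tuples compare only by their partition-value row; the numSplits component is ignored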
partitionMapping.getOrElse( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 4b5e63a4e582c..750b96dc83dc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -301,8 +301,8 @@ object ShuffleExchangeExec { ascending = true, samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) case SinglePartition => new ConstantPartitioner - case KeyGroupedPartitioning(expressions, n, partitionValues) => - val valueMap = partitionValues.zipWithIndex.map { + case k @ KeyGroupedPartitioning(expressions, n, _) => + val valueMap = k.uniquePartitionValues.zipWithIndex.map { case (partition, index) => (partition.toSeq(expressions.map(_.dataType)), index) }.toMap new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 7034e1d9e6007..5b5e402117384 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1191,6 +1191,36 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } + test("SPARK-41471: shuffle one side: work with group partition split") { + val items_partitions = Array(identity("id")) + createTable(items, items_schema, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + createTable(purchases, purchases_schema, Array.empty) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(3, 19.5, cast('2020-02-01' as timestamp)), " + + "(5, 26.0, cast('2023-01-01' as timestamp)), " + + "(6, 50.0, cast('2023-02-01' as timestamp))") + + Seq(true, false).foreach { shuffle => + withSQLConf( + SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString, + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") { + val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) + } + } + } + test("SPARK-44641: duplicated records when SPJ is not triggered") { val items_partitions = Array(bucket(8, "id")) createTable(items, items_schema, items_partitions) From 21255cf699d5ee44c1d528dfab47f9fff0470737 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 23 Aug 2023 09:47:29 +0800 Subject: [PATCH 11/14] fix review --- .../catalyst/util/InternalRowComparableWrapper.scala | 5 ++++- .../sql/execution/datasources/v2/BatchScanExec.scala | 11 +---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala index b0e530907310a..cb8176171dc3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala @@ -79,6 +79,9 @@ object InternalRowComparableWrapper { rightPartitioning.partitionValues .map(new InternalRowComparableWrapper(_, partitionDataTypes)) .foreach(partition => partitionsSet.add(partition)) - partitionsSet.map(_.row).toSeq + val partitionOrdering: Ordering[InternalRow] = { + RowOrdering.createNaturalAscendingOrdering(partitionDataTypes) + } + partitionsSet.map(_.row).toSeq.sorted(partitionOrdering) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index d1c69358966ac..eba3c71f871e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -182,16 +182,7 @@ case class BatchScanExec( // Now fill missing partition keys with empty partitions val partitionMapping = nestGroupedPartitions.toMap - - // SPARK-41471: We keep to order of partition keys in `commonPartitionValues` to - // make sure the order of partitions is deterministic in different case. - val partitionDataTypes = p.expressions.map(_.dataType) - val partitionOrdering: Ordering[(InternalRow, Int)] = { - RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1) - } - val sortedCommonPartitionValues = spjParams.commonPartitionValues.get - .sorted(partitionOrdering) - finalPartitions = sortedCommonPartitionValues.flatMap { + finalPartitions = spjParams.commonPartitionValues.get.flatMap { case (partValue, numSplits) => // Use empty partition for those partition values that are not present. partitionMapping.getOrElse( From 1a5e2b7928cecb17fa480468b48f58166af59d62 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 23 Aug 2023 09:48:42 +0800 Subject: [PATCH 12/14] update comment --- .../spark/sql/catalyst/util/InternalRowComparableWrapper.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala index cb8176171dc3e..9a0bdc6bcfd11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala @@ -79,6 +79,8 @@ object InternalRowComparableWrapper { rightPartitioning.partitionValues .map(new InternalRowComparableWrapper(_, partitionDataTypes)) .foreach(partition => partitionsSet.add(partition)) + // SPARK-41471: Sort the partitions to make sure the order of partitions is + // deterministic in different cases. 
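// Illustrative sketch (editorial note, not part of this patch; `unsortedRows` is hypothetical):
// the set above iterates in hash order, so without the sort below the merged sequence could vary
// from run to run. Assuming a single IntegerType partition column:
//   val ordering = RowOrdering.createNaturalAscendingOrdering(Seq(IntegerType))
//   val unsortedRows = Seq(52, 50, 51).map(v => InternalRow.fromSeq(Seq(v)))
//   unsortedRows.sorted(ordering)   // always returns the rows for 50, 51, 52 in that order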
val partitionOrdering: Ordering[InternalRow] = { RowOrdering.createNaturalAscendingOrdering(partitionDataTypes) } From 3c98fd7f371e0331b367db1f55e71f1cd589e6ce Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 23 Aug 2023 09:54:26 +0800 Subject: [PATCH 13/14] update comment --- .../spark/sql/execution/datasources/v2/BatchScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index eba3c71f871e3..cc674961f8eb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -153,7 +153,7 @@ case class BatchScanExec( if (spjParams.commonPartitionValues.isDefined && spjParams.applyPartialClustering) { // A mapping from the common partition values to how many splits the partition - // should contain. Note this no longer maintain the partition key ordering. + // should contain. val commonPartValuesMap = spjParams.commonPartitionValues .get .map(t => (InternalRowComparableWrapper(t._1, p.expressions), t._2)) From 5d227f2e42356ece665fa7b405c17d60876db86d Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 24 Aug 2023 07:49:19 +0800 Subject: [PATCH 14/14] fix review --- core/src/main/scala/org/apache/spark/Partitioner.scala | 7 ++++--- .../sql/execution/exchange/EnsureRequirementsSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index b7a9fcfa2e186..ae39e2e183e4a 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -138,10 +138,11 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext } /** - * A [[org.apache.spark.Partitioner]] that partitions all records use partition value map. - * The valueMap is a map that contains tuples of (partition value, partition id). It generated + * A [[org.apache.spark.Partitioner]] that partitions all records using partition value map. + * The `valueMap` is a map that contains tuples of (partition value, partition id). It is generated * by [[org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning]], used to partition - * another side to make sure records with same partition value are in same partition. + * the other side of a join to make sure records with same partition value are in the same + * partition. 
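 *
 * Illustrative sketch (editorial example, not part of this patch; the id assigned to a value
 * that is absent from the map is assumed to be a consistent hash-based fallback):
 * {{{
 *   import scala.collection.mutable
 *   val valueMap = mutable.Map[Seq[Any], Int](Seq(1) -> 0, Seq(3) -> 1)
 *   val partitioner = new KeyGroupedPartitioner(valueMap, 4)   // 4 partitions
 *   partitioner.getPartition(Seq(3))   // 1: partition id looked up from the value map
 *   partitioner.getPartition(Seq(7))   // unseen value: assigned a consistent fallback id
 * }}}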
*/ private[spark] class KeyGroupedPartitioner( valueMap: mutable.Map[Seq[Any], Int], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 0b595c8c1755a..3c9b92e5f66b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -1111,8 +1111,8 @@ class EnsureRequirementsSuite extends SharedSparkSession { } } - test(s"SPARK-41471: shuffle right side when" + - s" spark.sql.sources.v2.bucketing.shuffle.enabled is true") { + test("SPARK-41471: shuffle right side when" + + " spark.sql.sources.v2.bucketing.shuffle.enabled is true") { withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") { val a1 = AttributeReference("a1", IntegerType)()