Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)

}

case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

/**
* Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
* fewer number of partitions.
*/
case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
extends Expression with Partitioning with Unevaluable {

override def children: Seq[Expression] = from.expressions
override def nullable: Boolean = from.nullable
override def dataType: DataType = from.dataType

override def satisfies0(required: Distribution): Boolean = from.satisfies0(required)

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
copy(from = from.copy(expressions = newChildren))

override val numPartitions: Int = partitions.length

override def toString: String = from.toString
override def sql: String = from.sql
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a second thought, why do we need to hide CoalescedHashPartitioning? Can we run some example queries and check EXPLAIN and SQL web UI?

}

/**
Expand Down Expand Up @@ -708,6 +737,26 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}

case class CoalescedHashShuffleSpec(
from: ShuffleSpec,
partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec =>
numPartitions == 1
case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
partitions == otherPartitions && from.isCompatibleWith(otherParent)
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
false
}

override def canCreatePartitioning: Boolean = false

override def numPartitions: Int = partitions.length
}

/**
* [[ShuffleSpec]] created by [[KeyGroupedPartitioning]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType

Expand Down Expand Up @@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite {
false)
}

test("HashPartitioning is the output partitioning") {
// HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
// the required clustering expressions.
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"b", $"c")),
false)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"d", $"e")),
false)

// When ClusteredDistribution.requireAllClusterKeys is set to true,
// HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
// exactly same as the required clustering expressions.
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
true)

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

checkSatisfied(
HashPartitioning(Seq($"b", $"a", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

// HashPartitioning cannot satisfy OrderedDistribution
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
private def testHashPartitioningLike(
partitioningName: String,
create: (Seq[Expression], Int) => Partitioning): Unit = {

test(s"$partitioningName is the output partitioning") {
// HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
// the required clustering expressions.
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
create(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"b", $"c")),
false)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"d", $"e")),
false)

// When ClusteredDistribution.requireAllClusterKeys is set to true,
// HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
// exactly same as the required clustering expressions.
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
true)

checkSatisfied(
create(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

checkSatisfied(
create(Seq($"b", $"a", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

// HashPartitioning cannot satisfy OrderedDistribution
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 1),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false) // TODO: this can be relaxed.

checkSatisfied(
create(Seq($"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
}
}

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 1),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false) // TODO: this can be relaxed.
testHashPartitioningLike("HashPartitioning",
(expressions, numPartitions) => HashPartitioning(expressions, numPartitions))

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
}
testHashPartitioningLike("CoalescedHashPartitioning", (expressions, numPartitions) =>
CoalescedHashPartitioning(
HashPartitioning(expressions, numPartitions), Seq(CoalescedBoundary(0, numPartitions))))

test("RangePartitioning is the output partitioning") {
// RangePartitioning can satisfy OrderedDistribution iff its ordering is a prefix
Expand Down
Loading