From 9abcd75474f188d7b9f4ed4caab6bb8e98c1da4d Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Mon, 7 Nov 2016 17:06:38 -0500 Subject: [PATCH 1/5] Implement custom coalesce --- .../scala/org/apache/spark/rdd/RDDSuite.scala | 29 ++++++---- .../plans/logical/basicLogicalOperators.scala | 14 +++++ .../scala/org/apache/spark/sql/Dataset.scala | 25 ++++++-- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../execution/basicPhysicalOperators.scala | 9 ++- .../org/apache/spark/sql/DatasetSuite.scala | 58 +++++++++++++++++++ 6 files changed, 119 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 386c0060f9c4..9e4107143561 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1158,8 +1158,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { * Took this class out of the test suite to prevent "Task not serializable" exceptions. 
*/ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable { + + def getPartitions(parent: RDD[_]): Array[Partition] = { + parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions + } + + def getPartitionSize(partition: Partition): Long = { + partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength + } + override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = { - val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions + val partitions = getPartitions(parent) val groups = ArrayBuffer[PartitionGroup]() var currentGroup = new PartitionGroup() var currentSum = 0L @@ -1168,8 +1177,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria // sort partitions based on the size of the corresponding input splits partitions.sortWith((partition1, partition2) => { - val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength - val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength + val partition1Size = getPartitionSize(partition1) + val partition2Size = getPartitionSize(partition2) partition1Size < partition2Size }) @@ -1185,23 +1194,21 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria totalSum += splitSize } - while (index < partitions.size) { + while (index < partitions.length) { val partition = partitions(index) - val fileSplit = - partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] - val splitSize = fileSplit.getLength + val splitSize = getPartitionSize(partition) if (currentSum + splitSize < maxSize) { addPartition(partition, splitSize) index += 1 - if (index == partitions.size) { - updateGroups + if (index == partitions.length) { + updateGroups() } } else { - if (currentGroup.partitions.size == 0) { + if (currentGroup.partitions.isEmpty) { addPartition(partition, splitSize) index += 1 } else { - updateGroups + 
updateGroups() } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 303014e0b8d3..c1ee0743364b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.rdd.PartitionCoalescer import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ @@ -752,6 +753,19 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") } +/** + * Returns a new RDD that has at most `numPartitions` partitions. This behavior can be modified by + * supplying a [[PartitionCoalescer]] to control the behavior of the partitioning. + */ +case class PartitionCoalesce( + numPartitions: Int, + partitionCoalescer: Option[PartitionCoalescer], + child: LogicalPlan) extends UnaryNode { + require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") + override def output: Seq[Attribute] = child.output +} + + /** * This method repartitions data using [[Expression]]s into `numPartitions`, and receives * information about the number of partitions during execution. 
Used when a specific ordering or diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index aa968d8b3c34..33d0e0909fdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{PartitionCoalescer, RDD} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.CatalogRelation @@ -2662,7 +2662,7 @@ class Dataset[T] private[sql]( } /** - * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions + * Returns a new Dataset that has at most `numPartitions` partitions. * are requested. If a larger number of partitions is requested, it will stay at the current * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not @@ -2675,12 +2675,27 @@ class Dataset[T] private[sql]( * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). * + * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be + * customized similar to [[RDD.coalesce]]. + * + * @group typedrel + * @since 2.2.0 + */ + def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] = + withTypedPlan { + PartitionCoalesce(numPartitions, partitionCoalescer, logicalPlan) + } + + /** + * Returns a new Dataset that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g. 
+ * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { - Repartition(numPartitions, shuffle = false, logicalPlan) - } + def coalesce(numPartitions: Int): Dataset[T] = coalesce(numPartitions, None) /** * Returns a new Dataset that contains only the unique rows from this Dataset. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 691f71a7d4ac..583e55571837 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -394,8 +394,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if (shuffle) { ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil } else { - execution.CoalesceExec(numPartitions, planLater(child)) :: Nil + execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil } + case logical.PartitionCoalesce(numPartitions, partitionCoalescer, child) => + execution.CoalesceExec(numPartitions, planLater(child), partitionCoalescer) :: Nil case logical.Sort(sortExprs, global, child) => execution.SortExec(sortExprs, global, planLater(child)) :: Nil case logical.Project(projectList, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 2151c339b9b8..4cf3ee877b97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future} import 
scala.concurrent.duration.Duration import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext} -import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD} +import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} @@ -571,7 +571,10 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). */ -case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { +case class CoalesceExec( + numPartitions: Int, + child: SparkPlan, + partitionCoalescer: Option[PartitionCoalescer]) extends UnaryExecNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = { @@ -580,7 +583,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN } protected override def doExecute(): RDD[InternalRow] = { - child.execute().coalesce(numPartitions, shuffle = false) + child.execute().coalesce(numPartitions, shuffle = false, partitionCoalescer) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 40235e32d35d..63c7f48a48b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import org.apache.spark.Partition +import org.apache.spark.rdd._ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import 
org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec} +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ @@ -119,6 +122,46 @@ class DatasetSuite extends QueryTest with SharedSQLContext { data: _*) } + test("coalesce, custom") { + + val maxSplitSize = 512 + // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first + // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances. + withTempPath { path => + val data = (1 to 1000).map(i => ClassData(i.toString, i)) + data.toDS().repartition(50).write.format("csv").save(path.toString) + + val schema = StructType(Seq($"a".string, $"b".int)) + + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> "200", + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { + + val ds = spark.read.format("csv") + .schema(schema) + .load(path.toString) + .as[ClassData] + + val coalescedDataSet = + ds.coalesce(4, + partitionCoalescer = Option(new DataSetSizeBasedPartitionCoalescer(maxSplitSize))) + + assert(coalescedDataSet.rdd.partitions.length <= 50) + + var totalPartitionCount = 0L + coalescedDataSet.rdd.partitions.foreach(partition => { + var splitSizeSum = 0L + partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => { + splitSizeSum += + partition.asInstanceOf[FilePartition].files.map(_.length).sum + totalPartitionCount += 1 + }) + assert(splitSizeSum <= maxSplitSize) + }) + } + } + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkDataset( @@ -1402,3 +1445,18 @@ case class CircularReferenceClassB(cls: CircularReferenceClassA) case class CircularReferenceClassC(ar: Array[CircularReferenceClassC]) case class CircularReferenceClassD(map: Map[String, 
CircularReferenceClassE]) case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD]) + + +class DataSetSizeBasedPartitionCoalescer(maxSize: Int) extends + SizeBasedCoalescer(maxSize) { + + override def getPartitions(parent: RDD[_]): Array[Partition] = { + parent.firstParent.asInstanceOf[FileScanRDD].partitions + } + + override def getPartitionSize(partition: Partition): Long = { + val res = partition.asInstanceOf[FilePartition].files.map( + x => x.length - x.start).sum + res + } +} From 253bdcc32ad67cda00223d6bca773253ad94d034 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 6 Aug 2017 23:44:51 +0900 Subject: [PATCH 2/5] Add more fixes --- .../plans/logical/basicLogicalOperators.scala | 10 ++-- .../scala/org/apache/spark/sql/Dataset.scala | 42 ++++++++++----- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../execution/basicPhysicalOperators.scala | 11 ++-- .../org/apache/spark/sql/DatasetSuite.scala | 51 ++++++++----------- 5 files changed, 57 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c1ee0743364b..f8b0f0ad92ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils import org.apache.spark.util.random.RandomSampler @@ -755,17 +754,14 @@ case class Repartition(numPartitions: 
Int, shuffle: Boolean, child: LogicalPlan) /** * Returns a new RDD that has at most `numPartitions` partitions. This behavior can be modified by - * supplying a [[PartitionCoalescer]] to control the behavior of the partitioning. + * supplying a `PartitionCoalescer` to control the behavior of the partitioning. */ -case class PartitionCoalesce( - numPartitions: Int, - partitionCoalescer: Option[PartitionCoalescer], - child: LogicalPlan) extends UnaryNode { +case class PartitionCoalesce(numPartitions: Int, coalescer: PartitionCoalescer, child: LogicalPlan) + extends UnaryNode { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") override def output: Seq[Attribute] = child.output } - /** * This method repartitions data using [[Expression]]s into `numPartitions`, and receives * information about the number of partitions during execution. Used when a specific ordering or diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 33d0e0909fdd..92e943ed9810 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2662,8 +2662,10 @@ class Dataset[T] private[sql]( } /** - * Returns a new Dataset that has at most `numPartitions` partitions. - * are requested. If a larger number of partitions is requested, it will stay at the current + * Returns a new Dataset that an user-defined `PartitionCoalescer` reduces into fewer partitions. + * `userDefinedCoalescer` is the same with a coalescer used in the `RDD` coalesce function. + * + * If a larger number of partitions is requested, it will stay at the current * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. 
@@ -2675,27 +2677,39 @@ class Dataset[T] private[sql]( * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). * - * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be - * customized similar to [[RDD.coalesce]]. - * * @group typedrel - * @since 2.2.0 + * @since 2.3.0 */ - def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] = - withTypedPlan { - PartitionCoalesce(numPartitions, partitionCoalescer, logicalPlan) + def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = { + userDefinedCoalescer.map { coalescer => + withTypedPlan { + PartitionCoalesce(numPartitions, coalescer, logicalPlan) + } + }.getOrElse { + coalesce(numPartitions) } + } /** - * Returns a new Dataset that has exactly `numPartitions` partitions. - * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g. - * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. + * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions + * are requested. If a larger number of partitions is requested, it will stay at the current + * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in + * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not + * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. + * + * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, + * this may result in your computation taking place on fewer nodes than + * you like (e.g. one node in the case of numPartitions = 1). To avoid this, + * you can call repartition. 
This will add a shuffle step, but means the + * current upstream partitions will be executed in parallel (per whatever + * the current partitioning is). * * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): Dataset[T] = coalesce(numPartitions, None) + def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan) + } /** * Returns a new Dataset that contains only the unique rows from this Dataset. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 583e55571837..0500216c119b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -396,8 +396,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } else { execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil } - case logical.PartitionCoalesce(numPartitions, partitionCoalescer, child) => - execution.CoalesceExec(numPartitions, planLater(child), partitionCoalescer) :: Nil + case logical.PartitionCoalesce(numPartitions, coalescer, child) => + execution.CoalesceExec(numPartitions, planLater(child), Some(coalescer)) :: Nil case logical.Sort(sortExprs, global, child) => execution.SortExec(sortExprs, global, planLater(child)) :: Nil case logical.Project(projectList, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 4cf3ee877b97..a22dd3f9fc55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -20,14 +20,13 @@ package org.apache.spark.sql.execution import scala.concurrent.{ExecutionContext, 
Future} import scala.concurrent.duration.Duration -import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext} +import org.apache.spark.{InterruptibleIterator, TaskContext} import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates import org.apache.spark.sql.types.LongType import org.apache.spark.util.ThreadUtils import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} @@ -571,10 +570,8 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). 
*/ -case class CoalesceExec( - numPartitions: Int, - child: SparkPlan, - partitionCoalescer: Option[PartitionCoalescer]) extends UnaryExecNode { +case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer]) + extends UnaryExecNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = { @@ -583,7 +580,7 @@ case class CoalesceExec( } protected override def doExecute(): RDD[InternalRow] = { - child.execute().coalesce(numPartitions, shuffle = false, partitionCoalescer) + child.execute().coalesce(numPartitions, shuffle = false, coalescer) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 63c7f48a48b7..bcb74467d36a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -123,41 +123,34 @@ class DatasetSuite extends QueryTest with SharedSQLContext { } test("coalesce, custom") { - - val maxSplitSize = 512 - // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first - // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances. 
withTempPath { path => - val data = (1 to 1000).map(i => ClassData(i.toString, i)) - data.toDS().repartition(50).write.format("csv").save(path.toString) - - val schema = StructType(Seq($"a".string, $"b".int)) + val maxSplitSize = 512 + val testData = (1 to 1000).map(i => ClassData(i.toString, i)) + testData.toDS().repartition(50).write.format("csv").save(path.toString) withSQLConf( - SQLConf.FILES_MAX_PARTITION_BYTES.key -> "200", - SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { - + SQLConf.FILES_MAX_PARTITION_BYTES.key -> (maxSplitSize / 3).toString, + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0" + ) { val ds = spark.read.format("csv") - .schema(schema) + .schema("a STRING, b INT") .load(path.toString) .as[ClassData] val coalescedDataSet = - ds.coalesce(4, - partitionCoalescer = Option(new DataSetSizeBasedPartitionCoalescer(maxSplitSize))) + ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(maxSplitSize))) assert(coalescedDataSet.rdd.partitions.length <= 50) - var totalPartitionCount = 0L - coalescedDataSet.rdd.partitions.foreach(partition => { - var splitSizeSum = 0L - partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => { - splitSizeSum += - partition.asInstanceOf[FilePartition].files.map(_.length).sum - totalPartitionCount += 1 - }) - assert(splitSizeSum <= maxSplitSize) - }) + val expectedPartitionCount = ds.rdd.partitions.size + val totalPartitionCount = coalescedDataSet.rdd.partitions.map { p1 => + val splitSizes = p1.asInstanceOf[CoalescedRDDPartition].parents.map { p2 => + p2.asInstanceOf[FilePartition].files.map(_.length).sum + } + assert(splitSizes.sum <= maxSplitSize) + splitSizes.size + }.sum + assert(totalPartitionCount === expectedPartitionCount) } } } @@ -1446,17 +1439,13 @@ case class CircularReferenceClassC(ar: Array[CircularReferenceClassC]) case class CircularReferenceClassD(map: Map[String, CircularReferenceClassE]) case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD]) - -class 
DataSetSizeBasedPartitionCoalescer(maxSize: Int) extends - SizeBasedCoalescer(maxSize) { +class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) { override def getPartitions(parent: RDD[_]): Array[Partition] = { - parent.firstParent.asInstanceOf[FileScanRDD].partitions + parent.firstParent.partitions } override def getPartitionSize(partition: Partition): Long = { - val res = partition.asInstanceOf[FilePartition].files.map( - x => x.length - x.start).sum - res + partition.asInstanceOf[FilePartition].files.map(x => x.length - x.start).sum } } From c0306d346e336a3bae6335e27f676c3254d915cb Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 7 Aug 2017 14:30:47 +0900 Subject: [PATCH 3/5] Fix minor fixes --- .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++----- .../spark/sql/execution/basicPhysicalOperators.scala | 9 ++++++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 9e4107143561..8716b9d1a138 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1194,21 +1194,21 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria totalSum += splitSize } - while (index < partitions.length) { + while (index < partitions.size) { val partition = partitions(index) val splitSize = getPartitionSize(partition) if (currentSum + splitSize < maxSize) { addPartition(partition, splitSize) index += 1 - if (index == partitions.length) { - updateGroups() + if (index == partitions.size) { + updateGroups } } else { - if (currentGroup.partitions.isEmpty) { + if (currentGroup.partitions.size == 0) { addPartition(partition, splitSize) index += 1 } else { - updateGroups() + updateGroups } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index a22dd3f9fc55..4afdb5749fd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -560,7 +560,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * Physical plan for returning a new RDD that has exactly `numPartitions` partitions. * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions + * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions * is requested, it will stay at the current number of partitions. * * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, @@ -569,6 +569,13 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * you see ShuffleExchange. This will add a shuffle step, but means the * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). + * + * If you want to define how to coalesce partitions, you can set a custom strategy + * to coalesce partitions in `coalescer`. 
+ * + * @param numPartitions Number of partitions this coalescer tries to reduce partitions into + * @param child the SparkPlan + * @param coalescer Optional coalescer that an user specifies */ case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer]) extends UnaryExecNode { From 3b4c679a0f16fcd85e1e11470eea250fd342e898 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 7 Aug 2017 16:22:07 +0900 Subject: [PATCH 4/5] Drop PartitionCoalesce and use Reparititon --- .../sql/catalyst/optimizer/Optimizer.scala | 15 ++++++------ .../plans/logical/basicLogicalOperators.scala | 24 ++++++++++--------- .../scala/org/apache/spark/sql/Dataset.scala | 11 +++------ .../spark/sql/execution/SparkStrategies.scala | 6 ++--- .../spark/sql/execution/PlannerSuite.scala | 2 +- 5 files changed, 27 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d82af94dbffb..9da6efb19744 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -596,14 +596,15 @@ object CollapseProject extends Rule[LogicalPlan] { object CollapseRepartition extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // Case 1: When a Repartition has a child of Repartition or RepartitionByExpression, - // 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child - // enables the shuffle. Returns the child node if the last numPartitions is bigger; - // otherwise, keep unchanged. + // 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified + // strategy), but the child enables the shuffle. Returns the child node if the last + // numPartitions is bigger; otherwise, keep unchanged. 
// 2) In the other cases, returns the top node with the child's child - case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match { - case (false, true) => if (r.numPartitions >= child.numPartitions) child else r - case _ => r.copy(child = child.child) - } + case r @ Repartition(_, _, child: RepartitionOperation, None) => + (r.shuffle, child.shuffle) match { + case (false, true) => if (r.numPartitions >= child.numPartitions) child else r + case _ => r.copy(child = child.child) + } // Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression // we can remove the child. case r @ RepartitionByExpression(_, child: RepartitionOperation, _) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f8b0f0ad92ff..a8b463e78c80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -746,22 +746,24 @@ abstract class RepartitionOperation extends UnaryNode { * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer * of the output requires some specific ordering or distribution of the data. + * + * If `shuffle` = false (`coalesce` cases), this logical plan can have a user-specified strategy + * to coalesce input partitions. 
+ * + * @param numPartitions How many partitions to use in the output RDD + * @param shuffle Whether to shuffle when repartitioning + * @param child the LogicalPlan + * @param coalescer Optional coalescer that a user specifies */ -case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) +case class Repartition( + numPartitions: Int, + shuffle: Boolean, + child: LogicalPlan, + coalescer: Option[PartitionCoalescer] = None) extends RepartitionOperation { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") } -/** - * Returns a new RDD that has at most `numPartitions` partitions. This behavior can be modified by - * supplying a `PartitionCoalescer` to control the behavior of the partitioning. - */ -case class PartitionCoalesce(numPartitions: Int, coalescer: PartitionCoalescer, child: LogicalPlan) - extends UnaryNode { - require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") - override def output: Seq[Attribute] = child.output -} - /** * This method repartitions data using [[Expression]]s into `numPartitions`, and receives * information about the number of partitions during execution. 
Used when a specific ordering or diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 92e943ed9810..40a70f78fa70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2680,14 +2680,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.3.0 */ - def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = { - userDefinedCoalescer.map { coalescer => - withTypedPlan { - PartitionCoalesce(numPartitions, coalescer, logicalPlan) - } - }.getOrElse { - coalesce(numPartitions) - } + def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]) + : Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0500216c119b..bb540f5156e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -390,14 +390,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, planLater(left), planLater(right)) :: Nil - case logical.Repartition(numPartitions, shuffle, child) => + case logical.Repartition(numPartitions, shuffle, child, coalescer) => if (shuffle) { ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil } else { - execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil + execution.CoalesceExec(numPartitions, planLater(child), coalescer) :: Nil } - case logical.PartitionCoalesce(numPartitions, coalescer, child) => - execution.CoalesceExec(numPartitions, planLater(child), Some(coalescer)) :: Nil 
case logical.Sort(sortExprs, global, child) => execution.SortExec(sortExprs, global, planLater(child)) :: Nil case logical.Project(projectList, child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 4d155d538d63..87a09ff0f341 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -244,7 +244,7 @@ class PlannerSuite extends SharedSQLContext { assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3) assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2) doubleRepartitioned.queryExecution.optimizedPlan match { - case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) => + case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, _, _), _) => assert(numPartitions === 5) assert(shuffle === false) assert(shuffleChild === true) From d7392c447c6fbfcc8ba7256dc6fa17b709a3fe24 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 8 Aug 2017 11:34:29 +0900 Subject: [PATCH 5/5] Apply comments --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 +++++--- .../catalyst/plans/logical/basicLogicalOperators.scala | 2 ++ .../src/main/scala/org/apache/spark/sql/Dataset.scala | 5 +++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9da6efb19744..90563148d66d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -600,10 +600,12 @@ object CollapseRepartition extends Rule[LogicalPlan] { // strategy), but the child enables the shuffle. 
Returns the child node if the last // numPartitions is bigger; otherwise, keep unchanged. // 2) In the other cases, returns the top node with the child's child - case r @ Repartition(_, _, child: RepartitionOperation, None) => + case r @ Repartition(_, _, child: RepartitionOperation, coalescer) => (r.shuffle, child.shuffle) match { - case (false, true) => if (r.numPartitions >= child.numPartitions) child else r - case _ => r.copy(child = child.child) + case (false, true) => + if (coalescer.isEmpty && r.numPartitions >= child.numPartitions) child else r + case _ => + r.copy(child = child.child) } // Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression // we can remove the child. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a8b463e78c80..ea43b24fcb2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -762,6 +762,8 @@ case class Repartition( coalescer: Option[PartitionCoalescer] = None) extends RepartitionOperation { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") + require(!shuffle || coalescer.isEmpty, + "Custom coalescer is not allowed for repartition(shuffle=true)") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 40a70f78fa70..a52b5108a5f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2680,8 +2680,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.3.0 */ - def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]) - : 
Dataset[T] = withTypedPlan { + def coalesce( + numPartitions: Int, + userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan { Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) }