From d41e34720073be568053f220946cb01ea2ff61ef Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 28 Sep 2017 11:42:41 -0700
Subject: [PATCH 1/3] [SPARK-22159][SQL] Make config names consistently end
 with "enabled".

spark.sql.execution.arrow.enable and
spark.sql.codegen.aggregate.map.twolevel.enable -> enabled
---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d00c67248753..358cf6214907 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -668,7 +668,7 @@ object SQLConf {
       .createWithDefault(40)
 
   val ENABLE_TWOLEVEL_AGG_MAP =
-    buildConf("spark.sql.codegen.aggregate.map.twolevel.enable")
+    buildConf("spark.sql.codegen.aggregate.map.twolevel.enabled")
       .internal()
       .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
         "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
@@ -908,7 +908,7 @@ object SQLConf {
       .createWithDefault(false)
 
   val ARROW_EXECUTION_ENABLE =
-    buildConf("spark.sql.execution.arrow.enable")
+    buildConf("spark.sql.execution.arrow.enabled")
       .internal()
       .doc("Make use of Apache Arrow for columnar data transfers. Currently available " +
         "for use with pyspark.sql.DataFrame.toPandas with the following data types: " +

From 8e51ae52b6d54ed46a3441bbb83a8e93ba214410 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 28 Sep 2017 14:36:34 -0700
Subject: [PATCH 2/3] [SPARK-22160][SQL] Allow changing sample points per
 partition in range shuffle exchange

---
 .../main/scala/org/apache/spark/Partitioner.scala  | 13 +++++++++++--
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ++++++++++
 .../execution/exchange/ShuffleExchangeExec.scala   |  7 ++++++-
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 1484f29525a4..fa397f09f651 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -108,9 +108,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     rdd: RDD[_ <: Product2[K, V]],
-    private var ascending: Boolean = true)
+    private var ascending: Boolean = true,
+    val samplePointsPerPartitionHint: Int = 20)
   extends Partitioner {
 
+  // A constructor declared in order to maintain backward compatibility for Java, when we add the
+  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
+  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
+  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
+    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
+  }
+
   // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
   require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
 
@@ -122,7 +130,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
-      val sampleSize = math.min(20.0 * partitions, 1e6)
+      // Cast to double to avoid overflowing ints or longs
+      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 358cf6214907..1a73d168b9b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -907,6 +907,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
+    buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
+      .internal()
+      .doc("Number of points to sample per partition in order to determine the range boundaries" +
+        " for range partitioning, typically used in global sorting (without limit).")
+      .intConf
+      .createWithDefault(100)
+
   val ARROW_EXECUTION_ENABLE =
     buildConf("spark.sql.execution.arrow.enabled")
       .internal()
@@ -1199,6 +1207,8 @@ class SQLConf extends Serializable with Logging {
 
   def supportQuotedRegexColumnName: Boolean = getConf(SUPPORT_QUOTED_REGEX_COLUMN_NAME)
 
+  def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
+
   def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
 
   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 11c4aa9b4acf..5a1e217082bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.MutablePair
 
 /**
@@ -218,7 +219,11 @@ object ShuffleExchangeExec {
           iter.map(row => mutablePair.update(row.copy(), null))
         }
         implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
-        new RangePartitioner(numPartitions, rddForSampling, ascending = true)
+        new RangePartitioner(
+          numPartitions,
+          rddForSampling,
+          ascending = true,
+          samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
       case SinglePartition =>
         new Partitioner {
           override def numPartitions: Int = 1

From 3d82277d3e7a1183f7f4fc9b4e444a66aea4544e Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 28 Sep 2017 15:52:14 -0700
Subject: [PATCH 3/3] Revert "[SPARK-22160][SQL] Allow changing sample points
 per partition in range shuffle exchange"

This reverts commit 8e51ae52b6d54ed46a3441bbb83a8e93ba214410.
---
 .../main/scala/org/apache/spark/Partitioner.scala  | 13 ++-----------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ----------
 .../execution/exchange/ShuffleExchangeExec.scala   |  7 +------
 3 files changed, 3 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index fa397f09f651..1484f29525a4 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -108,17 +108,9 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     rdd: RDD[_ <: Product2[K, V]],
-    private var ascending: Boolean = true,
-    val samplePointsPerPartitionHint: Int = 20)
+    private var ascending: Boolean = true)
   extends Partitioner {
 
-  // A constructor declared in order to maintain backward compatibility for Java, when we add the
-  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
-  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
-  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
-    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
-  }
-
   // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
   require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
 
@@ -130,8 +122,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
-      // Cast to double to avoid overflowing ints or longs
-      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
+      val sampleSize = math.min(20.0 * partitions, 1e6)
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a73d168b9b6..358cf6214907 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -907,14 +907,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
-    buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
-      .internal()
-      .doc("Number of points to sample per partition in order to determine the range boundaries" +
-        " for range partitioning, typically used in global sorting (without limit).")
-      .intConf
-      .createWithDefault(100)
-
   val ARROW_EXECUTION_ENABLE =
     buildConf("spark.sql.execution.arrow.enabled")
       .internal()
@@ -1207,8 +1199,6 @@ class SQLConf extends Serializable with Logging {
 
   def supportQuotedRegexColumnName: Boolean = getConf(SUPPORT_QUOTED_REGEX_COLUMN_NAME)
 
-  def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
-
   def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
 
   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 5a1e217082bc..11c4aa9b4acf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.MutablePair
 
 /**
@@ -219,11 +218,7 @@ object ShuffleExchangeExec {
           iter.map(row => mutablePair.update(row.copy(), null))
         }
         implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
-        new RangePartitioner(
-          numPartitions,
-          rddForSampling,
-          ascending = true,
-          samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
+        new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
           override def numPartitions: Int = 1
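
Note on the net effect of the series: patch 3 reverts patch 2, so after all three
patches apply, only the SPARK-22159 rename survives. Below is a minimal sketch of
the renamed keys in use; it is not part of the patches, and the object name and
local-mode session are illustrative assumptions. Both configs are internal, and
once patch 1 lands the old "*.enable" spellings are no longer recognized keys.

import org.apache.spark.sql.SparkSession

object RenamedConfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("renamed-conf-demo")
      .getOrCreate()

    // Both keys end in "enabled" after SPARK-22159, consistent with the
    // other spark.sql.* boolean configs.
    spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    println(spark.conf.get("spark.sql.execution.arrow.enabled")) // true
    spark.stop()
  }
}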
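
Note on the reverted SPARK-22160 change: even though it did not survive, its
explicit 3-arg constructor illustrates a general JVM binary-compatibility
pattern. Scala default arguments are filled in at the call site by the Scala
compiler, so adding a parameter with a default silently drops the old
constructor arity from the bytecode that already-compiled Java callers link
against. The sketch below demonstrates the same trick on a hypothetical Sorter
class (not Spark code), along with the toDouble cast from patch 2 that keeps
the sample-size product from overflowing Int.

class Sorter(
    val partitions: Int,
    val ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20) {

  // Explicit overload so code compiled against the old 2-arg constructor
  // still links: the default argument alone leaves no 2-arg <init> in the
  // bytecode.
  def this(partitions: Int, ascending: Boolean) =
    this(partitions, ascending, samplePointsPerPartitionHint = 20)
}

object Sorter {
  def main(args: Array[String]): Unit = {
    val s = new Sorter(8, false) // the old 2-arg signature still resolves
    // Multiply as Double, as in patch 2, so a huge partition count times the
    // hint cannot overflow Int before the cap is applied.
    val sampleSize = math.min(s.samplePointsPerPartitionHint.toDouble * s.partitions, 1e6)
    println(sampleSize) // 160.0
  }
}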