From dd98d574c42c520f260404af7b53af9f64cad20b Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 12 Oct 2015 02:07:26 +0800
Subject: [PATCH 1/3] Use mixed hash-based and sort-based aggregation in
 TungstenAggregationIterator.

---
 .../TungstenAggregationIterator.scala | 74 ++++++++++++-------
 1 file changed, 49 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4bb95c9eb7f3e..f5e3a02449fee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -338,29 +338,32 @@ class TungstenAggregationIterator(
   // Part 3: Methods and fields used by hash-based aggregation.
   ///////////////////////////////////////////////////////////////////////////
 
+  private[this] def initHashMap(): UnsafeFixedWidthAggregationMap = {
+    new UnsafeFixedWidthAggregationMap(
+      initialAggregationBuffer,
+      StructType.fromAttributes(allAggregateFunctions.flatMap(_.aggBufferAttributes)),
+      StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
+      TaskContext.get.taskMemoryManager(),
+      SparkEnv.get.shuffleMemoryManager,
+      1024 * 16, // initial capacity
+      SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
+      false // disable tracking of performance metrics
+    )
+  }
+
   // This is the hash map used for hash-based aggregation. It is backed by an
   // UnsafeFixedWidthAggregationMap and it is used to store
   // all groups and their corresponding aggregation buffers for hash-based aggregation.
-  private[this] val hashMap = new UnsafeFixedWidthAggregationMap(
-    initialAggregationBuffer,
-    StructType.fromAttributes(allAggregateFunctions.flatMap(_.aggBufferAttributes)),
-    StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
-    TaskContext.get.taskMemoryManager(),
-    SparkEnv.get.shuffleMemoryManager,
-    1024 * 16, // initial capacity
-    SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
-    false // disable tracking of performance metrics
-  )
+  private[this] val hashMap = initHashMap()
 
   // Exposed for testing
   private[aggregate] def getHashMap: UnsafeFixedWidthAggregationMap = hashMap
 
-  // The function used to read and process input rows. When processing input rows,
-  // it first uses hash-based aggregation by putting groups and their buffers in
-  // hashMap. If we could not allocate more memory for the map, we switch to
-  // sort-based aggregation (by calling switchToSortBasedAggregation).
-  private def processInputs(): Unit = {
-    assert(inputIter != null, "attempted to process input when iterator was null")
+  // Process input rows using a given UnsafeFixedWidthAggregationMap.
+  // It returns None if all input rows are processed with the given hash map, or
+  // returns Some((groupingKey, current input)) if we can't allocate more memory for the hash map.
+  private def internalProcessInputs(
+      hashMap: UnsafeFixedWidthAggregationMap): Option[(UnsafeRow, InternalRow)] = {
     if (groupingExpressions.isEmpty) {
       // If there is no grouping expressions, we can just reuse the same buffer over and over again.
       // Note that it would be better to eliminate the hash map entirely in the future.
@@ -378,14 +381,25 @@ class TungstenAggregationIterator(
         val groupingKey = groupProjection.apply(newInput)
         val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
         if (buffer == null) {
-          // buffer == null means that we could not allocate more memory.
-          // Now, we need to spill the map and switch to sort-based aggregation.
-          switchToSortBasedAggregation(groupingKey, newInput)
+          return Some((groupingKey, newInput))
         } else {
           processRow(buffer, newInput)
         }
       }
     }
+    None
+  }
+
+  // The function used to read and process input rows. When processing input rows,
+  // it first uses hash-based aggregation by putting groups and their buffers in
+  // hashMap. If we could not allocate more memory for the map, we switch to
+  // sort-based aggregation (by calling switchToSortBasedAggregation).
+  private def processInputs(): Unit = {
+    assert(inputIter != null, "attempted to process input when iterator was null")
+    val ret = internalProcessInputs(hashMap)
+    if (ret.isDefined) {
+      switchToSortBasedAggregation(ret.get._1, ret.get._2)
+    }
   }
 
   // This function is only used for testing. It basically the same as processInputs except
@@ -453,6 +467,8 @@
       case _ => false
     }
 
+    var newHashMap = initHashMap()
+
     // Note: Since we spill the sorter's contents immediately after creating it, we must insert
     // something into the sorter here to ensure that we acquire at least a page of memory.
     // This is done through `externalSorter.insertKV`, which will trigger the page allocation.
@@ -470,12 +486,20 @@
 
       // Process the rest of input rows.
       while (inputIter.hasNext) {
-        val newInput = inputIter.next()
-        numInputRows += 1
-        val groupingKey = groupProjection.apply(newInput)
-        buffer.copyFrom(initialAggregationBuffer)
-        processRow(buffer, newInput)
-        externalSorter.insertKV(groupingKey, buffer)
+        val ret = internalProcessInputs(newHashMap)
+        if (ret.isDefined) {
+          // If we can't allocate more memory, we insert all records from the hashmap
+          // into externalSorter.
+          val iter = newHashMap.iterator()
+          while (iter.next()) {
+            externalSorter.insertKV(iter.getKey(), iter.getValue())
+          }
+          newHashMap = initHashMap()
+
+          buffer.copyFrom(initialAggregationBuffer)
+          processRow(buffer, ret.get._2)
+          externalSorter.insertKV(ret.get._1, buffer)
+        }
       }
     } else {
       // When needsProcess is false, the format of input rows is groupingKey + aggregation buffer.

From 787846bc5106a6a2372e109598fd78c0a2b38427 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 12 Oct 2015 13:12:58 +0800
Subject: [PATCH 2/3] Fix failing test.

---
 .../aggregate/TungstenAggregationIterator.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index f5e3a02449fee..e2f1df66051ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -467,8 +467,6 @@ class TungstenAggregationIterator(
       case _ => false
     }
 
-    var newHashMap = initHashMap()
-
     // Note: Since we spill the sorter's contents immediately after creating it, we must insert
     // something into the sorter here to ensure that we acquire at least a page of memory.
     // This is done through `externalSorter.insertKV`, which will trigger the page allocation.
@@ -486,6 +484,7 @@ class TungstenAggregationIterator(
 
       // Process the rest of input rows.
       while (inputIter.hasNext) {
+        var newHashMap = initHashMap()
         val ret = internalProcessInputs(newHashMap)
         if (ret.isDefined) {
           // If we can't allocate more memory, we insert all records from the hashmap
@@ -494,11 +493,17 @@
           while (iter.next()) {
             externalSorter.insertKV(iter.getKey(), iter.getValue())
           }
-          newHashMap = initHashMap()
+          newHashMap.free()
 
           buffer.copyFrom(initialAggregationBuffer)
           processRow(buffer, ret.get._2)
           externalSorter.insertKV(ret.get._1, buffer)
+        } else {
+          val iter = newHashMap.iterator()
+          while (iter.next()) {
+            externalSorter.insertKV(iter.getKey(), iter.getValue())
+          }
+          newHashMap.free()
         }
       }
     } else {

From aa51d8a49a32542f6add39b3f983dbb2320a9346 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 26 Oct 2015 23:41:29 +0800
Subject: [PATCH 3/3] Add a configuration for enabling and disabling
 pre-aggregation.

---
 .../scala/org/apache/spark/sql/SQLConf.scala |  7 +++
 .../aggregate/TungstenAggregate.scala        |  4 ++
 .../TungstenAggregationIterator.scala        | 48 +++++++++++--------
 .../TungstenAggregationIteratorSuite.scala   |  2 +-
 4 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6f2892085a8f8..db0cbd8a0ab6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -238,6 +238,13 @@ private[spark] object SQLConf {
     doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
       "manages memory and dynamically generates bytecode for expression evaluation.")
 
+  val PRE_AGGREGATION_ENABLED = booleanConf("spark.sql.preAggregation.enabled",
+    defaultValue = Some(true),
+    doc = "When true, TungstenAggregationIterator continues to perform hash-based " +
+      "pre-aggregation after it has decided to spill and switch to sort-based " +
+      "aggregation. If a very low reduction factor is expected for the " +
+      "data, this feature should be disabled to obtain better performance.")
+
   val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
     defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
     doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 0d3a4b36c161b..be2edc50b7e8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.types.StructType
 
 case class TungstenAggregate(
@@ -78,6 +79,8 @@ case class TungstenAggregate(
     }
   }
 
+  private val preAggregation: Boolean = sqlContext.getConf(SQLConf.PRE_AGGREGATION_ENABLED)
+
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
@@ -100,6 +103,7 @@ case class TungstenAggregate(
         newMutableProjection,
         child.output,
         testFallbackStartsAt,
+        preAggregation,
         numInputRows,
         numOutputRows,
         dataSize,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index ab00c49b61112..bbfc1acbe7497 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -86,6 +86,7 @@ class TungstenAggregationIterator(
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
     originalInputAttributes: Seq[Attribute],
     testFallbackStartsAt: Option[Int],
+    preAggregation: Boolean,
     numInputRows: LongSQLMetric,
     numOutputRows: LongSQLMetric,
     dataSize: LongSQLMetric,
@@ -488,7 +489,7 @@ class TungstenAggregationIterator(
   // This is the hash map used for hash-based aggregation. It is backed by an
   // UnsafeFixedWidthAggregationMap and it is used to store
   // all groups and their corresponding aggregation buffers for hash-based aggregation.
-  private[this] val hashMap = initHashMap()
+  private[this] var hashMap = initHashMap()
 
   // Exposed for testing
   private[aggregate] def getHashMap: UnsafeFixedWidthAggregationMap = hashMap
@@ -618,26 +619,35 @@ class TungstenAggregationIterator(
 
       // Process the rest of input rows.
       while (inputIter.hasNext) {
-        var newHashMap = initHashMap()
-        val ret = internalProcessInputs(newHashMap)
-        if (ret.isDefined) {
-          // If we can't allocate more memory, we insert all records from the hashmap
-          // into externalSorter.
-          val iter = newHashMap.iterator()
-          while (iter.next()) {
-            externalSorter.insertKV(iter.getKey(), iter.getValue())
+        if (preAggregation) {
+          hashMap = initHashMap()
+          val ret = internalProcessInputs(hashMap)
+          if (ret.isDefined) {
+            // If we can't allocate more memory, we insert all records from the hashmap
+            // into externalSorter.
+            val iter = hashMap.iterator()
+            while (iter.next()) {
+              externalSorter.insertKV(iter.getKey(), iter.getValue())
+            }
+            hashMap.free()
+
+            buffer.copyFrom(initialAggregationBuffer)
+            processRow(buffer, ret.get._2)
+            externalSorter.insertKV(ret.get._1, buffer)
+          } else {
+            val iter = hashMap.iterator()
+            while (iter.next()) {
+              externalSorter.insertKV(iter.getKey(), iter.getValue())
+            }
+            hashMap.free()
           }
-          newHashMap.free()
-
-          buffer.copyFrom(initialAggregationBuffer)
-          processRow(buffer, ret.get._2)
-          externalSorter.insertKV(ret.get._1, buffer)
         } else {
-          val iter = newHashMap.iterator()
-          while (iter.next()) {
-            externalSorter.insertKV(iter.getKey(), iter.getValue())
-          }
-          newHashMap.free()
+          val newInput = inputIter.next()
+          numInputRows += 1
+          val groupingKey = groupProjection.apply(newInput)
+          buffer.copyFrom(initialAggregationBuffer)
+          processRow(buffer, newInput)
+          externalSorter.insertKV(groupingKey, buffer)
         }
       }
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
index 475037bd45379..5409a9d56ad83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
@@ -39,7 +39,7 @@ class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLConte
     }
     val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy")
     iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty,
-      0, Seq.empty, newMutableProjection, Seq.empty, None,
+      0, Seq.empty, newMutableProjection, Seq.empty, None, false,
       dummyAccum, dummyAccum, dummyAccum, dummyAccum)
     val numPages = iter.getHashMap.getNumDataPages
    assert(numPages === 1)
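
Note for reviewers: since the behavior here is spread across all three patches, the following is a minimal, self-contained Scala sketch of the resulting control flow. BoundedAggMap and ExternalSorter are simplified stand-ins invented for illustration (the real UnsafeFixedWidthAggregationMap and UnsafeKVExternalSorter manage off-heap pages and sort-merge spilled runs), and the String keys, Long count buffers, and tiny capacity are assumptions chosen to keep the example runnable, not the actual APIs.

import scala.collection.mutable

object MixedAggregationSketch {

  // Stand-in for UnsafeFixedWidthAggregationMap: a count-aggregation map that
  // "runs out of memory" once it holds `capacity` distinct keys.
  final class BoundedAggMap(capacity: Int) {
    private val map = mutable.HashMap.empty[String, Long]
    // Mirrors getAggregationBufferFromUnsafeRow: None plays the role of the
    // null buffer that signals allocation failure for a new group.
    def getBuffer(key: String): Option[Long] = map.get(key) match {
      case some @ Some(_) => some
      case None if map.size < capacity => map.put(key, 0L); Some(0L)
      case None => None
    }
    def update(key: String, value: Long): Unit = map.put(key, value)
    def iterator: Iterator[(String, Long)] = map.iterator
    def free(): Unit = map.clear()
  }

  // Stand-in for UnsafeKVExternalSorter: just collects spilled partial
  // aggregates; the real sorter sort-merges them by grouping key.
  final class ExternalSorter {
    private val spilled = mutable.ArrayBuffer.empty[(String, Long)]
    def insertKV(key: String, value: Long): Unit = spilled += ((key, value))
    def merged: Map[String, Long] =
      spilled.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  }

  // Mirrors internalProcessInputs from PATCH 1/3: aggregate rows into the
  // given map until it is full; return the row that did not fit, or None
  // once the input is exhausted.
  def internalProcessInputs(
      input: Iterator[String], hashMap: BoundedAggMap): Option[String] = {
    while (input.hasNext) {
      val key = input.next()
      hashMap.getBuffer(key) match {
        case Some(count) => hashMap.update(key, count + 1)
        case None => return Some(key) // no room for a buffer for a new group
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("a", "b", "a", "c", "d", "b", "e", "a", "f").iterator
    val sorter = new ExternalSorter
    var hashMap = new BoundedAggMap(capacity = 3)

    // Pure hash aggregation until the first map fills up.
    var overflow = internalProcessInputs(input, hashMap)

    // Fallback with pre-aggregation enabled (the PATCH 3/3 default): keep
    // filling fresh hash maps and spill each full one into the sorter,
    // instead of inserting every remaining row individually.
    while (overflow.isDefined) {
      hashMap.iterator.foreach { case (k, v) => sorter.insertKV(k, v) }
      hashMap.free()
      hashMap = new BoundedAggMap(capacity = 3)
      sorter.insertKV(overflow.get, 1L) // the row that triggered the spill
      overflow = internalProcessInputs(input, hashMap)
    }
    // Flush whatever the last map still holds.
    hashMap.iterator.foreach { case (k, v) => sorter.insertKV(k, v) }
    hashMap.free()

    println(sorter.merged) // a -> 3, b -> 2, c/d/e/f -> 1 (order unspecified)
  }
}

The trade-off that spark.sql.preAggregation.enabled controls is visible in the sketch: when many rows collapse into few groups (a high reduction factor), each map combines many rows before spilling, so the sorter sees far fewer records; with near-unique keys the maps fill up without combining anything and only add overhead, which is why the PATCH 3/3 doc string recommends disabling the feature in that case.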