Aggregate.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* :: DeveloperApi ::
@@ -45,6 +46,10 @@ case class Aggregate(
child: SparkPlan)
extends UnaryNode {

override private[sql] lazy val metrics = Map(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

override def requiredChildDistribution: List[Distribution] = {
if (partial) {
UnspecifiedDistribution :: Nil
@@ -121,12 +126,15 @@ case class Aggregate(
}

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
numInputRows += 1
var i = 0
while (i < buffer.length) {
buffer(i).update(currentRow)
@@ -142,6 +150,7 @@
i += 1
}

numOutputRows += 1
Iterator(resultProjection(aggregateResults))
}
} else {
@@ -152,6 +161,7 @@
var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
numInputRows += 1
val currentGroup = groupingProjection(currentRow)
var currentBuffer = hashTable.get(currentGroup)
if (currentBuffer == null) {
@@ -180,6 +190,7 @@
val currentEntry = hashTableIter.next()
val currentGroup = currentEntry.getKey
val currentBuffer = currentEntry.getValue
numOutputRows += 1

var i = 0
while (i < currentBuffer.length) {

ExistingRDD.scala
@@ -99,8 +99,6 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
extraInformation: String) extends LeafNode {

override protected[sql] val trackNumOfRowsEnabled = true

protected override def doExecute(): RDD[InternalRow] = rdd

override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")

LocalTableScan.scala
@@ -30,8 +30,6 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {

override protected[sql] val trackNumOfRowsEnabled = true

private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

protected override def doExecute(): RDD[InternalRow] = rdd

SparkPlan.scala
@@ -80,23 +80,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
super.makeCopy(newArgs)
}

/**
* Whether track the number of rows output by this SparkPlan
*/
protected[sql] def trackNumOfRowsEnabled: Boolean = false

private lazy val defaultMetrics: Map[String, SQLMetric[_, _]] =
if (trackNumOfRowsEnabled) {
Map("numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
}
else {
Map.empty
}

/**
* Return all metrics containing metrics of this SparkPlan.
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
Comment from the PR author on this change:

Removed this because there are only two places using it, and it's not worth adding an abstract method to the parent class.
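
To make the pattern concrete, here is a minimal, hypothetical sketch of what operators do instead after this change: each physical operator that wants row counts declares its own entry in `metrics` and updates it inside `doExecute()`, using the `SQLMetrics.createLongMetric` and `longMetric` helpers that appear elsewhere in this diff. `ExampleScan` and its constructor parameters are invented for illustration only.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafNode
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical leaf operator: rather than flipping a shared
// trackNumOfRowsEnabled flag in SparkPlan, it declares its own metric.
case class ExampleScan(output: Seq[Attribute], rdd: RDD[InternalRow]) extends LeafNode {

  // Row-count metric owned by this operator, shown in the SQL UI.
  override private[sql] lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    rdd.map { row =>
      numOutputRows += 1 // accumulator-backed, so updates from executors are collected
      row
    }
  }
}
```

This is the same shape the diff applies to Aggregate, SortBasedAggregate, TungstenAggregate, and the two Project operators, each with its own metric names.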


/**
* Return a LongSQLMetric according to the name.
@@ -150,15 +137,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
}
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
if (trackNumOfRowsEnabled) {
val numRows = longMetric("numRows")
doExecute().map { row =>
numRows += 1
row
}
} else {
doExecute()
}
doExecute()
}
}


SortBasedAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, SparkPlan, UnaryNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType

case class SortBasedAggregate(
@@ -38,6 +39,10 @@ case class SortBasedAggregate(
child: SparkPlan)
extends UnaryNode {

override private[sql] lazy val metrics = Map(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

override def outputsUnsafeRows: Boolean = false

override def canProcessUnsafeRows: Boolean = false
@@ -63,6 +68,8 @@ case class SortBasedAggregate(
}

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
// Because the constructor of an aggregation iterator will read at least the first row,
// we need to get the value of iter.hasNext first.
@@ -84,10 +91,13 @@ case class SortBasedAggregate(
newProjection _,
child.output,
iter,
outputsUnsafeRows)
outputsUnsafeRows,
numInputRows,
numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
numOutputRows += 1
Iterator[InternalRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
} else {
outputIter

SortBasedAggregationIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, AggregateFunction2}
import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.unsafe.KVIterator

/**
@@ -37,7 +38,9 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
outputsUnsafeRows: Boolean)
outputsUnsafeRows: Boolean,
numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric)
extends AggregationIterator(
groupingKeyAttributes,
valueAttributes,
@@ -103,6 +106,7 @@
// Get the grouping key.
val groupingKey = inputKVIterator.getKey
val currentRow = inputKVIterator.getValue
numInputRows += 1

// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
@@ -137,7 +141,7 @@
val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
// Initialize buffer values for the next group.
initializeBuffer(sortBasedAggregationBuffer)

numOutputRows += 1
outputRow
} else {
// no more result
@@ -151,7 +155,7 @@

nextGroupingKey = inputKVIterator.getKey().copy()
firstRowInNextGroup = inputKVIterator.getValue().copy()

numInputRows += 1
sortedInputHasNewGroup = true
} else {
// This inputIter is empty.
@@ -181,7 +185,9 @@ object SortBasedAggregationIterator {
newProjection: (Seq[Expression], Seq[Attribute]) => Projection,
inputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
outputsUnsafeRows: Boolean): SortBasedAggregationIterator = {
outputsUnsafeRows: Boolean,
numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric): SortBasedAggregationIterator = {
val kvIterator = if (UnsafeProjection.canSupport(groupingExprs)) {
AggregationIterator.unsafeKVIterator(
groupingExprs,
@@ -202,7 +208,9 @@
initialInputBufferOffset,
resultExpressions,
newMutableProjection,
outputsUnsafeRows)
outputsUnsafeRows,
numInputRows,
numOutputRows)
}
// scalastyle:on
}

TungstenAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples, Distribution}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics

case class TungstenAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -35,6 +36,10 @@ case class TungstenAggregate(
child: SparkPlan)
extends UnaryNode {

override private[sql] lazy val metrics = Map(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

override def outputsUnsafeRows: Boolean = true

override def canProcessUnsafeRows: Boolean = true
@@ -61,6 +66,8 @@ case class TungstenAggregate(
}

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitions { iter =>
val hasInput = iter.hasNext
if (!hasInput && groupingExpressions.nonEmpty) {
@@ -78,9 +85,12 @@ case class TungstenAggregate(
newMutableProjection,
child.output,
iter,
testFallbackStartsAt)
testFallbackStartsAt,
numInputRows,
numOutputRows)

if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
} else {
aggregationIterator

TungstenAggregationIterator.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, UnsafeFixedWidthAggregationMap}
import org.apache.spark.sql.execution.metric.LongSQLMetric
import org.apache.spark.sql.types.StructType

/**
@@ -83,7 +84,9 @@ class TungstenAggregationIterator(
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
testFallbackStartsAt: Option[Int])
testFallbackStartsAt: Option[Int],
numInputRows: LongSQLMetric,
numOutputRows: LongSQLMetric)
extends Iterator[UnsafeRow] with Logging {

///////////////////////////////////////////////////////////////////////////
@@ -352,6 +355,7 @@
private def processInputs(): Unit = {
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
@@ -371,6 +375,7 @@
var i = 0
while (!sortBased && inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
val buffer: UnsafeRow = if (i < fallbackStartsAt) {
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
@@ -439,6 +444,7 @@
// Process the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
buffer.copyFrom(initialAggregationBuffer)
processRow(buffer, newInput)
@@ -462,6 +468,7 @@
// Insert the rest of input rows.
while (inputIter.hasNext) {
val newInput = inputIter.next()
numInputRows += 1
val groupingKey = groupProjection.apply(newInput)
bufferExtractor(newInput)
externalSorter.insertKV(groupingKey, buffer)
@@ -657,7 +664,7 @@
TaskContext.get().internalMetricsToAccumulators(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
}

numOutputRows += 1
res
} else {
// no more result

basicOperators.scala
@@ -41,11 +41,20 @@ import org.apache.spark.{HashPartitioner, SparkEnv}
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)

override private[sql] lazy val metrics = Map(
"numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))

@transient lazy val buildProjection = newMutableProjection(projectList, child.output)

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
val reusableProjection = buildProjection()
iter.map(reusableProjection)
protected override def doExecute(): RDD[InternalRow] = {
val numRows = longMetric("numRows")
child.execute().mapPartitions { iter =>
val reusableProjection = buildProjection()
iter.map { row =>
numRows += 1
reusableProjection(row)
}
}
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -57,19 +66,28 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
*/
case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {

override private[sql] lazy val metrics = Map(
"numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))

override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
this.transformAllExpressions {
case CreateStruct(children) => CreateStructUnsafe(children)
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
protected override def doExecute(): RDD[InternalRow] = {
val numRows = longMetric("numRows")
child.execute().mapPartitions { iter =>
this.transformAllExpressions {
case CreateStruct(children) => CreateStructUnsafe(children)
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
}
val project = UnsafeProjection.create(projectList, child.output)
iter.map { row =>
numRows += 1
project(row)
}
}
val project = UnsafeProjection.create(projectList, child.output)
iter.map(project)
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering