
Commit 0dcf9db

Davies Liu (davies) authored and committed
[SPARK-14669] [SQL] Fix some SQL metrics in codegen and added more
## What changes were proposed in this pull request?

1. Fix the "spill size" of TungstenAggregate and Sort.
2. Rename "data size" to "peak memory" to match the actual meaning (also consistent with task metrics).
3. Added "data size" for ShuffleExchange and BroadcastExchange.
4. Added some timing for Sort, Aggregate and BroadcastExchange (this requires another patch to work).

## How was this patch tested?

Existing tests.

![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png)

Author: Davies Liu <davies@databricks.com>

Closes #12425 from davies/fix_metrics.
1 parent 0419d63 commit 0dcf9db
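
All of the timing metrics added in this patch follow the same pattern: capture `System.nanoTime()` before the work, run it, and add the elapsed time in milliseconds to the metric created with `SQLMetrics.createTimingMetric`. The snippet below is a minimal sketch of that pattern only, not Spark code; a plain `var` stands in for the SQL metric.

```scala
// Minimal sketch of the timing pattern used in this patch (not actual Spark code).
// A plain var stands in for the "sort time" / "aggregate time" SQLMetric.
object TimingPatternSketch {
  def main(args: Array[String]): Unit = {
    var aggTimeMs = 0L                       // stand-in for longMetric("aggTime")
    val beforeAgg = System.nanoTime()        // same role as `long $beforeAgg = System.nanoTime();`
    Thread.sleep(5)                          // placeholder for the actual sort / aggregation work
    aggTimeMs += (System.nanoTime() - beforeAgg) / 1000000  // elapsed time, recorded in ms
    println(s"aggregate time: $aggTimeMs ms")
  }
}
```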

10 files changed (+110 lines, -32 lines)


core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 11 additions & 1 deletion
@@ -75,6 +75,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private MemoryBlock currentPage = null;
   private long pageCursor = -1;
   private long peakMemoryUsedBytes = 0;
+  private long totalSpillBytes = 0L;
   private volatile SpillableIterator readingIterator = null;

   public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -215,7 +216,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // we might not be able to get memory for the pointer array.

     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
+    totalSpillBytes += spillSize;
     return spillSize;
   }

@@ -246,6 +247,13 @@ public long getPeakMemoryUsedBytes() {
     return peakMemoryUsedBytes;
   }

+  /**
+   * Return the total number of bytes that has been spilled into disk so far.
+   */
+  public long getSpillSize() {
+    return totalSpillBytes;
+  }
+
   @VisibleForTesting
   public int getNumberOfAllocatedPages() {
     return allocatedPages.size();
@@ -499,6 +507,8 @@ public long spill() throws IOException {
         released += inMemSorter.getMemoryUsage();
         inMemSorter.free();
         inMemSorter = null;
+        taskContext.taskMetrics().incMemoryBytesSpilled(released);
+        totalSpillBytes += released;
         return released;
       }
     }
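
The new `getSpillSize()` accessor is what lets an operator report its own "spill size" rather than only the task-wide `memoryBytesSpilled` counter. A hedged sketch of the intended consumption follows, with a stub standing in for the sorter (`TungstenAggregate.finishAggregate` does the equivalent with `UnsafeKVExternalSorter.getSpillSize` later in this commit).

```scala
// Sketch only: StubSorter stands in for UnsafeExternalSorter / UnsafeKVExternalSorter.
object SpillSizeSketch {
  final class StubSorter {
    def getSpillSize: Long = 128L * 1024 * 1024  // pretend 128 MB were spilled to disk
  }
  def main(args: Array[String]): Unit = {
    var spillSizeMetric = 0L                     // stand-in for the operator's longMetric("spillSize")
    val sorter = new StubSorter
    spillSizeMetric += sorter.getSpillSize       // the operator reports its own spill, not the whole task's
    println(s"spill size: $spillSizeMetric bytes")
  }
}
```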

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java

Lines changed: 7 additions & 0 deletions
@@ -176,6 +176,13 @@ public KVSorterIterator sortedIterator() throws IOException {
     }
   }

+  /**
+   * Return the total number of bytes that has been spilled into disk so far.
+   */
+  public long getSpillSize() {
+    return sorter.getSpillSize();
+  }
+
   /**
    * Return the peak memory used so far, in bytes.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala

Lines changed: 11 additions & 8 deletions
@@ -53,8 +53,8 @@ case class Sort(
   private val enableRadixSort = sqlContext.conf.enableRadixSort

   override private[sql] lazy val metrics = Map(
-    "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
+    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))

   def createSorter(): UnsafeExternalRowSorter = {
@@ -86,8 +86,9 @@ case class Sort(
   }

   protected override def doExecute(): RDD[InternalRow] = {
-    val dataSize = longMetric("dataSize")
+    val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")
+    val sortTime = longMetric("sortTime")

     child.execute().mapPartitionsInternal { iter =>
       val sorter = createSorter()
@@ -96,10 +97,12 @@ case class Sort(
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       val spillSizeBefore = metrics.memoryBytesSpilled
+      val beforeSort = System.nanoTime()

       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])

-      dataSize += sorter.getPeakMemoryUsage
+      sortTime += (System.nanoTime() - beforeSort) / 1000000
+      peakMemory += sorter.getPeakMemoryUsage
       spillSize += metrics.memoryBytesSpilled - spillSizeBefore
       metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)

@@ -145,19 +148,19 @@ case class Sort(
     ctx.copyResult = false

     val outputRow = ctx.freshName("outputRow")
-    val dataSize = metricTerm(ctx, "dataSize")
+    val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val spillSizeBefore = ctx.freshName("spillSizeBefore")
     val startTime = ctx.freshName("startTime")
     val sortTime = metricTerm(ctx, "sortTime")
     s"""
        | if ($needToSort) {
-       |   $addToSorter();
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
        |   long $startTime = System.nanoTime();
+       |   $addToSorter();
        |   $sortedIterator = $sorterVariable.sort();
-       |   $sortTime.add(System.nanoTime() - $startTime);
-       |   $dataSize.add($sorterVariable.getPeakMemoryUsage());
+       |   $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+       |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
        |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
        |   $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
        |   $needToSort = false;

sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala

Lines changed: 15 additions & 4 deletions
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams

 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.unsafe.Platform

 /**
@@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform
  *
  * @param numFields the number of fields in the row being serialized.
  */
-private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable {
-  override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields)
+private[sql] class UnsafeRowSerializer(
+    numFields: Int,
+    dataSize: LongSQLMetric = null) extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance =
+    new UnsafeRowSerializerInstance(numFields, dataSize)
   override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
 }

-private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
+private class UnsafeRowSerializerInstance(
+    numFields: Int,
+    dataSize: LongSQLMetric) extends SerializerInstance {
   /**
    * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
    * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))

+    // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
+    val localDataSize = if (dataSize != null) dataSize.localValue else null
+
     override def writeValue[T: ClassTag](value: T): SerializationStream = {
       val row = value.asInstanceOf[UnsafeRow]
-
+      if (localDataSize != null) {
+        localDataSize.add(row.getSizeInBytes)
+      }
       dOut.writeInt(row.getSizeInBytes)
       row.writeToStream(dOut, writeBuffer)
       this
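
The comment in the diff ("LongSQLMetricParam.add() is faster than LongSQLMetric.+=") captures a hot-path optimization: the serializer resolves the metric's task-local value once, outside the per-row write path, and then calls `add()` per record. The sketch below shows the shape of that idea with a plain Scala stand-in playing roughly the role of `LongSQLMetricValue`; none of it is Spark API.

```scala
// Sketch of the hot-path metric update; LocalValue is a stand-in, not a Spark class.
object HotPathMetricSketch {
  final class LocalValue {
    private var total = 0L
    def add(n: Long): Unit = total += n
    def value: Long = total
  }
  def main(args: Array[String]): Unit = {
    val rowSizes = Seq(64L, 128L, 96L)     // pretend per-row sizes (row.getSizeInBytes)
    val localDataSize = new LocalValue     // resolved once, like `dataSize.localValue` above
    rowSizes.foreach(localDataSize.add)    // cheap per-row add, no accumulator dispatch per record
    println(s"data size: ${localDataSize.value} bytes")
  }
}
```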

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

Lines changed: 24 additions & 10 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator

@@ -52,8 +52,9 @@ case class TungstenAggregate(

   override private[sql] lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))

   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -83,7 +84,7 @@ case class TungstenAggregate(

   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numOutputRows = longMetric("numOutputRows")
-    val dataSize = longMetric("dataSize")
+    val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")

     child.execute().mapPartitions { iter =>
@@ -107,7 +108,7 @@ case class TungstenAggregate(
           iter,
           testFallbackStartsAt,
           numOutputRows,
-          dataSize,
+          peakMemory,
           spillSize)
         if (!hasInput && groupingExpressions.isEmpty) {
           numOutputRows += 1
@@ -212,10 +213,14 @@ case class TungstenAggregate(
       """.stripMargin)

     val numOutput = metricTerm(ctx, "numOutputRows")
+    val aggTime = metricTerm(ctx, "aggTime")
+    val beforeAgg = ctx.freshName("beforeAgg")
     s"""
        | while (!$initAgg) {
        |   $initAgg = true;
+       |   long $beforeAgg = System.nanoTime();
        |   $doAgg();
+       |   $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
        |
        |   // output the result
        |   ${genResult.trim}
@@ -303,15 +308,17 @@ case class TungstenAggregate(
    */
   def finishAggregate(
       hashMap: UnsafeFixedWidthAggregationMap,
-      sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
+      sorter: UnsafeKVExternalSorter,
+      peakMemory: LongSQLMetricValue,
+      spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {

     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
     val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
-    val peakMemory = Math.max(mapMemory, sorterMemory)
+    val maxMemory = Math.max(mapMemory, sorterMemory)
     val metrics = TaskContext.get().taskMetrics()
-    metrics.incPeakExecutionMemory(peakMemory)
-    // TODO: update data size and spill size
+    peakMemory.add(maxMemory)
+    metrics.incPeakExecutionMemory(maxMemory)

     if (sorter == null) {
       // not spilled
@@ -365,6 +372,7 @@ case class TungstenAggregate(

           true
         } else {
+          spillSize.add(sorter.getSpillSize)
           false
         }
       }
@@ -476,6 +484,8 @@ case class TungstenAggregate(
     ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")

     val doAgg = ctx.freshName("doAggregateWithKeys")
+    val peakMemory = metricTerm(ctx, "peakMemory")
+    val spillSize = metricTerm(ctx, "spillSize")
     ctx.addNewFunction(doAgg,
       s"""
         ${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
@@ -486,7 +496,7 @@ case class TungstenAggregate(
           ${if (isVectorizedHashMapEnabled) {
             s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}

-          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
+          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
         }
       """)

@@ -528,10 +538,14 @@ case class TungstenAggregate(
       } else None
     }

+    val aggTime = metricTerm(ctx, "aggTime")
+    val beforeAgg = ctx.freshName("beforeAgg")
     s"""
      if (!$initAgg) {
        $initAgg = true;
+       long $beforeAgg = System.nanoTime();
        $doAgg();
+       $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
      }

      // output the result

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala

Lines changed: 4 additions & 4 deletions
@@ -87,7 +87,7 @@ class TungstenAggregationIterator(
     inputIter: Iterator[InternalRow],
     testFallbackStartsAt: Option[(Int, Int)],
     numOutputRows: LongSQLMetric,
-    dataSize: LongSQLMetric,
+    peakMemory: LongSQLMetric,
     spillSize: LongSQLMetric)
   extends AggregationIterator(
     groupingExpressions,
@@ -415,11 +415,11 @@
       if (!hasNext) {
         val mapMemory = hashMap.getPeakMemoryUsedBytes
         val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
-        val peakMemory = Math.max(mapMemory, sorterMemory)
+        val maxMemory = Math.max(mapMemory, sorterMemory)
         val metrics = TaskContext.get().taskMetrics()
-        dataSize += peakMemory
+        peakMemory += maxMemory
         spillSize += metrics.memoryBytesSpilled - spillSizeBefore
-        metrics.incPeakExecutionMemory(peakMemory)
+        metrics.incPeakExecutionMemory(maxMemory)
       }
       numOutputRows += 1
       res

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala

Lines changed: 19 additions & 1 deletion
@@ -23,8 +23,10 @@ import scala.concurrent.duration._
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.ThreadUtils

 /**
@@ -35,6 +37,12 @@ case class BroadcastExchange(
     mode: BroadcastMode,
     child: SparkPlan) extends Exchange {

+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
+    "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
+    "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
+    "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

   override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -61,11 +69,21 @@ case class BroadcastExchange(
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
+        val beforeCollect = System.nanoTime()
         // Note that we use .executeCollect() because we don't want to convert data to Scala types
         val input: Array[InternalRow] = child.executeCollect()
+        val beforeBuild = System.nanoTime()
+        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+        longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum

         // Construct and broadcast the relation.
-        sparkContext.broadcast(mode.transform(input))
+        val relation = mode.transform(input)
+        val beforeBroadcast = System.nanoTime()
+        longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+        val broadcasted = sparkContext.broadcast(relation)
+        longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+        broadcasted
       }
     }(BroadcastExchange.executionContext)
   }
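
The broadcast path is now split into three timed phases (collect, build, broadcast) plus a data-size sum. Below is a minimal sketch of that shape using a small timing helper, with plain values standing in for the SQL metrics and for `executeCollect()`, `mode.transform(input)`, and `sparkContext.broadcast(relation)`; it is illustrative only, none of it is Spark API.

```scala
// Sketch of the three-phase timing added to BroadcastExchange; all names here are stand-ins.
object BroadcastTimingSketch {
  private def timedMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)  // elapsed milliseconds
  }
  def main(args: Array[String]): Unit = {
    val (rows, collectMs) = timedMs(Seq.fill(1000)(Array.fill[Byte](16)(0.toByte)))  // "executeCollect()"
    val dataSize = rows.map(_.length.toLong).sum                                     // sum of row sizes in bytes
    val (relation, buildMs) = timedMs(rows.groupBy(_.length))                        // "mode.transform(input)"
    val (_, broadcastMs) = timedMs(Thread.sleep(1))                                  // "sparkContext.broadcast(relation)"
    require(relation.nonEmpty)
    println(s"dataSize=$dataSize B, collect=$collectMs ms, build=$buildMs ms, broadcast=$broadcastMs ms")
  }
}
```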

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 7 additions & 2 deletions
@@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.MutablePair

 /**
@@ -39,6 +40,9 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {

+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+
   override def nodeName: String = {
     val extraInfo = coordinator match {
       case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
@@ -54,7 +58,8 @@ case class ShuffleExchange(

   override def outputPartitioning: Partitioning = newPartitioning

-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  private val serializer: Serializer =
+    new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator
