From 61029fadfce4e6a3903be448d6144aff28e4d342 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 7 Dec 2022 12:49:31 -0800
Subject: [PATCH 1/2] Only update SQLMetric value if merging with valid metric

---
 .../org/apache/spark/sql/execution/metric/SQLMetrics.scala  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index a613a39b2ba89..b31d0b9989d1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -55,8 +55,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
     case o: SQLMetric =>
-      if (_value < 0) _value = 0
-      if (o.value > 0) _value += o.value
+      if (o.value > 0) {
+        if (_value < 0) _value = 0
+        _value += o.value
+      }
     case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
       this.getClass.getName, other.getClass.getName)
   }

From 676456ce16ca53fc8765c9969767ae14f79c42b6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 7 Dec 2022 18:09:12 -0800
Subject: [PATCH 2/2] Fix tests

---
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala     | 9 ++++++++-
 .../spark/sql/execution/metric/SQLMetricsSuite.scala        | 6 +++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 81bce35a58451..88baf76ba7a1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2156,8 +2156,15 @@ class AdaptiveQueryExecSuite
         assert(aqeReads.length == 2)
         aqeReads.foreach { c =>
           val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-          assert(stats.sizeInBytes >= 0)
+          val rowCount = stats.rowCount.get
           assert(stats.rowCount.get >= 0)
+          if (rowCount == 0) {
+            // For an empty relation, the query stage doesn't serialize any bytes,
+            // so the SQLMetric keeps its initial value.
+            assert(stats.sizeInBytes == -1)
+          } else {
+            assert(stats.sizeInBytes > 0)
+          }
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 07b35713fe5b1..1f20fb62d3782 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -785,7 +785,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
+    // `testData2.filter($"b" === 0)` is an empty relation.
+    // The exchange doesn't serialize any bytes,
+    // so the SQLMetric keeps its initial value.
+    testMetricsInSparkPlanOperator(exchanges(1),
+      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
   }
 
   test("Add numRows to metric of BroadcastExchangeExec") {
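
Note on the semantics of this change, with a sketch. A SQLMetric can be created with a
negative initValue (the size metrics in this patch use -1) to mark a metric that was never
updated. Before the patch, merge() reset _value from -1 to 0 unconditionally, so merging two
untouched metrics produced 0 and the "never updated" marker was lost; after the patch,
_value is only touched when the incoming metric carries a valid (positive) value. Below is a
minimal standalone Scala sketch of the two behaviors, not the Spark class itself; OldMetric,
NewMetric, and MergeSemanticsSketch are illustrative names, not Spark APIs.

// Minimal standalone sketch of the merge semantics in this patch, assuming
// nothing from Spark: OldMetric models merge() before the change, NewMetric
// after it. Both use -1 as the "never updated / invalid" initial value.
object MergeSemanticsSketch {
  class OldMetric(var value: Long = -1L) {
    def merge(other: OldMetric): Unit = {
      if (value < 0) value = 0              // unconditionally wipes the -1 marker
      if (other.value > 0) value += other.value
    }
  }

  class NewMetric(var value: Long = -1L) {
    def merge(other: NewMetric): Unit = {
      if (other.value > 0) {                // only touch value for a valid incoming metric
        if (value < 0) value = 0
        value += other.value
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val oldMetric = new OldMetric()
    oldMetric.merge(new OldMetric())        // merge two untouched metrics
    println(oldMetric.value)                // 0: the "invalid" marker is lost

    val newMetric = new NewMetric()
    newMetric.merge(new NewMetric())
    println(newMetric.value)                // -1: still recognizable as never updated

    newMetric.merge(new NewMetric(42L))
    println(newMetric.value)                // 42: valid values still accumulate
  }
}

This is why the test expectations above change from 0 to -1 for the empty relation
(testData2.filter($"b" === 0)): no bytes are ever serialized for that exchange, so the
dataSize/sizeInBytes metric legitimately stays at its initial value instead of being coerced
to 0 during merge.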