From 1f4a826a4dbadb6ca648b03f479aa22eaf87e309 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 30 Dec 2022 17:53:20 +0800
Subject: [PATCH 1/2] SQLMetric should not expose -1 value as it's invalid

---
 .../apache/spark/sql/execution/metric/SQLMetrics.scala      | 7 ++++++-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala     | 9 +--------
 .../spark/sql/execution/metric/SQLMetricsSuite.scala        | 6 +-----
 3 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b31d0b9989d1c..6d2578c3576da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -78,7 +78,12 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   def +=(v: Long): Unit = add(v)
 
-  override def value: Long = _value
+  // We may use -1 as the initial value of the accumulator, so that the SQL UI can filter out
+  // invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
+  // However, users can also access the SQL metric values programmatically via this method.
+  // We should be consistent with the SQL UI and not expose -1 to users.
+  // See `SQLMetrics.stringValue`. When there are no valid accumulator values, the metric value is 0.
+  override def value: Long = if (_value < 0) 0 else _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1f10ff36acb4f..988695e2667b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2170,15 +2170,8 @@ class AdaptiveQueryExecSuite
       assert(aqeReads.length == 2)
       aqeReads.foreach { c =>
         val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-        val rowCount = stats.rowCount.get
+        assert(stats.sizeInBytes >= 0)
         assert(stats.rowCount.get >= 0)
-        if (rowCount == 0) {
-          // For empty relation, the query stage doesn't serialize any bytes.
-          // The SQLMetric keeps initial value.
-          assert(stats.sizeInBytes == -1)
-        } else {
-          assert(stats.sizeInBytes > 0)
-        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 424052df28933..76b5364164e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -786,11 +786,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    // `testData2.filter($"b" === 0)` is an empty relation.
-    // The exchange doesn't serialize any bytes.
-    // The SQLMetric keeps initial value.
-    testMetricsInSparkPlanOperator(exchanges(1),
-      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
   }
 
   test("Add numRows to metric of BroadcastExchangeExec") {
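The core behavioral change in patch 1 is that SQLMetric.value now clamps the internal -1 sentinel to 0 before returning it. Below is a minimal, self-contained sketch of that clamping idea; ToyMetric and SQLMetricValueSketch are hypothetical stand-ins, not Spark's actual org.apache.spark.sql.execution.metric.SQLMetric, and the add semantics shown are assumed for illustration only.

// Minimal sketch of the -1-sentinel clamping; ToyMetric is hypothetical, not Spark's SQLMetric.
object SQLMetricValueSketch {

  final class ToyMetric(initValue: Long = 0L) {
    // -1 marks "never updated" so a UI can tell "no data" apart from a real 0.
    private var _value: Long = initValue

    def add(v: Long): Unit = {
      // Assumed semantics: the first update discards the sentinel before accumulating.
      if (_value < 0) _value = 0
      _value += v
    }

    // Patched behavior: never expose the invalid -1 sentinel to callers.
    def value: Long = if (_value < 0) 0 else _value

    // Pre-patch behavior, kept here only for comparison.
    def rawValue: Long = _value
  }

  def main(args: Array[String]): Unit = {
    val dataSize = new ToyMetric(initValue = -1) // e.g. "dataSize" of an exchange that wrote nothing
    println(dataSize.rawValue) // -1: internal sentinel, confusing if returned to users
    println(dataSize.value)    // 0: what the clamped accessor reports

    dataSize.add(3200)
    println(dataSize.value)    // 3200: normal updates are unaffected
  }
}

This is also why the test expectations in both patches move from -1 to 0 for dataSize and pruningTime: the metrics still start at the sentinel internally, but programmatic reads through value, like the figures produced by SQLMetrics.stringValue, treat "no valid update" as 0.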
From aed4db416c8db2539e4a0ed9d3d05a61eb95202b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 30 Dec 2022 21:15:35 +0800
Subject: [PATCH 2/2] fix tests

---
 .../org/apache/spark/sql/DynamicPartitionPruningSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f7b51db1c9020..ff78af7e636cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1715,7 +1715,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       val allFilesNum = scan1.metrics("numFiles").value
       val allFilesSize = scan1.metrics("filesSize").value
       assert(scan1.metrics("numPartitions").value === numPartitions)
-      assert(scan1.metrics("pruningTime").value === -1)
+      assert(scan1.metrics("pruningTime").value === 0)
       // No dynamic partition pruning, so no static metrics
 
       // Only files from fid = 5 partition are scanned
@@ -1729,7 +1729,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       assert(0 < partFilesNum && partFilesNum < allFilesNum)
       assert(0 < partFilesSize && partFilesSize < allFilesSize)
       assert(scan2.metrics("numPartitions").value === 1)
-      assert(scan2.metrics("pruningTime").value === -1)
+      assert(scan2.metrics("pruningTime").value === 0)
       // Dynamic partition pruning is used
       // Static metrics are as-if reading the whole fact table