diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b31d0b9989d1c..6d2578c3576da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -78,7 +78,12 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   def +=(v: Long): Unit = add(v)
 
-  override def value: Long = _value
+  // We may use -1 as the initial value of the accumulator, so that the SQL UI can filter out
+  // invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
+  // However, users can also access SQL metric values programmatically via this method.
+  // To stay consistent with the SQL UI, we should not expose -1 to users.
+  // See `SQLMetrics.stringValue`. If there is no valid accumulator value, the metric value is 0.
+  override def value: Long = if (_value < 0) 0 else _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f7b51db1c9020..ff78af7e636cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1715,7 +1715,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
         val allFilesNum = scan1.metrics("numFiles").value
         val allFilesSize = scan1.metrics("filesSize").value
         assert(scan1.metrics("numPartitions").value === numPartitions)
-        assert(scan1.metrics("pruningTime").value === -1)
+        assert(scan1.metrics("pruningTime").value === 0)
 
         // No dynamic partition pruning, so no static metrics
         // Only files from fid = 5 partition are scanned
@@ -1729,7 +1729,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
         assert(0 < partFilesNum && partFilesNum < allFilesNum)
         assert(0 < partFilesSize && partFilesSize < allFilesSize)
         assert(scan2.metrics("numPartitions").value === 1)
-        assert(scan2.metrics("pruningTime").value === -1)
+        assert(scan2.metrics("pruningTime").value === 0)
 
         // Dynamic partition pruning is used
         // Static metrics are as-if reading the whole fact table
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1f10ff36acb4f..988695e2667b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2170,15 +2170,8 @@ class AdaptiveQueryExecSuite
       assert(aqeReads.length == 2)
       aqeReads.foreach { c =>
         val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-        val rowCount = stats.rowCount.get
+        assert(stats.sizeInBytes >= 0)
         assert(stats.rowCount.get >= 0)
-        if (rowCount == 0) {
-          // For empty relation, the query stage doesn't serialize any bytes.
-          // The SQLMetric keeps initial value.
-          assert(stats.sizeInBytes == -1)
-        } else {
-          assert(stats.sizeInBytes > 0)
-        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 424052df28933..76b5364164e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -786,11 +786,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    // `testData2.filter($"b" === 0)` is an empty relation.
-    // The exchange doesn't serialize any bytes.
-    // The SQLMetric keeps initial value.
-    testMetricsInSparkPlanOperator(exchanges(1),
-      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
   }
 
   test("Add numRows to metric of BroadcastExchangeExec") {
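
For reference, a minimal sketch (not part of the patch) of how the new `value` semantics look to programmatic callers. It assumes only the public `SQLMetric` constructor shown in the hunk above; the `"timing"` metric type string and the standalone object are illustrative.

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

object SQLMetricValueSketch {
  def main(args: Array[String]): Unit = {
    // Metrics such as `pruningTime` start from the -1 sentinel, meaning "no valid value
    // recorded yet", so the SQL UI can filter them out when computing min/max/etc.
    val pruningTime = new SQLMetric("timing", initValue = -1L)

    // Previously `value` leaked the -1 sentinel to callers; with this change the sentinel
    // is reported as 0, matching what the SQL UI displays.
    assert(pruningTime.value == 0L)
  }
}
```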