@@ -78,7 +78,12 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato

   def +=(v: Long): Unit = add(v)

-  override def value: Long = _value
+  // We may use -1 as the initial value of the accumulator, so that the SQL UI can filter out
+  // invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
+  // However, users can also access the SQL metric values programmatically via this method.
+  // We should be consistent with the SQL UI and not expose -1 to users.
+  // See `SQLMetrics.stringValue`. When there are no valid accumulator values, 0 is the metric value.
+  override def value: Long = if (_value < 0) 0 else _value

   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
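For illustration only (not part of the change): a minimal sketch of the new accessor behavior, assuming `SQLMetric` can be constructed directly outside Spark internals and that "timing" is used as the metric type, as Spark's timing metrics do.

import org.apache.spark.sql.execution.metric.SQLMetric

// A timing-style metric that uses -1 as its "never updated" sentinel, so the
// SQL UI can exclude it when computing min/max/etc. (0 is a legitimate timing).
val pruningTime = new SQLMetric("timing", initValue = -1L)

// Before this patch, programmatic readers saw the raw sentinel (-1); the
// accessor now reports 0, consistent with SQLMetrics.stringValue in the UI.
assert(pruningTime.value == 0L)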
@@ -1715,7 +1715,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       val allFilesNum = scan1.metrics("numFiles").value
       val allFilesSize = scan1.metrics("filesSize").value
       assert(scan1.metrics("numPartitions").value === numPartitions)
-      assert(scan1.metrics("pruningTime").value === -1)
+      assert(scan1.metrics("pruningTime").value === 0)

       // No dynamic partition pruning, so no static metrics
       // Only files from fid = 5 partition are scanned
@@ -1729,7 +1729,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       assert(0 < partFilesNum && partFilesNum < allFilesNum)
       assert(0 < partFilesSize && partFilesSize < allFilesSize)
       assert(scan2.metrics("numPartitions").value === 1)
-      assert(scan2.metrics("pruningTime").value === -1)
+      assert(scan2.metrics("pruningTime").value === 0)

       // Dynamic partition pruning is used
       // Static metrics are as-if reading the whole fact table
@@ -2170,15 +2170,8 @@ class AdaptiveQueryExecSuite
       assert(aqeReads.length == 2)
       aqeReads.foreach { c =>
         val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-        val rowCount = stats.rowCount.get
-        if (rowCount == 0) {
-          // For empty relation, the query stage doesn't serialize any bytes.
-          // The SQLMetric keeps initial value.
-          assert(stats.sizeInBytes == -1)
-        } else {
-          assert(stats.sizeInBytes > 0)
-        }
+        assert(stats.sizeInBytes >= 0)
+        assert(stats.rowCount.get >= 0)
       }
     }
   }
@@ -786,11 +786,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    // `testData2.filter($"b" === 0)` is an empty relation.
-    // The exchange doesn't serialize any bytes.
-    // The SQLMetric keeps initial value.
-    testMetricsInSparkPlanOperator(exchanges(1),
-      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
   }

   test("Add numRows to metric of BroadcastExchangeExec") {
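As an end-to-end illustration of what these updated expectations mean, here is a standalone sketch, not part of the suite: the local SparkSession setup, the empty-at-runtime query, and the plan traversal are assumptions chosen to mirror the test above. An exchange whose input turns out to be empty never updates its size metric, so the metric keeps its -1 sentinel internally but now reads back as 0.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("sqlmetric-sketch")
  // Keep the ShuffleExchangeExec directly visible in the executed plan.
  .config("spark.sql.adaptive.enabled", "false")
  .getOrCreate()
import spark.implicits._

// A shuffle whose input is empty only at runtime; the optimizer cannot prove
// this statically, so the exchange stays in the plan and writes zero records.
val df = spark.range(10).filter($"id" < 0).repartition(2)
df.collect()

val exchange = df.queryExecution.executedPlan.collectFirst {
  case e: ShuffleExchangeExec => e
}.get

// Reads as 0 now; before this change the never-updated metric read -1.
assert(exchange.metrics("dataSize").value == 0L)
assert(exchange.metrics("shuffleRecordsWritten").value == 0L)

spark.stop()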