Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,50 @@ import org.apache.spark.util.AccumulatorContext.internOption
* the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
* on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
*/
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// This is a workaround for SPARK-11013.
// We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
// update it at the end of task and the value will be at least 0. Then we can filter out the -1
// values before calculate max, min, etc.
private[this] var _value = initValue
private var _zeroValue = initValue
class SQLMetric(
val metricType: String,
initValue: Long = 0L,
zeroValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// initValue defines the initial value of the metric. zeroValue defines the lowest value
// considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any
// updates, and it also reports zeroValue as its value to avoid exposing it to the user
// programatically.
//
// For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is
// by default invalid. At the end of a task, we will update the metric making it valid, and the
// invalid metrics will be filtered out when calculating min, max, etc. as a workaround for
// SPARK-11013.
private var _value = initValue

override def copy(): SQLMetric = {
val newAcc = new SQLMetric(metricType, _value)
newAcc._zeroValue = initValue
val newAcc = new SQLMetric(metricType, initValue, zeroValue)
newAcc._value = _value
newAcc
}

override def reset(): Unit = _value = _zeroValue
override def reset(): Unit = _value = initValue

override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
case o: SQLMetric =>
if (o.value > 0) {
if (_value < 0) _value = 0
if (o.isValid) {
if (!isValid) _value = zeroValue
_value += o.value
}
case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
this.getClass.getName, other.getClass.getName)
}

override def isZero: Boolean = _value == _zeroValue
// This is used to filter out metrics. Metrics with value equal to initValue should
// be filtered out, since they are either invalid or safe to filter without changing
// the aggregation defined in [[SQLMetrics.stringValue]].
// Note that we don't use zeroValue here since we may want to collect zeroValue metrics
// for calculating min, max, etc. See SPARK-11013.
override def isZero: Boolean = _value == initValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit tricky as isZero is not true when we actually have the zero value...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's enrich the comment to highlight that, we may want to collect the 0 value for calculating min/max/avg. We can still link to SPARK-11013.


def isValid: Boolean = _value >= zeroValue

override def add(v: Long): Unit = {
if (_value < 0) _value = 0
if (!isValid) _value = zeroValue
_value += v
}

Expand All @@ -78,12 +92,9 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato

def +=(v: Long): Unit = add(v)

// We may use -1 as initial value of the accumulator, so that the SQL UI can filter out
// invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
// However, users can also access the SQL metrics values programmatically via this method.
// We should be consistent with the SQL UI and don't expose -1 to users.
// See `SQLMetrics.stringValue`. When there is no valid accumulator values, 0 is the metric value.
override def value: Long = if (_value < 0) 0 else _value
// _value may be invalid, in many cases being -1. We should not expose it to the user
// and instead return zeroValue.
override def value: Long = if (!isValid) zeroValue else _value

// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
Expand Down