diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 6978ec3a8571..45954f21c592 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -159,7 +159,6 @@ class SQLAppStatusListener(
   }
 
   private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
-    val metricIds = exec.metrics.map(_.accumulatorId).sorted
     val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
     val metrics = exec.stages.toSeq
       .flatMap { stageId => Option(stageMetrics.get(stageId)) }
@@ -167,10 +166,10 @@
       .flatMap { metrics => metrics.ids.zip(metrics.values) }
 
     val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
-      .filter { case (id, _) => metricIds.contains(id) }
+      .filter { case (id, _) => metricTypes.contains(id) }
       .groupBy(_._1)
       .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
       }
 
     // Check the execution again for whether the aggregated metrics data has been calculated.
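
Note on the change (not part of the patch): `exec.metrics.map(_.accumulatorId).sorted` built a `Seq` whose `contains` is a linear scan per accumulator update, while `metricTypes` is a `Map` keyed by the same accumulator ids, so its `contains` is an effectively constant-time hash lookup and the sorted `Seq` is redundant. The second hunk drops a `.toSeq` that appears to be a no-op, since `values.map(_._2)` on the `Seq`-valued groups from `groupBy` already yields a `Seq`. A minimal, self-contained sketch of the filter equivalence follows; the names `toyMetrics` and `toyUpdates` are hypothetical illustration data, not taken from Spark.

    // Illustrative sketch only: both filters keep the same (id, value) pairs;
    // only the per-lookup cost differs (Seq.contains is O(n), Map.contains ~O(1)).
    object MetricFilterSketch {
      def main(args: Array[String]): Unit = {
        // (accumulatorId, metricType) pairs, shaped like exec.metrics.
        val toyMetrics = Seq((1L, "sum"), (2L, "size"), (3L, "timing"))

        // Before: a separate sorted Seq of ids, scanned linearly per lookup.
        val metricIds: Seq[Long] = toyMetrics.map(_._1).sorted

        // After: the Map already built for the types answers membership directly.
        val metricTypes: Map[Long, String] = toyMetrics.toMap

        // Raw (id, value) updates, including one for an untracked accumulator.
        val toyUpdates = Seq((1L, 10L), (2L, 20L), (9L, 99L))

        // Same result either way; the Map-based filter avoids the extra Seq.
        assert(toyUpdates.filter { case (id, _) => metricIds.contains(id) } ==
          toyUpdates.filter { case (id, _) => metricTypes.contains(id) })
      }
    }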