From 0441277f926cf847bffa7ea9532dfd92f1435919 Mon Sep 17 00:00:00 2001
From: lajin
Date: Thu, 8 Apr 2021 10:53:58 +0800
Subject: [PATCH] [CARMEL-4848][CARMEL-4694][FOLLOWUP] Fix the metrics
 NoSuchElementException for DELETE op (#39)

---
 .../scala/org/apache/spark/sql/delta/DeltaOperations.scala  | 6 ++++--
 .../org/apache/spark/sql/delta/commands/DeleteCommand.scala | 4 +++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 78485be5c0a..3dd72f5af8d 100644
--- a/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -381,14 +381,16 @@ private[delta] object DeltaOperationMetrics {
     "numRemovedFiles", // number of files removed
     "numDeletedRows", // number of rows removed
     "numCopiedRows", // number of rows copied in the process of deleting files
-    "numOutputRows"
+    "numOutputRows" // number of output rows
   )
 
   /** Deleting the entire table or partition would prevent row level metrics from being recorded */
   val DELETE_PARTITIONS = Set(
     "numAddedFiles", // number of files added
     "numRemovedFiles", // number of files removed,
-    "numDeletedRows" // number of rows removed
+    "numDeletedRows", // number of rows removed
+    "numCopiedRows", // number of rows copied in the process of deleting files
+    "numOutputRows" // number of output rows
   )
 
   val TRUNCATE = Set(
diff --git a/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index d9720021959..5f4209c548d 100644
--- a/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -58,7 +58,9 @@ case class DeleteCommand(
   override lazy val metrics = Map[String, SQLMetric](
     "numRemovedFiles" -> createMetric(sc, "number of files removed."),
     "numAddedFiles" -> createMetric(sc, "number of files added."),
-    "numDeletedRows" -> createMetric(sc, "number of rows deleted.")
+    "numDeletedRows" -> createMetric(sc, "number of rows deleted."),
+    "numOutputRows" -> createMetric(sc, "number of output rows"),
+    "numCopiedRows" -> createMetric(sc, "number of copied rows")
   )
 
   final override def run(sparkSession: SparkSession): Seq[Row] = {
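
For context, the patch works because the metric-name sets in `DeltaOperationMetrics` (such as `DELETE_PARTITIONS`) must stay in sync with the `SQLMetric` map registered in `DeleteCommand.metrics`: a lookup of an expected name that was never registered throws `NoSuchElementException`. Below is a minimal, self-contained Scala sketch of that failure mode; it is not Delta code, and the names `expected` and `registered` are hypothetical stand-ins for the metric-name set and the command's metric map.

```scala
// Sketch of the pre-patch failure: an expected metric name is looked up
// in a Map that never registered it, and Map#apply throws
// java.util.NoSuchElementException ("key not found: numCopiedRows").
object MetricsLookupSketch {
  def main(args: Array[String]): Unit = {
    // Metric names the DELETE operation is expected to report; after the
    // patch, DELETE_PARTITIONS also lists numCopiedRows and numOutputRows.
    val expected = Set("numAddedFiles", "numRemovedFiles", "numDeletedRows",
      "numCopiedRows", "numOutputRows")

    // Metrics actually registered pre-patch: numOutputRows and
    // numCopiedRows were missing from DeleteCommand.metrics.
    val registered = Map(
      "numRemovedFiles" -> 0L,
      "numAddedFiles" -> 0L,
      "numDeletedRows" -> 0L)

    expected.foreach { name =>
      val value = registered(name) // throws on "numCopiedRows" pre-patch
      println(s"$name = $value")
    }
  }
}
```

Registering all five metrics up front in `DeleteCommand.metrics`, and listing them in `DELETE_PARTITIONS`, makes every such lookup total, so the whole-partition DELETE path no longer hits the exception.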