diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index c70ee637a2489..7f69657988a45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -145,6 +145,12 @@ case class BroadcastExchangeExec( Statistics(dataSize, Some(rowCount)) } + override def resetMetrics(): Unit = { + // Intentionally a no-op. + // Once materialized, a BroadcastExchangeExec is never materialized again, so we must not + // reset the metrics. Otherwise, we would lose the metrics collected in the broadcast job. + } + @transient private lazy val promise = Promise[broadcast.Broadcast[Any]]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala index 60a74a553bc45..8d6ee83f5e6b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala @@ -98,6 +98,21 @@ class BroadcastExchangeSuite extends SparkPlanTest assert(joinDF.collect().length == 1) } } + + test("SPARK-52962: broadcast exchange should not reset metrics") { + val df = spark.range(1).toDF() + val joinDF = df.join(broadcast(df), "id") + joinDF.collect() + val broadcastExchangeExec = collect( + joinDF.queryExecution.executedPlan) { case p: BroadcastExchangeExec => p } + assert(broadcastExchangeExec.size == 1, "one and only BroadcastExchangeExec") + + val broadcastExchangeNode = broadcastExchangeExec.head + val metrics = broadcastExchangeNode.metrics + assert(metrics("numOutputRows").value == 1) + broadcastExchangeNode.resetMetrics() + assert(metrics("numOutputRows").value == 1) + } } // Additional tests run in 
'local-cluster' mode.