From e2b8ae18b194a6739f6c5c115c38c01a5120f38a Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:32:35 -0700 Subject: [PATCH 01/11] merge numsourcerows metric --- .../v2/WriteToDataSourceV2Exec.scala | 49 +++++++++++++++++-- .../connector/MergeIntoTableSuiteBase.scala | 42 ++++++++++++++++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 4904e3d60dc9c..f5029b91538f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -34,9 +34,9 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType @@ -481,9 +481,48 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = { - collectFirst(query) { case m: MergeRowsExec => m }.map{ n => - n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) } - }.getOrElse(Map.empty[String, lang.Long]).asJava + val mergeMetrics = collectFirst(query) { case m: MergeRowsExec => m } + .map { n => + n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) } + } + .getOrElse(Map.empty[String, lang.Long]) + + val numSourceRows = getNumSourceRows(query) + + (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava + } + + private def getNumSourceRows(query: SparkPlan): Long = { + collectFirst(query) { case m: MergeRowsExec => m } + .flatMap { mergeRowsExec => + val joinOpt = collectFirst(mergeRowsExec.child) { case j: BinaryExecNode => j } + + joinOpt.flatMap { join => + val leftIsTarget = isTargetTableScan(join.left) + val rightIsTarget = isTargetTableScan(join.right) + + val sourceChild = if (leftIsTarget) { + Some(join.right) + } else if (rightIsTarget) { + Some(join.left) + } else { + None + } + + sourceChild.flatMap { child => + collectFirst(child) { + case plan if plan.metrics.contains("numOutputRows") => plan + }.flatMap(_.metrics.get("numOutputRows").map(_.value)) + } + } + } + .getOrElse(-1L) + } + + private def isTargetTableScan(plan: SparkPlan): Boolean = { + collectFirst(plan) { + case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true + }.getOrElse(false) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 7d879e2c9a5ee..4553cfb789e64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1814,6 +1814,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2")) assert(commitProps("merge.numTargetRowsInserted") === "0") assert(commitProps("merge.numTargetRowsUpdated") === "1") @@ -1870,6 +1871,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") assert(commitProps("merge.numTargetRowsCopied") === "0") assert(commitProps("merge.numTargetRowsInserted") === "1") assert(commitProps("merge.numTargetRowsUpdated") === "0") @@ -1925,6 +1927,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) assert(commitProps("merge.numTargetRowsInserted") === "0") assert(commitProps("merge.numTargetRowsUpdated") === "2") @@ -1982,6 +1985,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) assert(commitProps("merge.numTargetRowsInserted") === "0") assert(commitProps("merge.numTargetRowsUpdated") === "0") @@ -2040,6 +2044,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "4") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) assert(commitProps("merge.numTargetRowsInserted") === "1") assert(commitProps("merge.numTargetRowsUpdated") === "2") @@ -2098,6 +2103,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "4") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) assert(commitProps("merge.numTargetRowsInserted") === "1") assert(commitProps("merge.numTargetRowsUpdated") === "0") @@ -2139,6 +2145,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "4") assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) assert(commitProps("merge.numTargetRowsInserted") === "1") assert(commitProps("merge.numTargetRowsUpdated") === "0") @@ -2154,6 +2161,41 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } + test("Merge metrics with numSourceRows for empty source") { + withTempView("source") { + createAndInitTable( + "pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) + + // Empty source + Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") + + sql(s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE THEN + | DELETE + |""".stripMargin) + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numSourceRows") === "-1") // if no numSourceRows, should be -1 + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) + assert(commitProps("merge.numTargetRowsInserted") === "0") + assert(commitProps("merge.numTargetRowsUpdated") === "0") + assert(commitProps("merge.numTargetRowsDeleted") === "3") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "3") + } + } + test("Merge schema evolution new column with set explicit column") { Seq((true, true), (false, true), (true, false)).foreach { case (withSchemaEvolution, schemaEvolutionEnabled) => From 3c08974cc0c91ca44b5881c88986a262b8c0dd5a Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:36:10 -0700 Subject: [PATCH 02/11] numsourcerows -1 --- .../v2/WriteToDataSourceV2Exec.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f5029b91538f4..bd21f57a6f6e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -481,15 +481,16 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = { - val mergeMetrics = collectFirst(query) { case m: MergeRowsExec => m } - .map { n => - n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) } - } - .getOrElse(Map.empty[String, lang.Long]) - - val numSourceRows = getNumSourceRows(query) - - (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava + collectFirst(query) { case m: MergeRowsExec => m } match { + case Some(mergeRowsExec) => + val mergeMetrics = mergeRowsExec.metrics.map { + case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) + } + val numSourceRows = getNumSourceRows(query) + (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava + case None => + Map.empty[String, lang.Long].asJava + } } private def getNumSourceRows(query: SparkPlan): Long = { From 262b1d1835382d703f00a2f1e20a79e65c6f9f8b Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:39:51 -0700 Subject: [PATCH 03/11] comment --- .../apache/spark/sql/connector/MergeIntoTableSuiteBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 4553cfb789e64..b3a04cc9651ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -2184,7 +2184,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("numSourceRows") === "-1") // if no numSourceRows, should be -1 + assert(commitProps("numSourceRows") === "-1") // if no numOutputRows, should be -1 assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) assert(commitProps("merge.numTargetRowsInserted") === "0") assert(commitProps("merge.numTargetRowsUpdated") === "0") From 5bdb38529d458c829826eaceb408062bfd591753 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:39:51 -0700 Subject: [PATCH 04/11] comment --- .../apache/spark/sql/connector/MergeIntoTableSuiteBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index b3a04cc9651ad..4a0e9aee13728 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -2170,7 +2170,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase |{ "pk": 3, "salary": 300, "dep": "hr" } |""".stripMargin) - // Empty source + // source is empty Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") sql(s"""MERGE INTO $tableNameAsString t From 1746f828e022bef41394604ef0a0d16b5a7b46c6 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:43:24 -0700 Subject: [PATCH 05/11] nit --- .../v2/WriteToDataSourceV2Exec.scala | 17 +++++++---------- .../sql/connector/MergeIntoTableSuiteBase.scala | 2 +- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index bd21f57a6f6e6..6f859b06e4bba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -481,16 +481,13 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = { - collectFirst(query) { case m: MergeRowsExec => m } match { - case Some(mergeRowsExec) => - val mergeMetrics = mergeRowsExec.metrics.map { - case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) - } - val numSourceRows = getNumSourceRows(query) - (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava - case None => - Map.empty[String, lang.Long].asJava - } + val mergeMetrics = collectFirst(query) { case m: MergeRowsExec => m } + .map { n => + n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) } + } + .getOrElse(Map.empty[String, lang.Long]) + val numSourceRows = getNumSourceRows(query) + (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava } private def getNumSourceRows(query: SparkPlan): Long = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 4a0e9aee13728..74146c9c7b83d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -2184,7 +2184,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("numSourceRows") === "-1") // if no numOutputRows, should be -1 + assert(commitProps("merge.numSourceRows") === "-1") // if no numOutputRows, should be -1 assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) assert(commitProps("merge.numTargetRowsInserted") === "0") assert(commitProps("merge.numTargetRowsUpdated") === "0") From 36b14dd2ae6e0d6c10c27ec8a6ded665cf8dcced Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:46:25 -0700 Subject: [PATCH 06/11] format --- .../datasources/v2/WriteToDataSourceV2Exec.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 6f859b06e4bba..9b46ee08a438d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -491,6 +491,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getNumSourceRows(query: SparkPlan): Long = { + def isTargetTableScan(plan: SparkPlan): Boolean = { + collectFirst(plan) { + case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true + }.getOrElse(false) + } + collectFirst(query) { case m: MergeRowsExec => m } .flatMap { mergeRowsExec => val joinOpt = collectFirst(mergeRowsExec.child) { case j: BinaryExecNode => j } @@ -516,12 +522,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } .getOrElse(-1L) } - - private def isTargetTableScan(plan: SparkPlan): Boolean = { - collectFirst(plan) { - case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true - }.getOrElse(false) - } } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { From c4483796480293f4ff7f18c942138014a43de9da Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 09:51:37 -0700 Subject: [PATCH 07/11] fmt --- .../v2/WriteToDataSourceV2Exec.scala | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 9b46ee08a438d..06bffcf3d174d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -481,46 +481,45 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = { - val mergeMetrics = collectFirst(query) { case m: MergeRowsExec => m } - .map { n => - n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) } - } - .getOrElse(Map.empty[String, lang.Long]) - val numSourceRows = getNumSourceRows(query) - (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava + collectFirst(query) { case m: MergeRowsExec => m } match { + case Some(mergeRowsExec) => + val mergeMetrics = mergeRowsExec.metrics.map { + case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) + } + val numSourceRows = getNumSourceRows(mergeRowsExec) + (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava + case None => + Map.empty[String, lang.Long].asJava + } } - private def getNumSourceRows(query: SparkPlan): Long = { + private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = { def isTargetTableScan(plan: SparkPlan): Boolean = { collectFirst(plan) { case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true }.getOrElse(false) } - collectFirst(query) { case m: MergeRowsExec => m } - .flatMap { mergeRowsExec => - val joinOpt = collectFirst(mergeRowsExec.child) { case j: BinaryExecNode => j } - - joinOpt.flatMap { join => - val leftIsTarget = isTargetTableScan(join.left) - val rightIsTarget = isTargetTableScan(join.right) - - val sourceChild = if (leftIsTarget) { - Some(join.right) - } else if (rightIsTarget) { - Some(join.left) - } else { - None - } - - sourceChild.flatMap { child => - collectFirst(child) { - case plan if plan.metrics.contains("numOutputRows") => plan - }.flatMap(_.metrics.get("numOutputRows").map(_.value)) - } - } + val joinOpt = collectFirst(mergeRowsExec.child) { case j: BinaryExecNode => j } + + joinOpt.flatMap { join => + val leftIsTarget = isTargetTableScan(join.left) + val rightIsTarget = isTargetTableScan(join.right) + + val sourceChild = if (leftIsTarget) { + Some(join.right) + } else if (rightIsTarget) { + Some(join.left) + } else { + None + } + + sourceChild.flatMap { child => + collectFirst(child) { + case plan if plan.metrics.contains("numOutputRows") => plan + }.flatMap(_.metrics.get("numOutputRows").map(_.value)) } - .getOrElse(-1L) + }.getOrElse(-1L) } } From e1e14b8f4585e1c58210b9ce6ddfeb84406cebdb Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 20 Oct 2025 10:53:15 -0700 Subject: [PATCH 08/11] BaseJoinExec --- .../execution/datasources/v2/WriteToDataSourceV2Exec.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 06bffcf3d174d..549a7fc40711b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.joins.BaseJoinExec import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{LongAccumulator, Utils} @@ -500,7 +501,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa }.getOrElse(false) } - val joinOpt = collectFirst(mergeRowsExec.child) { case j: BinaryExecNode => j } + val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j } joinOpt.flatMap { join => val leftIsTarget = isTargetTableScan(join.left) From 25760a074b664bb970318c951ec9e5cafa9948ff Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 27 Oct 2025 09:15:32 -0700 Subject: [PATCH 09/11] style, aqe tests --- .../v2/WriteToDataSourceV2Exec.scala | 35 +- .../connector/MergeIntoTableSuiteBase.scala | 465 ++++++++++-------- 2 files changed, 268 insertions(+), 232 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 549a7fc40711b..b812eeca49607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -495,32 +495,37 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = { - def isTargetTableScan(plan: SparkPlan): Boolean = { + def hasTargetTable(plan: SparkPlan): Boolean = { collectFirst(plan) { - case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true - }.getOrElse(false) + case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => scan + }.isDefined } - val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j } + def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = { + val leftHasTarget = hasTargetTable(join.left) + val rightHasTarget = hasTargetTable(join.right) - joinOpt.flatMap { join => - val leftIsTarget = isTargetTableScan(join.left) - val rightIsTarget = isTargetTableScan(join.right) - - val sourceChild = if (leftIsTarget) { + val sourceSide = if (leftHasTarget) { Some(join.right) - } else if (rightIsTarget) { + } else if (rightHasTarget) { Some(join.left) } else { None } - sourceChild.flatMap { child => - collectFirst(child) { - case plan if plan.metrics.contains("numOutputRows") => plan - }.flatMap(_.metrics.get("numOutputRows").map(_.value)) + sourceSide.flatMap { side => + collectFirst(side) { + case source if source.metrics.contains("numOutputRows") => + source + } } - }.getOrElse(-1L) + } + + (for { + join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j } + sourceScan <- findSourceScan(join) + metric <- sourceScan.metrics.get("numOutputRows") + } yield metric.value).getOrElse(-1L) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 74146c9c7b83d..26a6db6658154 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1777,52 +1777,58 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched clause") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 1) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 1) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"))) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"))) - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "3") - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2")) - assert(commitProps("merge.numTargetRowsInserted") === "0") - assert(commitProps("merge.numTargetRowsUpdated") === "1") - assert(commitProps("merge.numTargetRowsDeleted") === "0") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2")) + assert(commitProps("merge.numTargetRowsInserted") === "0") + assert(commitProps("merge.numTargetRowsUpdated") === "1") + assert(commitProps("merge.numTargetRowsDeleted") === "0") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } @@ -1871,7 +1877,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val table = catalog.loadTable(ident) val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "3") + // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks + // assert(commitProps("merge.numSourceRows") === "3") assert(commitProps("merge.numTargetRowsCopied") === "0") assert(commitProps("merge.numTargetRowsInserted") === "1") assert(commitProps("merge.numTargetRowsUpdated") === "0") @@ -1884,58 +1891,64 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched and not matched by source clauses: update") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"))) // updated + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"))) // updated - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "3") - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) - assert(commitProps("merge.numTargetRowsInserted") === "0") - assert(commitProps("merge.numTargetRowsUpdated") === "2") - assert(commitProps("merge.numTargetRowsDeleted") === "0") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "3") + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("merge.numTargetRowsInserted") === "0") + assert(commitProps("merge.numTargetRowsUpdated") === "2") + assert(commitProps("merge.numTargetRowsDeleted") === "0") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } @@ -1998,120 +2011,132 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched, not matched, and not matched by source clauses: update") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"), // updated - Row(6, -1, "dummy"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"), // updated + Row(6, -1, "dummy"))) // inserted - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "4") - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) - assert(commitProps("merge.numTargetRowsInserted") === "1") - assert(commitProps("merge.numTargetRowsUpdated") === "2") - assert(commitProps("merge.numTargetRowsDeleted") === "0") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "4") + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("merge.numTargetRowsInserted") === "1") + assert(commitProps("merge.numTargetRowsUpdated") === "2") + assert(commitProps("merge.numTargetRowsDeleted") === "0") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } test("Merge metrics with matched, not matched, and not matched by source clauses: delete") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | DELETE - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | DELETE - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | DELETE + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | DELETE + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 2) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 2) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - // Row(1, 100, "hr") deleted - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - // Row(5, 500, "executive") deleted - Row(6, -1, "dummy"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + // Row(1, 100, "hr") deleted + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + // Row(5, 500, "executive") deleted + Row(6, -1, "dummy"))) // inserted - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "4") - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) - assert(commitProps("merge.numTargetRowsInserted") === "1") - assert(commitProps("merge.numTargetRowsUpdated") === "0") - assert(commitProps("merge.numTargetRowsDeleted") === "2") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1") + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "4") + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("merge.numTargetRowsInserted") === "1") + assert(commitProps("merge.numTargetRowsUpdated") === "0") + assert(commitProps("merge.numTargetRowsDeleted") === "2") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } @@ -2162,37 +2187,43 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with numSourceRows for empty source") { - withTempView("source") { - createAndInitTable( - "pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable( + "pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - // source is empty - Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") + // source is empty + Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") - sql(s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED BY SOURCE THEN - | DELETE - |""".stripMargin) + sql(s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE THEN + | DELETE + |""".stripMargin) - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "-1") // if no numOutputRows, should be -1 - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) - assert(commitProps("merge.numTargetRowsInserted") === "0") - assert(commitProps("merge.numTargetRowsUpdated") === "0") - assert(commitProps("merge.numTargetRowsDeleted") === "3") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "3") + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numSourceRows") === "-1") // if no numOutputRows, should be -1 + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) + assert(commitProps("merge.numTargetRowsInserted") === "0") + assert(commitProps("merge.numTargetRowsUpdated") === "0") + assert(commitProps("merge.numTargetRowsDeleted") === "3") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "3") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } From 694410843b706fffc6dc58fa2015879cb9e91b99 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 27 Oct 2025 09:53:04 -0700 Subject: [PATCH 10/11] mergeSummary --- .../sql/connector/write/MergeSummary.java | 5 + .../connector/write/MergeSummaryImpl.scala | 1 + .../v2/WriteToDataSourceV2Exec.scala | 2 +- .../connector/MergeIntoTableSuiteBase.scala | 119 +++++++++--------- 4 files changed, 70 insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java index a759e6693edb3..810258c0bf367 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java @@ -27,6 +27,11 @@ @Evolving public interface MergeSummary extends WriteSummary { + /** + * Returns the number of source rows. + */ + long numSourceRows(); + /** * Returns the number of target rows copied unmodified because they did not match any action. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala index 911749072c43c..f07f47061ee83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.connector.write * Implementation of [[MergeSummary]] that provides MERGE operation summary. */ private[sql] case class MergeSummaryImpl( + numSourceRows: Long, numTargetRowsCopied: Long, numTargetRowsDeleted: Long, numTargetRowsUpdated: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index ed80998be18fa..0591e2486dc39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -486,7 +486,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa val metrics = n.metrics val numSourceRows = getNumSourceRows(n) MergeSummaryImpl( - getNumSourceRows(n), + numSourceRows, metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index f6e967df7f851..147b91e02d202 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1814,19 +1814,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(1, 1000, "hr"), // updated Row(2, 200, "software"), Row(3, 300, "hr"))) + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 3L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 1L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 3L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 1L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) - sql(s"DROP TABLE $tableNameAsString") + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -1875,7 +1877,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase val mergeSummary = getMergeSummary() // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks - assert(commitProps("merge.numSourceRows") === (if (deltaMerge) 3L else -1L)) + assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L)) assert(mergeSummary.numTargetRowsCopied === 0L) assert(mergeSummary.numTargetRowsInserted === 1L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -1930,19 +1932,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(3, 300, "hr"), Row(4, 400, "marketing"), Row(5, -1, "executive"))) // updated + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 3L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 3L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) - sql(s"DROP TABLE $tableNameAsString") + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -2049,19 +2053,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), Row(5, -1, "executive"), // updated Row(6, -1, "dummy"))) // inserted + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 4L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 4L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) - sql(s"DROP TABLE $tableNameAsString") + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -2111,19 +2117,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), // Row(5, 500, "executive") deleted Row(6, -1, "dummy"))) // inserted + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 4L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 2L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 4L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 2L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) - sql(s"DROP TABLE $tableNameAsString") + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -2195,17 +2203,16 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase | DELETE |""".stripMargin) - val table = catalog.loadTable(ident) - val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties - assert(commitProps("merge.numSourceRows") === "-1") // if no numOutputRows, should be -1 - assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0")) - assert(commitProps("merge.numTargetRowsInserted") === "0") - assert(commitProps("merge.numTargetRowsUpdated") === "0") - assert(commitProps("merge.numTargetRowsDeleted") === "3") - assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") - assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") - assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "3") + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === -1L) // if no numOutputRows, should be -1 + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 0L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 3L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 3L) sql(s"DROP TABLE $tableNameAsString") } From 62a834082a06aef850ed58be5a52c54b1f46956a Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Mon, 27 Oct 2025 09:56:11 -0700 Subject: [PATCH 11/11] test --- .../connector/MergeIntoTableSuiteBase.scala | 100 +++++++++--------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 147b91e02d202..125115e2941f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1833,59 +1833,63 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched and not matched clause") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val sourceDF = Seq( - (4, 100, "marketing"), - (5, 400, "executive"), - (6, 100, "hr") - ).toDF("pk", "salary", "dep") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq( + (4, 100, "marketing"), + (5, 400, "executive"), + (6, 100, "hr") + ).toDF("pk", "salary", "dep") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN - | UPDATE SET salary = 9999 - |WHEN NOT MATCHED AND salary > 200 THEN - | INSERT * - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 9999 + |WHEN NOT MATCHED AND salary > 200 THEN + | INSERT * + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", 0) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", 0) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 100, "hr"), - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(5, 400, "executive"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(5, 400, "executive"))) // inserted - val mergeSummary = getMergeSummary() - // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks - assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L)) - assert(mergeSummary.numTargetRowsCopied === 0L) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks + assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L)) + assert(mergeSummary.numTargetRowsCopied === 0L) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + + sql(s"DROP TABLE $tableNameAsString") + } } }