diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java index a759e6693edb3..810258c0bf367 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java @@ -27,6 +27,11 @@ @Evolving public interface MergeSummary extends WriteSummary { + /** + * Returns the number of source rows, or -1 if the count could not be determined. + */ + long numSourceRows(); + /** * Returns the number of target rows copied unmodified because they did not match any action. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala index 911749072c43c..f07f47061ee83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.connector.write * Implementation of [[MergeSummary]] that provides MERGE operation summary. 
*/ private[sql] case class MergeSummaryImpl( + numSourceRows: Long, numTargetRowsCopied: Long, numTargetRowsDeleted: Long, numTargetRowsUpdated: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 2a3a3441accc8..0591e2486dc39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.joins.BaseJoinExec import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{LongAccumulator, Utils} @@ -483,7 +484,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa 
private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = { collectFirst(query) { case m: MergeRowsExec => m }.map { n => val metrics = n.metrics + val numSourceRows = getNumSourceRows(n) MergeSummaryImpl( + numSourceRows, metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L), @@ -495,6 +498,41 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa ) } } + + /** + * Returns the number of rows produced by the source side of the MERGE join, read from the + * `numOutputRows` metric of the first node found on that side that reports it. Returns -1 if + * the plan has no join, neither join side scans a [[RowLevelOperationTable]], or no node on + * the source side exposes `numOutputRows`. + */ + private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = { + // A join side is the target side if it contains a scan of the row-level operation table. + def hasTargetTable(plan: SparkPlan): Boolean = { + collectFirst(plan) { + case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => scan + }.isDefined + } + + // The source side is the join child opposite the target; the right side is only inspected + // when the left side does not contain the target scan. + def findSourceSide(join: BaseJoinExec): Option[SparkPlan] = { + if (hasTargetTable(join.left)) { + Some(join.right) + } else if (hasTargetTable(join.right)) { + Some(join.left) + } else { + None + } + } + + (for { + join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j } + sourceSide <- findSourceSide(join) + metric <- collectFirst(sourceSide) { + case node if node.metrics.contains("numOutputRows") => node.metrics("numOutputRows") + } + } yield metric.value).getOrElse(-1L) + } } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 2e175951851a1..125115e2941f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1778,159 +1778,179 @@ abstract class MergeIntoTableSuiteBase 
extends RowLevelOperationSuiteBase } test("Merge metrics with matched clause") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 1) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 1) + assertMetric(mergeExec, 
"numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"))) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"))) + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 1L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 3L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 1L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + + sql(s"DROP TABLE $tableNameAsString") + } } } test("Merge metrics with matched and not matched clause") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 
300, "dep": "hr" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val sourceDF = Seq( - (4, 100, "marketing"), - (5, 400, "executive"), - (6, 100, "hr") - ).toDF("pk", "salary", "dep") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq( + (4, 100, "marketing"), + (5, 400, "executive"), + (6, 100, "hr") + ).toDF("pk", "salary", "dep") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN - | UPDATE SET salary = 9999 - |WHEN NOT MATCHED AND salary > 200 THEN - | INSERT * - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 9999 + |WHEN NOT MATCHED AND salary > 200 THEN + | INSERT * + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", 0) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", 0) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, 
"numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 100, "hr"), - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(5, 400, "executive"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(5, 400, "executive"))) // inserted + + val mergeSummary = getMergeSummary() + // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks + assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L)) + assert(mergeSummary.numTargetRowsCopied === 0L) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) - val mergeSummary = getMergeSummary() - assert(mergeSummary.numTargetRowsCopied === 0L) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + sql(s"DROP TABLE $tableNameAsString") + } } } test("Merge metrics with matched and not matched by source clauses: update") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, 
"salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + 
assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"))) // updated + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"))) // updated + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 3L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -1979,6 +1999,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase ) val mergeSummary 
= getMergeSummary() + assert(mergeSummary.numSourceRows === 3L) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -1991,116 +2012,130 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched, not matched, and not matched by source clauses: update") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED 
AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"), // updated - Row(6, -1, "dummy"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"), // updated + Row(6, -1, "dummy"))) // inserted + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) 
- assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 4L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + + sql(s"DROP TABLE $tableNameAsString") + } } } test("Merge metrics with matched, not matched, and not matched by source clauses: delete") { - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - 
|ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | DELETE - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | DELETE - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | DELETE + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | DELETE + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 2) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 2) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - // Row(1, 100, "hr") deleted - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - // Row(5, 500, "executive") deleted - Row(6, -1, "dummy"))) // inserted + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + // Row(1, 100, "hr") deleted + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + 
// Row(5, 500, "executive") deleted + Row(6, -1, "dummy"))) // inserted + } - val mergeSummary = getMergeSummary() - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 2L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === 4L) + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 2L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) + + sql(s"DROP TABLE $tableNameAsString") + } } } @@ -2133,6 +2168,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase ) val mergeMetrics = getMergeSummary() + assert(mergeMetrics.numSourceRows === 4L) assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeMetrics.numTargetRowsInserted === 1L) assert(mergeMetrics.numTargetRowsUpdated === 0L) @@ -2148,6 +2184,46 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } + test("Merge metrics with numSourceRows for empty source") { + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable( + "pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ 
"pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) + + // source is empty + Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") + + sql(s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE THEN + | DELETE + |""".stripMargin) + + val mergeSummary = getMergeSummary() + assert(mergeSummary.numSourceRows === -1L) // if no numOutputRows, should be -1 + assert(mergeSummary.numTargetRowsCopied === 0L) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 3L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 3L) + + sql(s"DROP TABLE $tableNameAsString") + } + } + } + } + test("Merge schema evolution new column with set explicit column") { Seq((true, true), (false, true), (true, false)).foreach { case (withSchemaEvolution, schemaEvolutionEnabled) =>