From d527ee73ccfcddb5d390c482eef32d55f029fa20 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 25 Jun 2025 10:19:06 -0700 Subject: [PATCH 1/4] drop on full refresh --- .../spark/sql/pipelines/graph/DatasetManager.scala | 10 ++++++---- .../sql/pipelines/graph/MaterializeTablesSuite.scala | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index ced1cfd36cb7..ccc7571bd803 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -178,12 +178,14 @@ object DatasetManager extends Logging { } // Wipe the data if we need to - if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) { - context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}") + val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined + if (dropTable) { + catalog.dropTable(identifier) +// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}") } // Alter the table if we need to - if (existingTableOpt.isDefined) { + if (existingTableOpt.isDefined && !dropTable) { val existingSchema = existingTableOpt.get.schema() val targetSchema = if (table.isStreamingTable && !isFullRefresh) { @@ -198,7 +200,7 @@ object DatasetManager extends Logging { } // Create the table if we need to - if (existingTableOpt.isEmpty) { + if (dropTable || existingTableOpt.isEmpty) { catalog.createTable( identifier, new TableInfo.Builder() diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index b1c6fe79c0e4..e501a9006ec2 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -434,8 +434,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table2 = catalog.loadTable(identifier) assert( - table2.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + table2.columns().toSet == CatalogV2Util + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + .toSet ) assert(table2.partitioning().toSeq == Seq(Expressions.identity("x"))) @@ -456,8 +457,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table3 = catalog.loadTable(identifier) assert( - table3.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + table3.columns().toSet == CatalogV2Util + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + .toSet ) assert(table3.partitioning().toSeq == Seq(Expressions.identity("x"))) } From ad8078a69768b6f0ebd28f3f6b9a38e2ecceb0dc Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 25 Jun 2025 21:31:47 -0700 Subject: [PATCH 2/4] remove commented-out code --- .../org/apache/spark/sql/pipelines/graph/DatasetManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index ccc7571bd803..08123035c66a 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -181,7 +181,6 @@ object DatasetManager extends Logging { val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined if (dropTable) { catalog.dropTable(identifier) -// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}") } // Alter the table if we need to From c6280c480351a07d3659e67d4d5bd11bda700a81 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 2 Jul 2025 13:33:56 -0700 Subject: [PATCH 3/4] bring back column ordering in test assert --- .../spark/sql/pipelines/graph/MaterializeTablesSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index e501a9006ec2..1b0b0df19ffb 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -434,9 +434,8 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table2 = catalog.loadTable(identifier) assert( - table2.columns().toSet == CatalogV2Util + table2.columns() sameElements CatalogV2Util .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) - .toSet ) assert(table2.partitioning().toSeq == Seq(Expressions.identity("x"))) @@ -457,9 +456,8 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table3 = catalog.loadTable(identifier) assert( - table3.columns().toSet == CatalogV2Util + table3.columns() sameElements CatalogV2Util .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) - .toSet ) assert(table3.partitioning().toSeq == Seq(Expressions.identity("x"))) } From 6b5d7b48f9e79f79c9d2a08a584344636e98bdc1 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 7 Jul 2025 08:21:13 -0700 Subject: [PATCH 4/4] switch back order --- .../spark/sql/pipelines/graph/MaterializeTablesSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index 1b0b0df19ffb..b1c6fe79c0e4 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -435,7 +435,7 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table2 = catalog.loadTable(identifier) assert( table2.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) ) assert(table2.partitioning().toSeq == Seq(Expressions.identity("x"))) @@ -456,8 +456,8 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table3 = catalog.loadTable(identifier) assert( - table3.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + table3.columns() sameElements CatalogV2Util + .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) ) assert(table3.partitioning().toSeq == Seq(Expressions.identity("x"))) }