# fix(writer): spark 38811 insert alter table add columns #3479
Changes from all commits:
- ae0b3a2
- bd5e713
- 790c6df
- 97ce443
- 788bcab
**`CometNativeWriteExec`:**

```diff
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -63,7 +64,8 @@ case class CometNativeWriteExec(
     child: SparkPlan,
     outputPath: String,
     committer: Option[FileCommitProtocol] = None,
-    jobTrackerID: String = Utils.createTempDir().getName)
+    jobTrackerID: String = Utils.createTempDir().getName,
+    catalogTable: Option[CatalogTable] = None)
     extends CometNativeExec
     with UnaryExecNode {
@@ -135,6 +137,11 @@ case class CometNativeWriteExec(
       }
     }
 
+    // Refresh the catalog table cache so subsequent reads see the new data
+    catalogTable.foreach { ct =>
+      session.catalog.refreshTable(ct.identifier.quotedString)
+    }
+
     // Return empty RDD as write operations don't return data
     sparkContext.emptyRDD[InternalRow]
   }
```

> **Author** (on the `catalogTable.foreach` line): This was a different issue: while running the test, I realised the table needs to be refreshed to get the new data.
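For context on why the refresh matters: Spark caches a table's relation (including its file listing) after the first read, so files produced by Comet's native writer outside Spark's built-in write path can stay invisible to later reads until that cache entry is invalidated. A minimal sketch of the symptom and the fix, assuming a local session and an illustrative table `t` (none of this is code from the PR):

```scala
// Illustrative sketch, not PR code: stale cached file listings after an
// out-of-band write, and how refreshTable invalidates the cache entry.
import org.apache.spark.sql.SparkSession

object RefreshSketch extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  spark.sql("CREATE TABLE t(i INT) USING parquet")
  spark.table("t").count() // first read caches the relation and file listing

  // ...suppose a native writer now adds Parquet files to t's location...

  // Without this call, the cached file index may not include the new files.
  spark.catalog.refreshTable("t")
  spark.table("t").count() // re-lists the table's files and sees the new data
}
```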
**`CometParquetWriterSuite`:**

```diff
@@ -377,6 +377,167 @@ class CometParquetWriterSuite extends CometTestBase {
     }
   }
 
+  private def withNativeWriteConf(f: => Unit): Unit = {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      f
+    }
+  }
+
+  private def assertCometNativeWrite(insertSql: String): Unit = {
+    val plan = captureSqlWritePlan(insertSql)
+    val hasNativeWrite = plan.exists {
+      case _: CometNativeWriteExec => true
+      case d: DataWritingCommandExec =>
+        d.child.exists(_.isInstanceOf[CometNativeWriteExec])
+      case _ => false
+    }
+    assert(
+      hasNativeWrite,
+      s"Expected CometNativeWriteExec in plan, but not found:\n${plan.treeString}")
+  }
+
+  private def captureSqlWritePlan(sqlText: String): SparkPlan = {
+    var capturedPlan: Option[QueryExecution] = None
+
+    val listener = new org.apache.spark.sql.util.QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        if (funcName == "command") {
```
> **Author:** We can also use this directly?
>
> **Author:** This did not work, since we require `stripAQEPlan`.
```diff
+          capturedPlan = Some(qe)
+        }
+      }
+      override def onFailure(
+          funcName: String,
+          qe: QueryExecution,
+          exception: Exception): Unit = {}
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      sql(sqlText)
+      val maxWaitTimeMs = 5000
+      val checkIntervalMs = 50
+      var iterations = 0
+      while (capturedPlan.isEmpty && iterations < maxWaitTimeMs / checkIntervalMs) {
```
> **Author** (on the `while` loop): Wait for some time to make sure the query plan is completed.
```diff
+        Thread.sleep(checkIntervalMs)
+        iterations += 1
+      }
+      assert(capturedPlan.isDefined, s"Failed to capture plan for: $sqlText")
+      stripAQEPlan(capturedPlan.get.executedPlan)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
```
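On the wait loop: `QueryExecutionListener` callbacks are delivered asynchronously on Spark's listener bus, so `sql(sqlText)` can return before `onSuccess` has fired, which is why the test polls instead of asserting immediately. A generic form of that wait (the helper name and parameters are made up for illustration, not PR code):

```scala
// Illustrative polling helper mirroring the loop above.
def pollUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  var satisfied = condition
  while (!satisfied && System.currentTimeMillis() < deadline) {
    Thread.sleep(intervalMs) // back off between checks
    satisfied = condition
  }
  satisfied
}

// Usage matching the test: assert(pollUntil(5000, 50)(capturedPlan.isDefined))
```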
```diff
+
+  // SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: positive tests.
+  // Mirrors the Spark InsertSuite tests to validate Comet native writer compatibility.
+
+  test("SPARK-38811: simple default value with concat expression") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s string default concat('abc', 'def')")
+        assertCometNativeWrite("insert into t values(true, default)")
+        checkAnswer(spark.table("t"), Row(true, "abcdef"))
+      }
+    }
+  }
+
+  test("SPARK-38811: multiple trailing default values") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i int) using parquet")
+        sql("alter table t add column s bigint default 42")
+        sql("alter table t add column x bigint default 43")
+        assertCometNativeWrite("insert into t(i) values(1)")
+        checkAnswer(spark.table("t"), Row(1, 42, 43))
+      }
+    }
+  }
+
+  test("SPARK-38811: multiple trailing defaults via add columns") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i int) using parquet")
+        sql("alter table t add columns s bigint default 42, x bigint default 43")
+        assertCometNativeWrite("insert into t(i) values(1)")
+        checkAnswer(spark.table("t"), Row(1, 42, 43))
+      }
+    }
+  }
+
+  test("SPARK-38811: default with nullable column (no default)") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i int) using parquet")
+        sql("alter table t add column s bigint default 42")
+        sql("alter table t add column x bigint")
+        assertCometNativeWrite("insert into t(i) values(1)")
+        checkAnswer(spark.table("t"), Row(1, 42, null))
+      }
+    }
+  }
+
+  test("SPARK-38811: expression default (41 + 1)") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s bigint default 41 + 1")
+        assertCometNativeWrite("insert into t(i) values(default)")
+        checkAnswer(spark.table("t"), Row(null, 42))
+      }
+    }
+  }
+
+  test("SPARK-38811: explicit defaults in multiple positions") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean default false) using parquet")
+        sql("alter table t add column s bigint default 42")
+        assertCometNativeWrite("insert into t values(false, default), (default, 42)")
+        checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42)))
+      }
+    }
+  }
+
+  test("SPARK-38811: default with alias over VALUES") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s bigint default 42")
+        assertCometNativeWrite(
+          "insert into t select * from values (false, default) as tab(col, other)")
+        checkAnswer(spark.table("t"), Row(false, 42))
+      }
+    }
+  }
+
+  test("SPARK-38811: default value in wrong order evaluates to NULL") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s bigint default 42")
+        assertCometNativeWrite("insert into t values (default, 43)")
+        checkAnswer(spark.table("t"), Row(null, 43))
+      }
+    }
+  }
+
+  // INSERT INTO ... SELECT with the native writer fails;
+  // open issue: https://github.com/apache/datafusion-comet/issues/3521
+  ignore("SPARK-38811: default via SELECT statement") {
+    withNativeWriteConf {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s bigint default 42")
+        sql("insert into t select false, default")
+        checkAnswer(spark.table("t"), Row(false, 42))
+      }
+    }
+  }
+
   private def createTestData(inputDir: File): String = {
     val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
     val schema = FuzzDataGenerator.generateSchema(
```
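Background on what these tests exercise: for columns added with `ALTER TABLE ... ADD COLUMN ... DEFAULT`, Spark keeps the default expression in the column's `StructField` metadata (a current default used for new inserts, and an exists default used for rows written before the column was added), so the writer has to materialize defaults at insert time rather than expect them in existing files. A quick, illustrative way to peek at that metadata (the exact keys are Spark internals, so treat this as a sketch):

```scala
// Sketch: inspect where Spark records ALTER TABLE ... DEFAULT values.
spark.sql("create table t(i int) using parquet")
spark.sql("alter table t add column s bigint default 42")
spark.table("t").schema.fields.foreach { f =>
  // For column s this typically shows CURRENT_DEFAULT / EXISTS_DEFAULT -> 42.
  println(s"${f.name}: ${f.metadata.json}")
}
```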