diff --git a/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala index 16c1d7ef0c4..de5834f1ee5 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.delta import scala.collection.mutable import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.constraints.{Constraint, Constraints} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils @@ -113,6 +114,20 @@ object ColumnWithDefaultExprUtils extends DeltaLogging { SchemaUtils.fieldToColumn(f).alias(f.name) } } + val cdcSelectExprs = CDCReader.CDC_COLUMNS_IN_DATA.flatMap { cdcColumnName => + topLevelOutputNames.get(cdcColumnName).flatMap { cdcField => + if (metadataOutputNames.contains(cdcColumnName)) { + // The column is in the table schema. It's not a CDC auto generated column. Skip it since + // it's already in `selectExprs`. + None + } else { + // The column is not in the table schema, + // so it must be a column generated by CDC. Adding it back as it's not in `selectExprs`. + Some(SchemaUtils.fieldToColumn(cdcField).alias(cdcField.name)) + } + } + } + selectExprs = selectExprs ++ cdcSelectExprs val newData = queryExecution match { case incrementalExecution: IncrementalExecution => selectFromStreamingDataFrame(incrementalExecution, data, selectExprs: _*) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala index b495c1a26b6..e19d8f58ca8 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import java.io.PrintWriter +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.schema.{InvariantViolationException, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp, toJavaDate, toJavaTimestamp} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.{current_timestamp, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger} import org.apache.spark.sql.test.SharedSparkSession @@ -72,7 +74,8 @@ trait GeneratedColumnTest extends QueryTest with SharedSparkSession with DeltaSQ generatedColumns: Map[String, String], partitionColumns: Seq[String], notNullColumns: Set[String], - comments: Map[String, String]): DeltaTableBuilder = { + comments: Map[String, String], + properties: Map[String, String]): DeltaTableBuilder = { val schema = if (schemaString.nonEmpty) { StructType.fromDDL(schemaString) } else { @@ -102,6 +105,9 @@ trait GeneratedColumnTest extends QueryTest with SharedSparkSession with DeltaSQ if (path.nonEmpty) { builder.location(path.get) } + properties.foreach { case (key, value) => + builder.property(key, value) + } builder } @@ -112,10 +118,11 @@ trait GeneratedColumnTest extends QueryTest with SharedSparkSession with DeltaSQ generatedColumns: Map[String, String], partitionColumns: Seq[String], notNullColumns: Set[String] = Set.empty, - comments: Map[String, String] = Map.empty): Unit = { + comments: Map[String, String] = Map.empty, + properties: Map[String, String] = Map.empty): Unit = { var tableBuilder = io.delta.tables.DeltaTable.create(spark) buildTable(tableBuilder, tableName, path, schemaString, - generatedColumns, partitionColumns, notNullColumns, comments) + generatedColumns, partitionColumns, notNullColumns, comments, properties) .execute() } } @@ -133,6 +140,7 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest { partitionColumns: Seq[String], notNullColumns: Set[String] = Set.empty, comments: Map[String, String] = Map.empty, + properties: Map[String, String] = Map.empty, orCreate: Option[Boolean] = None): Unit = { var tableBuilder = if (orCreate.getOrElse(false)) { io.delta.tables.DeltaTable.createOrReplace(spark) @@ -140,7 +148,7 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest { io.delta.tables.DeltaTable.replace(spark) } buildTable(tableBuilder, tableName, path, schemaString, - generatedColumns, partitionColumns, notNullColumns, comments).execute() + generatedColumns, partitionColumns, notNullColumns, comments, properties).execute() } // Define the information for a default test table used by many tests. @@ -1594,6 +1602,76 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest { } } } + + test("generated columns with cdf") { + val tableName1 = "gcEnabledCDCOn" + val tableName2 = "gcEnabledCDCOff" + withTable(tableName1, tableName2) { + + createTable( + tableName1, + None, + schemaString = "id LONG, timeCol TIMESTAMP, dateCol DATE", + generatedColumns = Map( + "dateCol" -> "CAST(timeCol AS DATE)" + ), + partitionColumns = Seq("dateCol"), + properties = Map( + "delta.enableChangeDataFeed" -> "true" + ) + ) + + spark.range(100).repartition(10) + .withColumn("timeCol", current_timestamp()) + .write + .format("delta") + .mode("append") + .saveAsTable(tableName1) + + spark.sql(s"DELETE FROM ${tableName1} WHERE id < 3") + + val changeData = spark.read.format("delta").option("readChangeData", "true") + .option("startingVersion", "2") + .table(tableName1) + .select("id", CDCReader.CDC_TYPE_COLUMN_NAME, CDCReader.CDC_COMMIT_VERSION) + + val expected = spark.range(0, 3) + .withColumn(CDCReader.CDC_TYPE_COLUMN_NAME, lit("delete")) + .withColumn(CDCReader.CDC_COMMIT_VERSION, lit(2)) + checkAnswer(changeData, expected) + + // Now write out the data frame of cdc to another table that has generated columns but not + // cdc enabled. + createTable( + tableName2, + None, + schemaString = "id LONG, _change_type STRING, timeCol TIMESTAMP, dateCol DATE", + generatedColumns = Map( + "dateCol" -> "CAST(timeCol AS DATE)" + ), + partitionColumns = Seq("dateCol"), + properties = Map( + "delta.enableChangeDataFeed" -> "false" + ) + ) + + val cdcRead = spark.read.format("delta").option("readChangeData", "true") + .option("startingVersion", "2") + .table(tableName1) + .select("id", CDCReader.CDC_TYPE_COLUMN_NAME, "timeCol") + + cdcRead + .write + .format("delta") + .mode("append") + .saveAsTable(tableName2) + + checkAnswer( + cdcRead, + spark.table(tableName2).drop("dateCol") + ) + } + } } class GeneratedColumnSuite extends GeneratedColumnSuiteBase