diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json
index c5cf885e013..5ffa83bb7f3 100644
--- a/core/src/main/resources/error/delta-error-classes.json
+++ b/core/src/main/resources/error/delta-error-classes.json
@@ -349,6 +349,10 @@
     "message" : [ "Specified mode '' is not supported. Supported modes are: " ],
     "sqlState" : "0A000"
   },
+  "DELTA_MULTIPLE_CDC_BOUNDARY" : {
+    "message" : [ "Multiple arguments provided for CDC read. Please provide one of either Timestamp or Version." ],
+    "sqlState" : "42000"
+  },
   "DELTA_MULTIPLE_CONF_FOR_SINGLE_COLUMN_IN_BLOOM_FILTER" : {
     "message" : [ "Multiple bloom filter index configurations passed to command for column: " ],
     "sqlState" : "0A000"
@@ -448,6 +452,10 @@
     "message" : [ "The of your Delta table could not be recovered while Reconstructing", "version: . Did you manually delete files in the _delta_log directory?", "Set to \"false\"", "to skip validation." ],
     "sqlState" : "22000"
   },
+  "DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS" : {
+    "message" : [ "Unable to enable Change Data Capture on the table. The table already contains", "reserved columns that will", "be used internally as metadata for the table's Change Data Feed. To enable", "Change Data Feed on the table rename/drop these columns.", "" ],
+    "sqlState" : "42000"
+  },
   "DELTA_TABLE_ALREADY_EXISTS" : {
     "message" : [ "Table already exists." ],
     "sqlState" : "42000"
@@ -586,5 +594,9 @@
   "DELTA_VIOLATE_CONSTRAINT_WITH_VALUES" : {
     "message" : [ "CHECK constraint violated by row with values:", "" ],
     "sqlState" : "23001"
+  },
+  "RESERVED_CDC_COLUMNS_ON_WRITE" : {
+    "message" : [ "", "The write contains reserved columns that are used", "internally as metadata for Change Data Feed. To write to the table either rename/drop", "these columns or disable Change Data Feed on the table by setting", " to false." ],
+    "sqlState" : "42000"
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 2cce6861c8d..3d838cb931a 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TimeTravel
 import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
 import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
 import org.apache.spark.sql.delta.commands.RestoreTableCommand
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
 import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -403,8 +404,11 @@ object DeltaRelation extends DeltaLogging {
       options: CaseInsensitiveStringMap): LogicalRelation = {
     recordFrameProfile("DeltaAnalysis", "fromV2Relation") {
       val relation = d.withOptions(options.asScala.toMap).toBaseRelation
-      var output = v2Relation.output
-
+      val output = if (CDCReader.isCDCRead(options)) {
+        CDCReader.cdcReadSchema(d.schema()).toAttributes
+      } else {
+        v2Relation.output
+      }
       val catalogTable = if (d.catalogTable.isDefined) {
         Some(d.v1Table)
       } else {
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 0326771fc8c..564ed410243 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -176,6 +176,39 @@ object DeltaErrors
   }
+  /**
+   * Thrown when main table data contains columns that are reserved for CDF, such as
+   * `_change_type`.
+   */
+  def cdcColumnsInData(columns: Seq[String]): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "RESERVED_CDC_COLUMNS_ON_WRITE",
+      messageParameters = Array(columns.mkString("[", ",", "]"), DeltaConfigs.CHANGE_DATA_FEED.key)
+    )
+  }
+
+  /**
+   * Thrown when main table data already contains columns that are reserved for CDF, such as
+   * `_change_type`, but CDF is not yet enabled on that table.
+   */
+  def tableAlreadyContainsCDCColumns(columns: Seq[String]): Throwable = {
+    new DeltaIllegalStateException(errorClass = "DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS",
+      messageParameters = Array(columns.mkString("[", ",", "]")))
+  }
+
+  /**
+   * Thrown when a CDC query contains conflicting 'starting' or 'ending' options, e.g. when both
+   * starting version and starting timestamp are specified.
+   *
+   * @param position Specifies which option was duplicated in the read.
Values are "starting" or + * "ending" + */ + def multipleCDCBoundaryException(position: String): Throwable = { + new DeltaAnalysisException( + errorClass = "DELTA_MULTIPLE_CDC_BOUNDARY", + messageParameters = Array(position, position, position) + ) + } + def formatColumn(colName: String): String = s"`$colName`" def formatColumnList(colNames: Seq[String]): String = diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 131653a72fe..19f4d971320 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -31,6 +31,7 @@ import scala.util.control.NonFatal import com.databricks.spark.util.TagDefinitions._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.WriteIntoDelta +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} @@ -399,6 +400,15 @@ class DeltaLog private( // out in this case. throw DeltaErrors.pathNotExistsException(dataPath.toString) } + + // For CDC we have to return the relation that represents the change data instead of actual + // data. + if (!cdcOptions.isEmpty) { + recordDeltaEvent(this, "delta.cdf.read", data = cdcOptions.asCaseSensitiveMap()) + return CDCReader.getCDCRelation(spark, + this, snapshotToUse, partitionFilters, spark.sessionState.conf, cdcOptions) + } + val fileIndex = TahoeLogFileIndex( spark, this, dataPath, snapshotToUse, partitionFilters, isTimeTravelQuery) var bucketSpec: Option[BucketSpec] = None diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index ced8736801b..cdb9ed51c14 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -437,6 +437,7 @@ private[delta] object DeltaOperationMetrics { "rewriteTimeMs" // time taken to rewrite the matched files ) + val UPDATE = Set( "numAddedFiles", // number of files added "numRemovedFiles", // number of files removed diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala index e2adccbbf9d..eb247c3dddc 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala @@ -187,6 +187,12 @@ object DeltaOptions extends DeltaLogging { val DATA_CHANGE_OPTION = "dataChange" val STARTING_VERSION_OPTION = "startingVersion" val STARTING_TIMESTAMP_OPTION = "startingTimestamp" + val CDC_START_VERSION = "startingVersion" + val CDC_START_TIMESTAMP = "startingTimestamp" + val CDC_END_VERSION = "endingVersion" + val CDC_END_TIMESTAMP = "endingTimestamp" + val CDC_READ_OPTION = "readChangeFeed" + val CDC_READ_OPTION_LEGACY = "readChangeData" val validOptionKeys : Set[String] = Set( REPLACE_WHERE_OPTION, @@ -203,6 +209,12 @@ object DeltaOptions extends DeltaLogging { DATA_CHANGE_OPTION, STARTING_TIMESTAMP_OPTION, STARTING_VERSION_OPTION, + CDC_READ_OPTION, + CDC_READ_OPTION_LEGACY, + CDC_START_TIMESTAMP, + CDC_END_TIMESTAMP, + CDC_START_VERSION, + CDC_END_VERSION, "queryName", "checkpointLocation", "path", diff --git 
a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 260aa3a7c7b..7d7ab66b18a 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS import org.apache.spark.sql.delta.DeltaOperations.Operation import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files._ import org.apache.spark.sql.delta.hooks.{GenerateSymlinkManifest, PostCommitHook} import org.apache.spark.sql.delta.metering.DeltaLogging @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.types.StructType import org.apache.spark.util.{Clock, Utils} /** Record metrics about a successful commit. */ @@ -539,6 +541,29 @@ trait OptimisticTransactionImpl extends TransactionalWrite } } + /** + * Checks if the new schema contains any CDC columns (which is invalid) and throws the appropriate + * error + */ + protected def performCdcMetadataCheck(): Unit = { + if (newMetadata.nonEmpty) { + if (CDCReader.isCDCEnabledOnTable(newMetadata.get)) { + val schema = newMetadata.get.schema.fieldNames + val reservedColumnsUsed = CDCReader.cdcReadSchema(new StructType()).fieldNames + .intersect(schema) + if (reservedColumnsUsed.length > 0) { + if (!CDCReader.isCDCEnabledOnTable(snapshot.metadata)) { + // cdc was not enabled previously but reserved columns are present in the new schema. + throw DeltaErrors.tableAlreadyContainsCDCColumns(reservedColumnsUsed) + } else { + // cdc was enabled but reserved columns are present in the new metadata. + throw DeltaErrors.cdcColumnsInData(reservedColumnsUsed) + } + } + } + } + } + /** * Modifies the state of the log by adding a new commit that is based on a read at * the given `lastVersion`. In the case of a conflict with a concurrent writer this @@ -554,6 +579,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite commitStartNano = System.nanoTime() val (version, actualCommittedActions) = try { + // Check for CDC metadata columns + performCdcMetadataCheck() + // Try to commit at the next version. 
val preparedActions = prepareCommit(actions, op) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index 10534370ad7..5ba01cdd930 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaOptions, DeltaTableIdentifier, DeltaTableUtils, DeltaTimeTravelSpec, Snapshot} import org.apache.spark.sql.delta.commands.WriteIntoDelta +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils} import org.apache.hadoop.fs.Path @@ -186,8 +187,30 @@ case class DeltaTableV2( if (timeTravelOpt.nonEmpty && ttSpec.nonEmpty) { throw DeltaErrors.multipleTimeTravelSyntaxUsed } + + def checkCDCOptionsValidity(options: CaseInsensitiveStringMap): Unit = { + // check if we have both version and timestamp parameters + if (options.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY) + && options.containsKey(DeltaDataSource.CDC_START_VERSION_KEY)) { + throw DeltaErrors.multipleCDCBoundaryException("starting") + } + if (options.containsKey(DeltaDataSource.CDC_END_VERSION_KEY) + && options.containsKey(DeltaDataSource.CDC_END_TIMESTAMP_KEY)) { + throw DeltaErrors.multipleCDCBoundaryException("ending") + } + if (!options.containsKey(DeltaDataSource.CDC_START_VERSION_KEY) + && !options.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY)) { + throw DeltaErrors.noStartVersionForCDC() + } + } + + val caseInsensitiveStringMap = new CaseInsensitiveStringMap(options.asJava) + if (timeTravelOpt.isEmpty && ttSpec.nonEmpty) { copy(timeTravelOpt = ttSpec) + } else if (CDCReader.isCDCRead(caseInsensitiveStringMap)) { + checkCDCOptionsValidity(caseInsensitiveStringMap) + copy(cdcOptions = caseInsensitiveStringMap) } else { this } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index ecc13e10bd2..ed71e2ee42b 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -27,6 +27,7 @@ import com.databricks.spark.util.DatabricksLogging import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.WriteIntoDelta +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.PartitionUtils import org.apache.hadoop.fs.Path @@ -94,7 +95,10 @@ class DeltaDataSource if (schemaToUse.isEmpty) { throw DeltaErrors.schemaNotSetException } - { + val options = new CaseInsensitiveStringMap(parameters.asJava) + if (CDCReader.isCDCRead(options)) { + (shortName(), CDCReader.cdcReadSchema(schemaToUse)) + } else { (shortName(), schemaToUse) } } @@ -171,7 +175,26 @@ class DeltaDataSource val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters) var cdcOptions: mutable.Map[String, String] = mutable.Map.empty - + val caseInsensitiveParams = new CaseInsensitiveStringMap(parameters.asJava) + if (CDCReader.isCDCRead(caseInsensitiveParams)) { + 
cdcOptions = mutable.Map[String, String](DeltaDataSource.CDC_ENABLED_KEY -> "true") + if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_START_VERSION_KEY)) { + cdcOptions(DeltaDataSource.CDC_START_VERSION_KEY) = caseInsensitiveParams.get( + DeltaDataSource.CDC_START_VERSION_KEY) + } + if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY)) { + cdcOptions(DeltaDataSource.CDC_START_TIMESTAMP_KEY) = caseInsensitiveParams.get( + DeltaDataSource.CDC_START_TIMESTAMP_KEY) + } + if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_END_VERSION_KEY)) { + cdcOptions(DeltaDataSource.CDC_END_VERSION_KEY) = caseInsensitiveParams.get( + DeltaDataSource.CDC_END_VERSION_KEY) + } + if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_END_TIMESTAMP_KEY)) { + cdcOptions(DeltaDataSource.CDC_END_TIMESTAMP_KEY) = caseInsensitiveParams.get( + DeltaDataSource.CDC_END_TIMESTAMP_KEY) + } + } val dfOptions: Map[String, String] = if (sqlContext.sparkSession.sessionState.conf.getConf( DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) { diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala new file mode 100644 index 00000000000..74aa96fcbea --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -0,0 +1,632 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta + +import java.io.File +import java.text.SimpleDateFormat +import java.util.Date + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.commands.cdc.CDCReader._ +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.FileNames + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.{col, current_timestamp, floor, lit} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StructType} + +abstract class DeltaCDCSuiteBase + extends QueryTest + with SharedSparkSession with CheckCDCAnswer + with DeltaSQLCommandTest { + + override protected def sparkConf: SparkConf = super.sparkConf + .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true") + + /** Represents path or metastore table name */ + abstract case class TblId(id: String) + class TablePath(path: String) extends TblId(path) + class TableName(name: String) extends TblId(name) + + /** Indicates either the starting or ending version/timestamp */ + trait Boundary + case class StartingVersion(value: String) extends Boundary + case class StartingTimestamp(value: String) extends Boundary + case class EndingVersion(value: String) extends Boundary + case class EndingTimestamp(value: String) extends Boundary + case object Unbounded extends Boundary // used to model situation when a boundary isn't provided + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + + def createTblWithThreeVersions( + tblName: Option[String] = None, + path: Option[String] = None): Unit = { + // version 0 + if (tblName.isDefined && path.isDefined) { + spark.range(10).write.format("delta") + .option("path", path.get) + .saveAsTable(tblName.get) + } else if (tblName.isDefined) { + spark.range(10).write.format("delta") + .saveAsTable(tblName.get) + } else if (path.isDefined) { + spark.range(10).write.format("delta") + .save(path.get) + } + + if (tblName.isDefined) { + // version 1 + spark.range(10, 20).write.format("delta").mode("append").saveAsTable(tblName.get) + + // version 2 + spark.range(20, 30).write.format("delta").mode("append").saveAsTable(tblName.get) + } else if (path.isDefined) { + // version 1 + spark.range(10, 20).write.format("delta").mode("append").save(path.get) + + // version 2 + spark.range(20, 30).write.format("delta").mode("append").save(path.get) + } + } + + /** Single method to do all kinds of CDC reads */ + def cdcRead(tblId: TblId, start: Boundary, end: Boundary): DataFrame + + /** Modify timestamp for a delta commit, used to test timestamp querying */ + def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = { + val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri) + file.setLastModified(time) + val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri) + if (crc.exists()) { + crc.setLastModified(time) + } + } + + /** Create table utility method */ + def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = { + val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1")) + if (disableCDC) { + withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "false") { + readDf.write.format("delta") + .saveAsTable(dstTbl) + } + } else { + readDf.write.format("delta") + .saveAsTable(dstTbl) + } + } + + 
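+  // For reference, a minimal sketch of the DataFrameReader options that the concrete `cdcRead`
+  // implementations below (e.g. DeltaCDCScalaSuite) wrap:
+  //
+  //   spark.read.format("delta")
+  //     .option(DeltaOptions.CDC_READ_OPTION, "true")  // "readChangeFeed"
+  //     .option("startingVersion", "0")
+  //     .option("endingVersion", "1")
+  //     .table("src")
+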
testQuietly("writes with metadata columns") { + withTable("src", "dst") { + + // populate src table with CDC data + createTblWithThreeVersions(tblName = Some("src")) + + // writing cdc data to a new table with cdc enabled should fail. the source table has columns + // that are reserved for CDC only, and shouldn't be allowed into the target table. + val e = intercept[IllegalStateException] { + ctas("src", "dst") + } + val writeContainsCDCColumnsError = DeltaErrors.cdcColumnsInData( + cdcReadSchema(new StructType()).fieldNames).getMessage + val enablingCDCOnTableWithCDCColumns = DeltaErrors.tableAlreadyContainsCDCColumns( + cdcReadSchema(new StructType()).fieldNames).getMessage + + assert(e.getMessage.contains(writeContainsCDCColumnsError)) + + // when cdc is disabled writes should work + ctas("src", "dst", disableCDC = true) + + // write some more data + val moreData = spark.range(20, 30) + .withColumn(CDC_TYPE_COLUMN_NAME, lit("insert")) + .withColumn("_commit_version", lit(2L)) + .withColumn("_commit_timestamp", current_timestamp) + .cache() + + moreData.write.format("delta") + .mode("append") + .saveAsTable("dst") + + checkAnswer( + spark.read.format("delta").table("dst"), + cdcRead(new TableName("src"), StartingVersion("0"), EndingVersion("1")) + .union(moreData) + ) + + // re-enabling cdc should be disallowed, since the dst table already contains column that are + // reserved for CDC only. + val e2 = intercept[IllegalStateException] { + sql(s"ALTER TABLE dst SET TBLPROPERTIES " + + s"(${DeltaConfigs.CHANGE_DATA_FEED.key}=true)") + } + assert(e2.getMessage.contains(enablingCDCOnTableWithCDCColumns)) + } + } + + test("changes from table by name") { + withTable("tbl") { + createTblWithThreeVersions(tblName = Some("tbl")) + + val readDf = cdcRead(new TableName("tbl"), StartingVersion("0"), EndingVersion("1")) + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + readDf, + spark.range(20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType)) + ) + } + } + + test("changes from table by path") { + withTempDir { dir => + createTblWithThreeVersions(path = Some(dir.getAbsolutePath)) + + val readDf = cdcRead( + new TablePath(dir.getAbsolutePath), StartingVersion("0"), EndingVersion("1")) + checkCDCAnswer( + DeltaLog.forTable(spark, dir.getAbsolutePath), + readDf, + spark.range(20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType)) + ) + } + } + + test("changes - start and end are timestamps") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + // modify timestamps + // version 0 + modifyDeltaTimestamp(deltaLog, 0, 0) + val tsAfterV0 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(new Date(1)) + + // version 1 + modifyDeltaTimestamp(deltaLog, 1, 1000) + val tsAfterV1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(new Date(1001)) + + modifyDeltaTimestamp(deltaLog, 2, 2000) + + val readDf = cdcRead( + new TablePath(tempDir.getAbsolutePath), + StartingTimestamp(tsAfterV0), EndingTimestamp(tsAfterV1)) + checkCDCAnswer( + DeltaLog.forTable(spark, tempDir), + readDf, + spark.range(20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("changes - only start is a timestamp") { + withTempDir { tempDir => + createTblWithThreeVersions(path = 
Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 10000) + modifyDeltaTimestamp(deltaLog, 2, 20000) + + val ts0 = dateFormat.format(new Date(2000)) + val readDf = cdcRead( + new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingVersion("1")) + checkCDCAnswer( + DeltaLog.forTable(spark, tempDir), + readDf, + spark.range(10, 20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("changes - only start is a timestamp - inclusive behavior") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 1000) + modifyDeltaTimestamp(deltaLog, 2, 2000) + + val ts0 = dateFormat.format(new Date(0)) + val readDf = cdcRead( + new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingVersion("1")) + checkCDCAnswer( + DeltaLog.forTable(spark, tempDir), + readDf, + spark.range(20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("start version and end version are the same") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + + val readDf = cdcRead( + new TableName(tblName), StartingVersion("0"), EndingVersion("0")) + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + readDf, + spark.range(10) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("start version is provided and no end version") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + + val readDf = cdcRead( + new TableName(tblName), StartingVersion("0"), Unbounded) + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + readDf, + spark.range(30) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("end timestamp < start timestamp") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 1000) + modifyDeltaTimestamp(deltaLog, 2, 2000) + + val ts0 = dateFormat.format(new Date(2000)) + val ts1 = dateFormat.format(new Date(1)) + val e = intercept[IllegalArgumentException] { + cdcRead( + new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingTimestamp(ts1)) + } + assert(e.getMessage.contains("End cannot be before start")) + } + } + + test("end version < start version") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + val e = intercept[IllegalArgumentException] { + cdcRead(new TableName(tblName), StartingVersion("1"), EndingVersion("0")) + } + assert(e.getMessage.contains("End cannot be before start")) + } + } + + test("cdc result dataframe can be transformed further") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + + val cdcResult = cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("1")) + val transformedDf = cdcResult + .drop(CDC_COMMIT_TIMESTAMP) + 
.withColumn("col3", lit(0)) + .withColumn("still_there", col("_change_type")) + + checkAnswer( + transformedDf, + spark.range(20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType)) + .withColumn("col3", lit(0)) + .withColumn("still_there", col("_change_type")) + ) + } + } + + test("multiple references on same table") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + + val cdcResult0_1 = cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("1")) + val cdcResult0_2 = cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("2")) + + val diff = cdcResult0_2.except(cdcResult0_1) + + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + diff, + spark.range(20, 30) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType)) + ) + } + } + + test("filtering cdc metadata columns") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + val deltaTable = io.delta.tables.DeltaTable.forName("tbl") + deltaTable.delete("id > 20") + + val cdcResult = cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("3")) + + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + cdcResult.filter("_change_type != 'insert'"), + spark.range(21, 30) + .withColumn("_change_type", lit("delete")) + .withColumn("_commit_version", lit(3)) + ) + + checkCDCAnswer( + DeltaLog.forTable(spark, TableIdentifier("tbl")), + cdcResult.filter("_commit_version = 1"), + spark.range(10, 20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", lit(1)) + ) + } + } + + test("aggregating non-numeric cdc data columns") { + withTempDir { dir => + val path = dir.getAbsolutePath + spark.range(10).selectExpr("id", "'text' as text") + .write.format("delta").save(path) + val deltaTable = io.delta.tables.DeltaTable.forPath(path) + deltaTable.delete("id > 5") + + val cdcResult = cdcRead(new TablePath(path), StartingVersion("0"), EndingVersion("3")) + + checkAnswer( + cdcResult.selectExpr("count(distinct text)"), + Row(1) + ) + + checkAnswer( + cdcResult.selectExpr("first(text)"), + Row("text") + ) + } + } + + test("ending version not specified resolves to latest at execution time") { + withTempDir { dir => + val path = dir.getAbsolutePath + spark.range(5).selectExpr("id", "'text' as text") + .write.format("delta").save(path) + val cdcResult = cdcRead(new TablePath(path), StartingVersion("0"), Unbounded) + + checkAnswer( + cdcResult.selectExpr("id", "_change_type", "_commit_version"), + Row(0, "insert", 0) :: Row(1, "insert", 0) :: Row(2, "insert", 0) :: + Row(3, "insert", 0):: Row(4, "insert", 0) :: Nil + ) + + // The next scan of `cdcResult` should include this delete even though the DF was defined + // before it. 
+ val deltaTable = io.delta.tables.DeltaTable.forPath(path) + deltaTable.delete("id > 2") + + checkAnswer( + cdcResult.selectExpr("id", "_change_type", "_commit_version"), + Row(0, "insert", 0) :: Row(1, "insert", 0) :: Row(2, "insert", 0) :: + Row(3, "insert", 0):: Row(4, "insert", 0) :: + Row(3, "delete", 1):: Row(4, "delete", 1) :: Nil + ) + } + } + + test("table schema changed after dataframe with ending specified") { + withTempDir { dir => + val path = dir.getAbsolutePath + spark.range(5).selectExpr("id", "'text' as text") + .write.format("delta").save(path) + val cdcResult = cdcRead(new TablePath(path), StartingVersion("0"), EndingVersion("1")) + sql(s"ALTER TABLE delta.`$path` ADD COLUMN (newCol INT)") + + checkAnswer( + cdcResult.selectExpr("id", "_change_type", "_commit_version"), + Row(0, "insert", 0) :: Row(1, "insert", 0) :: Row(2, "insert", 0) :: + Row(3, "insert", 0) :: Row(4, "insert", 0) :: Nil + ) + } + } + + test("table schema changed after dataframe with ending not specified") { + withTempDir { dir => + val path = dir.getAbsolutePath + spark.range(5).selectExpr("id", "'text' as text") + .write.format("delta").save(path) + val cdcResult = cdcRead(new TablePath(path), StartingVersion("0"), Unbounded) + sql(s"ALTER TABLE delta.`$path` ADD COLUMN (newCol STRING)") + sql(s"INSERT INTO delta.`$path` VALUES (5, 'text', 'newColVal')") + + // Just ignoring the new column is pretty weird, but it's what we do for non-CDC dataframes, + // so we preserve the behavior rather than adding a special case. + checkAnswer( + cdcResult.selectExpr("id", "_change_type", "_commit_version"), + Row(0, "insert", 0) :: Row(1, "insert", 0) :: Row(2, "insert", 0) :: + Row(3, "insert", 0) :: Row(4, "insert", 0) :: Row(5, "insert", 2) :: Nil + ) + } + } + + test("An error should be thrown when CDC is not enabled") { + val tblName = "tbl" + withTable(tblName) { + withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "false") { + // create version with cdc disabled - v0 + spark.range(10).write.format("delta").saveAsTable(tblName) + } + val deltaTable = io.delta.tables.DeltaTable.forName(tblName) + // v1 + deltaTable.delete("id > 8") + + // v2 + sql(s"ALTER TABLE ${tblName} SET TBLPROPERTIES " + + s"(${DeltaConfigs.CHANGE_DATA_FEED.key}=true)") + + // v3 + spark.range(10, 20).write.format("delta").mode("append").saveAsTable(tblName) + + // v4 + deltaTable.delete("id > 18") + + // v5 + sql(s"ALTER TABLE ${tblName} SET TBLPROPERTIES " + + s"(${DeltaConfigs.CHANGE_DATA_FEED.key}=false)") + + var e = intercept[AnalysisException] { + cdcRead(new TableName(tblName), StartingVersion("0"), EndingVersion("4")).collect() + } + assert(e.getMessage === DeltaErrors.changeDataNotRecordedException(0, 0, 4).getMessage) + + val cdcDf = cdcRead(new TableName(tblName), StartingVersion("2"), EndingVersion("4")) + assert(cdcDf.count() == 11) // 10 rows inserted, 1 row deleted + + // Check that we correctly detect CDC is disabled and fail the query for multiple types of + // ranges: + // * disabled at the end but not start - (2, 5) + // * disabled at the start but not end - (1, 4) + // * disabled at both start and end (even though enabled in the middle) - (1, 5) + for ((start, end, firstDisabledVersion) <- Seq((2, 5, 5), (1, 4, 1), (1, 5, 1))) { + e = intercept[AnalysisException] { + cdcRead( + new TableName(tblName), + StartingVersion(start.toString), EndingVersion(end.toString)).collect() + } + assert(e.getMessage === DeltaErrors.changeDataNotRecordedException( + firstDisabledVersion, start, end).getMessage) + 
} + } + } +} + +class DeltaCDCScalaSuite extends DeltaCDCSuiteBase { + + /** Single method to do all kinds of CDC reads */ + def cdcRead( + tblId: TblId, + start: Boundary, + end: Boundary): DataFrame = { + + val startPrefix: (String, String) = start match { + case startingVersion: StartingVersion => + ("startingVersion", startingVersion.value) + + case startingTimestamp: StartingTimestamp => + ("startingTimestamp", startingTimestamp.value) + + case Unbounded => + ("", "") + } + val endPrefix: (String, String) = end match { + case endingVersion: EndingVersion => + ("endingVersion", endingVersion.value) + + case endingTimestamp: EndingTimestamp => + ("endingTimestamp", endingTimestamp.value) + + case Unbounded => + ("", "") + } + tblId match { + case path: TablePath => + spark.read.format("delta") + .option(DeltaOptions.CDC_READ_OPTION, "true") + .option(startPrefix._1, startPrefix._2) + .option(endPrefix._1, endPrefix._2) + .load(path.id) + + case tblName: TableName => + spark.read.format("delta") + .option(DeltaOptions.CDC_READ_OPTION, "true") + .option(startPrefix._1, startPrefix._2) + .option(endPrefix._1, endPrefix._2) + .table(tblName.id) + + case _ => + throw new IllegalArgumentException("No table name or path provided") + } + } + + + test("start version or timestamp is not provided") { + val tblName = "tbl" + withTable(tblName) { + createTblWithThreeVersions(tblName = Some(tblName)) + + val e = intercept[AnalysisException] { + spark.read.format("delta") + .option(DeltaOptions.CDC_READ_OPTION, "true") + .option("endingVersion", 1) + .table(tblName) + .show() + } + assert(e.getMessage.contains(DeltaErrors.noStartVersionForCDC().getMessage)) + } + } + + test("Not having readChangeFeed will not output cdc columns") { + val tblName = "tbl2" + withTable(tblName) { + spark.range(0, 10).write.format("delta").saveAsTable(tblName) + checkAnswer(spark.read.format("delta").table(tblName), spark.range(0, 10).toDF("id")) + + checkAnswer( + spark.read.format("delta") + .option("startingVersion", "0") + .option("endingVersion", "0") + .table(tblName), + spark.range(0, 10).toDF("id")) + } + } + + test("non-monotonic timestamps") { + withTempDir { dir => + val path = dir.getAbsolutePath + val deltaLog = DeltaLog.forTable(spark, path) + (0 to 3).foreach { i => + spark.range(i * 10, (i + 1) * 10).write.format("delta").mode("append").save(path) + val file = new File(FileNames.deltaFile(deltaLog.logPath, i).toUri) + file.setLastModified(300 - i) + } + + checkCDCAnswer( + deltaLog, + cdcRead(new TablePath(path), StartingVersion("0"), EndingVersion("3")), + spark.range(0, 40) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", floor(col("id") / 10))) + } + } + +} + diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index 137b8d76711..9960d3cf172 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -145,6 +145,41 @@ trait DeltaErrorsSuiteBase test("test DeltaErrors OSS methods") { + { + val e = intercept[DeltaIllegalStateException] { + throw DeltaErrors.tableAlreadyContainsCDCColumns(Seq("col1", "col2")) + } + assert(e.getErrorClass == "DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS") + assert(e.getSqlState == "42000") + assert(e.getMessage == + s"""Unable to enable Change Data Capture on the table. 
The table already contains + |reserved columns [col1,col2] that will + |be used internally as metadata for the table's Change Data Feed. To enable + |Change Data Feed on the table rename/drop these columns. + |""".stripMargin) + } + { + val e = intercept[DeltaIllegalStateException] { + throw DeltaErrors.cdcColumnsInData(Seq("col1", "col2")) + } + assert(e.getErrorClass == "RESERVED_CDC_COLUMNS_ON_WRITE") + assert(e.getSqlState == "42000") + assert(e.getMessage == + s""" + |The write contains reserved columns [col1,col2] that are used + |internally as metadata for Change Data Feed. To write to the table either rename/drop + |these columns or disable Change Data Feed on the table by setting + |delta.enableChangeDataFeed to false.""".stripMargin) + } + { + val e = intercept[DeltaAnalysisException] { + throw DeltaErrors.multipleCDCBoundaryException("sample") + } + assert(e.getErrorClass == "DELTA_MULTIPLE_CDC_BOUNDARY") + assert(e.getSqlState == "42000") + assert(e.getMessage == "Multiple sample arguments provided for CDC read. Please provide " + + "one of either sampleTimestamp or sampleVersion.") + } { val e = intercept[DeltaIllegalStateException] { throw DeltaErrors.failOnCheckpoint(new Path("path-1"), new Path("path-2"))