Change Data Feed - PR 3 - DataFrame API
See the project plan at delta-io#1105.

This PR adds the DataFrame API for CDF, along with a new test suite covering it. The API supports the following options (a usage sketch follows below):

"startingVersion"
"startingTimestamp"
"endingVersion"
"endingTimestamp"
"readChangeFeed"
It also includes miscellaneous other CDF improvements, such as extra schema checks during OptimisticTransaction writes and returning a CDF relation from DeltaLog.createRelation.
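
A minimal sketch of a batch CDF read using these options; the table path and version bounds are placeholders, and an active SparkSession named `spark` is assumed:

    // Read the change feed between two table versions.
    val changes = spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", "0")
      .option("endingVersion", "10")
      .load("/path/to/delta/table")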

Closes delta-io#1132

GitOrigin-RevId: 7ffafc6772fc314064971d65d9e7946b7a01de64

GitOrigin-RevId: b901d21804fe7aaecd6bb2e03cb33c76e19ae2ad
scottsand-db authored and jbguerraz committed Jul 6, 2022
1 parent 95b3f82 commit d10781c
Showing 11 changed files with 817 additions and 4 deletions.
12 changes: 12 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -349,6 +349,10 @@
"message" : [ "Specified mode '<mode>' is not supported. Supported modes are: <supportedModes>" ],
"sqlState" : "0A000"
},
"DELTA_MULTIPLE_CDC_BOUNDARY" : {
"message" : [ "Multiple <startingOrEnding> arguments provided for CDC read. Please provide one of either <startingOrEnding>Timestamp or <startingOrEnding>Version." ],
"sqlState" : "42000"
},
"DELTA_MULTIPLE_CONF_FOR_SINGLE_COLUMN_IN_BLOOM_FILTER" : {
"message" : [ "Multiple bloom filter index configurations passed to command for column: <columnName>" ],
"sqlState" : "0A000"
@@ -448,6 +452,10 @@
"message" : [ "The <operation> of your Delta table could not be recovered while Reconstructing", "version: <version>. Did you manually delete files in the _delta_log directory?", "Set <config> to \"false\"", "to skip validation." ],
"sqlState" : "22000"
},
"DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS" : {
"message" : [ "Unable to enable Change Data Capture on the table. The table already contains", "reserved columns <columnList> that will", "be used internally as metadata for the table's Change Data Feed. To enable", "Change Data Feed on the table rename/drop these columns.", "" ],
"sqlState" : "42000"
},
"DELTA_TABLE_ALREADY_EXISTS" : {
"message" : [ "Table <tableName> already exists." ],
"sqlState" : "42000"
@@ -586,5 +594,9 @@
"DELTA_VIOLATE_CONSTRAINT_WITH_VALUES" : {
"message" : [ "CHECK constraint <constraintName> <expression> violated by row with values:", "<values>" ],
"sqlState" : "23001"
},
"RESERVED_CDC_COLUMNS_ON_WRITE" : {
"message" : [ "", "The write contains reserved columns <columnList> that are used", "internally as metadata for Change Data Feed. To write to the table either rename/drop", "these columns or disable Change Data Feed on the table by setting", "<config> to false." ],
"sqlState" : "42000"
}
}
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
import org.apache.spark.sql.delta.commands.RestoreTableCommand
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -403,8 +404,11 @@
options: CaseInsensitiveStringMap): LogicalRelation = {
recordFrameProfile("DeltaAnalysis", "fromV2Relation") {
val relation = d.withOptions(options.asScala.toMap).toBaseRelation
var output = v2Relation.output

val output = if (CDCReader.isCDCRead(options)) {
CDCReader.cdcReadSchema(d.schema()).toAttributes
} else {
v2Relation.output
}
val catalogTable = if (d.catalogTable.isDefined) {
Some(d.v1Table)
} else {
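When readChangeFeed is set, fromV2Relation now exposes the CDF read schema rather than the table's own output attributes. A minimal sketch of what that looks like to a caller, assuming a CDF-enabled table at a placeholder path with an `id` column and an active SparkSession `spark` (`_change_type`, `_commit_version`, and `_commit_timestamp` are the documented CDF metadata columns):

    // The CDF relation returns the table's columns plus the CDF metadata columns.
    spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", "1")
      .load("/path/to/delta/table")
      .select("id", "_change_type", "_commit_version", "_commit_timestamp")
      .show()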
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -176,6 +176,39 @@ object DeltaErrors
}


/**
* Thrown when main table data contains columns that are reserved for CDF, such as `_change_type`.
*/
def cdcColumnsInData(columns: Seq[String]): Throwable = {
new DeltaIllegalStateException(
errorClass = "RESERVED_CDC_COLUMNS_ON_WRITE",
messageParameters = Array(columns.mkString("[", ",", "]"), DeltaConfigs.CHANGE_DATA_FEED.key)
)
}

/**
* Thrown when main table data already contains columns that are reserved for CDF, such as
* `_change_type`, but CDF is not yet enabled on that table.
*/
def tableAlreadyContainsCDCColumns(columns: Seq[String]): Throwable = {
new DeltaIllegalStateException(errorClass = "DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS",
messageParameters = Array(columns.mkString("[", ",", "]")))
}

/**
* Thrown when a CDC query contains conflicting 'starting' or 'ending' options, e.g. when both
* starting version and starting timestamp are specified.
*
* @param position Specifies which option was duplicated in the read. Values are "starting" or
* "ending"
*/
def multipleCDCBoundaryException(position: String): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_MULTIPLE_CDC_BOUNDARY",
messageParameters = Array(position, position, position)
)
}

def formatColumn(colName: String): String = s"`$colName`"

def formatColumnList(colNames: Seq[String]): String =
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -31,6 +31,7 @@ import scala.util.control.NonFatal
import com.databricks.spark.util.TagDefinitions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
@@ -399,6 +400,15 @@ class DeltaLog private(
// out in this case.
throw DeltaErrors.pathNotExistsException(dataPath.toString)
}

// For CDC we have to return the relation that represents the change data instead of the
// actual data.
if (!cdcOptions.isEmpty) {
recordDeltaEvent(this, "delta.cdf.read", data = cdcOptions.asCaseSensitiveMap())
return CDCReader.getCDCRelation(spark,
this, snapshotToUse, partitionFilters, spark.sessionState.conf, cdcOptions)
}

val fileIndex = TahoeLogFileIndex(
spark, this, dataPath, snapshotToUse, partitionFilters, isTimeTravelQuery)
var bucketSpec: Option[BucketSpec] = None
@@ -437,6 +437,7 @@ private[delta] object DeltaOperationMetrics {
"rewriteTimeMs" // time taken to rewrite the matched files
)


val UPDATE = Set(
"numAddedFiles", // number of files added
"numRemovedFiles", // number of files removed
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -187,6 +187,12 @@ object DeltaOptions extends DeltaLogging {
val DATA_CHANGE_OPTION = "dataChange"
val STARTING_VERSION_OPTION = "startingVersion"
val STARTING_TIMESTAMP_OPTION = "startingTimestamp"
val CDC_START_VERSION = "startingVersion"
val CDC_START_TIMESTAMP = "startingTimestamp"
val CDC_END_VERSION = "endingVersion"
val CDC_END_TIMESTAMP = "endingTimestamp"
val CDC_READ_OPTION = "readChangeFeed"
val CDC_READ_OPTION_LEGACY = "readChangeData"

val validOptionKeys : Set[String] = Set(
REPLACE_WHERE_OPTION,
@@ -203,6 +209,12 @@
DATA_CHANGE_OPTION,
STARTING_TIMESTAMP_OPTION,
STARTING_VERSION_OPTION,
CDC_READ_OPTION,
CDC_READ_OPTION_LEGACY,
CDC_START_TIMESTAMP,
CDC_END_TIMESTAMP,
CDC_START_VERSION,
CDC_END_VERSION,
"queryName",
"checkpointLocation",
"path",
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS
import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Clock, Utils}

/** Record metrics about a successful commit. */
@@ -539,6 +541,29 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}

/**
* Checks if the new schema contains any CDC columns (which is invalid) and throws the appropriate
* error
*/
protected def performCdcMetadataCheck(): Unit = {
if (newMetadata.nonEmpty) {
if (CDCReader.isCDCEnabledOnTable(newMetadata.get)) {
val schema = newMetadata.get.schema.fieldNames
val reservedColumnsUsed = CDCReader.cdcReadSchema(new StructType()).fieldNames
.intersect(schema)
if (reservedColumnsUsed.length > 0) {
if (!CDCReader.isCDCEnabledOnTable(snapshot.metadata)) {
// cdc was not enabled previously but reserved columns are present in the new schema.
throw DeltaErrors.tableAlreadyContainsCDCColumns(reservedColumnsUsed)
} else {
// cdc was enabled but reserved columns are present in the new metadata.
throw DeltaErrors.cdcColumnsInData(reservedColumnsUsed)
}
}
}
}
}

/**
* Modifies the state of the log by adding a new commit that is based on a read at
* the given `lastVersion`. In the case of a conflict with a concurrent writer this
@@ -554,6 +579,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commitStartNano = System.nanoTime()

val (version, actualCommittedActions) = try {
// Check for CDC metadata columns
performCdcMetadataCheck()

// Try to commit at the next version.
val preparedActions = prepareCommit(actions, op)

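The new performCdcMetadataCheck rejects commits whose updated schema contains the reserved CDF columns. A hedged sketch of a write that would hit RESERVED_CDC_COLUMNS_ON_WRITE, assuming the target table already has delta.enableChangeDataFeed set to true and that schema evolution is what introduces the reserved column (path and data are placeholders, with an active SparkSession `spark`):

    import org.apache.spark.sql.functions.lit

    // Schema evolution adds the reserved `_change_type` column to the table metadata,
    // which the CDC metadata check rejects because CDF is enabled on the table.
    spark.range(5).toDF("id")
      .withColumn("_change_type", lit("insert"))
      .write.format("delta")
      .option("mergeSchema", "true")
      .mode("append")
      .save("/path/to/cdf-enabled/table")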
@@ -24,6 +24,7 @@ import scala.collection.mutable

import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaOptions, DeltaTableIdentifier, DeltaTableUtils, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils}
import org.apache.hadoop.fs.Path
@@ -186,8 +187,30 @@ case class DeltaTableV2(
if (timeTravelOpt.nonEmpty && ttSpec.nonEmpty) {
throw DeltaErrors.multipleTimeTravelSyntaxUsed
}

def checkCDCOptionsValidity(options: CaseInsensitiveStringMap): Unit = {
// check if we have both version and timestamp parameters
if (options.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY)
&& options.containsKey(DeltaDataSource.CDC_START_VERSION_KEY)) {
throw DeltaErrors.multipleCDCBoundaryException("starting")
}
if (options.containsKey(DeltaDataSource.CDC_END_VERSION_KEY)
&& options.containsKey(DeltaDataSource.CDC_END_TIMESTAMP_KEY)) {
throw DeltaErrors.multipleCDCBoundaryException("ending")
}
if (!options.containsKey(DeltaDataSource.CDC_START_VERSION_KEY)
&& !options.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY)) {
throw DeltaErrors.noStartVersionForCDC()
}
}

val caseInsensitiveStringMap = new CaseInsensitiveStringMap(options.asJava)

if (timeTravelOpt.isEmpty && ttSpec.nonEmpty) {
copy(timeTravelOpt = ttSpec)
} else if (CDCReader.isCDCRead(caseInsensitiveStringMap)) {
checkCDCOptionsValidity(caseInsensitiveStringMap)
copy(cdcOptions = caseInsensitiveStringMap)
} else {
this
}
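checkCDCOptionsValidity enforces that each boundary is specified only once. A sketch of a catalog-table read that the check now rejects with DELTA_MULTIPLE_CDC_BOUNDARY, because both a starting version and a starting timestamp are supplied (table name and timestamp are placeholders; an active SparkSession `spark` is assumed):

    // Supplying both startingVersion and startingTimestamp for the same boundary
    // triggers multipleCDCBoundaryException("starting").
    spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", "0")
      .option("startingTimestamp", "2022-07-06 00:00:00")
      .table("my_cdf_table")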
@@ -27,6 +27,7 @@ import com.databricks.spark.util.DatabricksLogging
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.PartitionUtils
import org.apache.hadoop.fs.Path
@@ -94,7 +95,10 @@ class DeltaDataSource
if (schemaToUse.isEmpty) {
throw DeltaErrors.schemaNotSetException
}
{
val options = new CaseInsensitiveStringMap(parameters.asJava)
if (CDCReader.isCDCRead(options)) {
(shortName(), CDCReader.cdcReadSchema(schemaToUse))
} else {
(shortName(), schemaToUse)
}
}
@@ -171,7 +175,26 @@

val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)
var cdcOptions: mutable.Map[String, String] = mutable.Map.empty

val caseInsensitiveParams = new CaseInsensitiveStringMap(parameters.asJava)
if (CDCReader.isCDCRead(caseInsensitiveParams)) {
cdcOptions = mutable.Map[String, String](DeltaDataSource.CDC_ENABLED_KEY -> "true")
if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_START_VERSION_KEY)) {
cdcOptions(DeltaDataSource.CDC_START_VERSION_KEY) = caseInsensitiveParams.get(
DeltaDataSource.CDC_START_VERSION_KEY)
}
if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_START_TIMESTAMP_KEY)) {
cdcOptions(DeltaDataSource.CDC_START_TIMESTAMP_KEY) = caseInsensitiveParams.get(
DeltaDataSource.CDC_START_TIMESTAMP_KEY)
}
if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_END_VERSION_KEY)) {
cdcOptions(DeltaDataSource.CDC_END_VERSION_KEY) = caseInsensitiveParams.get(
DeltaDataSource.CDC_END_VERSION_KEY)
}
if (caseInsensitiveParams.containsKey(DeltaDataSource.CDC_END_TIMESTAMP_KEY)) {
cdcOptions(DeltaDataSource.CDC_END_TIMESTAMP_KEY) = caseInsensitiveParams.get(
DeltaDataSource.CDC_END_TIMESTAMP_KEY)
}
}
val dfOptions: Map[String, String] =
if (sqlContext.sparkSession.sessionState.conf.getConf(
DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
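For path-based reads, createRelation copies the CDC boundary options into cdcOptions and hands them to DeltaLog.createRelation. A sketch of a timestamp-bounded read that exercises this path, assuming the placeholder path points at a CDF-enabled table and the timestamps fall within its retained history (an active SparkSession `spark` is assumed):

    // Path-based CDF read bounded by commit timestamps rather than versions.
    spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingTimestamp", "2022-07-01 00:00:00")
      .option("endingTimestamp", "2022-07-06 00:00:00")
      .load("/path/to/delta/table")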
