Support non-additive schema evolution for Delta streaming source #1690

40 changes: 32 additions & 8 deletions core/src/main/resources/error/delta-error-classes.json
@@ -80,14 +80,6 @@
],
"sqlState" : "42KD4"
},
"DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : {
"message" : [
"<opName> is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Although strongly not recommended, you may also force ignore the schema checks during <opName> at your own risk of potentially incorrect results by turning on the SQL conf <escapeConfig>."
],
"sqlState" : "42KD4"
},
"DELTA_BLOOM_FILTER_DROP_ON_NON_EXISTING_COLUMNS" : {
"message" : [
"Cannot drop bloom filter indices for the following non-existent column(s): <unknownColumns>"
@@ -1558,6 +1550,30 @@
],
"sqlState" : "KD002"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.",
"See <docLink> for more details."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_SCHEMA_EVOLUTION" : {
"message" : [
"The schema of your Delta table has changed during streaming, and the schema tracking log has been updated",
"Please restart the stream to continue processing using the updated schema:",
"<schema>"
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOCATION_CONFLICT" : {
"message" : [
"Detected conflicting schema location '<loc>' while streaming from table or table located at '<table>'.",
@@ -1595,6 +1611,14 @@
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_SCHEMA" : {
"message" : [
"We could not initialize the Delta streaming source schema log with a valid schema because",
"we detected an incompatible schema change while serving a streaming batch from table version <a> to <b>.",
"To continue processing the stream with latest schema, please turn on <config>."
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOG_PARSE_SCHEMA_FAILED" : {
"message" : [
"Failed to parse the schema from the Delta streaming source schema log.",
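The new DELTA_STREAMING_SCHEMA_EVOLUTION error class deliberately fails the running query once the tracking log has recorded the evolved schema, so a restart picks the new schema up. A minimal driver-side retry loop, with hypothetical paths and assuming the error-class name is visible in the exception message, could look like:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryException

// Sketch only: paths are placeholders, and matching on the error-class name in
// the message is an assumption about how the failure surfaces on the driver.
def runUntilStopped(spark: SparkSession): Unit = {
  var restart = true
  while (restart) {
    restart = false
    val query = spark.readStream
      .format("delta")
      .option("schemaTrackingLocation", "/checkpoints/job1/_schema_log")
      .load("/data/source_table")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/checkpoints/job1")
      .start("/data/sink_table")
    try {
      query.awaitTermination()
    } catch {
      case e: StreamingQueryException
          if e.getMessage.contains("DELTA_STREAMING_SCHEMA_EVOLUTION") =>
        // The tracking log already holds the new schema; restart to resume with it.
        restart = true
    }
  }
}
```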
@@ -856,7 +856,7 @@ class DeltaAnalysis(session: SparkSession)
assert(options.get("path").isDefined, "Path for Delta table must be defined")
val log = DeltaLog.forTable(session, options.get("path").get)
val sourceIdOpt = options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)
val schemaTrackingLocation = DeltaSourceSchemaLog.fullSchemaTrackingLocation(
val schemaTrackingLocation = DeltaSourceSchemaTrackingLog.fullSchemaTrackingLocation(
rootSchemaTrackingLocation, log.tableId, sourceIdOpt)
// Make sure schema location is under checkpoint
if (!allowSchemaLocationOutsideOfCheckpoint &&
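The guard above requires the resolved schema tracking location to sit under the stream's checkpoint unless an escape conf is set. A standalone sketch of that kind of containment check, with names of my own choosing rather than the PR's:

```scala
import org.apache.hadoop.fs.Path

// Illustrative containment check in the spirit of the guard above; the PR's
// actual logic and naming may differ.
def assertSchemaLocationUnderCheckpoint(
    schemaTrackingLocation: String,
    checkpointLocation: String): Unit = {
  val schemaPath = new Path(schemaTrackingLocation).toUri.getPath
  val checkpointPath = new Path(checkpointLocation).toUri.getPath
  require(
    schemaPath.startsWith(checkpointPath),
    s"Schema tracking location '$schemaPath' must be placed under the " +
      s"checkpoint location '$checkpointPath' so it is managed with the stream state.")
}
```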
46 changes: 34 additions & 12 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2556,16 +2556,18 @@ trait DeltaErrorsBase
)
}

def blockStreamingReadsOnColumnMappingEnabledTable(
def blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
spark: SparkSession,
readSchema: StructType,
incompatibleSchema: StructType,
isCdfRead: Boolean,
detectedDuringStreaming: Boolean): Throwable = {
new DeltaColumnMappingUnsupportedSchemaIncompatibleException(
if (isCdfRead) "Streaming read of Change Data Feed (CDF)" else "Streaming read",
val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION)
new DeltaStreamingColumnMappingSchemaIncompatibleException(
readSchema,
incompatibleSchema,
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key,
"",
enableNonAdditiveSchemaEvolution,
additionalProperties = Map(
"detectedDuringStreaming" -> detectedDuringStreaming.toString
))
@@ -2639,6 +2641,24 @@
cause = cause)
}

def streamingSchemaEvolutionException(newSchema: StructType): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_EVOLUTION",
messageParameters = Array(formatSchema(newSchema)))
}

def streamingSchemaLogInitFailedIncompatibleSchemaException(
startVersion: Long,
endVersion: Long): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_SCHEMA",
messageParameters = Array(
startVersion.toString, endVersion.toString,
DeltaSQLConf.
DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START.key)
)
}

def failToDeserializeSchemaLog(location: String): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_LOG_DESERIALIZE_FAILED",
@@ -3080,18 +3100,20 @@ class DeltaChecksumException(
 * To remain compatible with existing behavior for those who have accidentally already used this
 * operation, the user should always be able to use `escapeConfigName` to fall back at their own risk.
*/
class DeltaColumnMappingUnsupportedSchemaIncompatibleException(
val opName: String,
class DeltaStreamingColumnMappingSchemaIncompatibleException(
val readSchema: StructType,
val incompatibleSchema: StructType,
val escapeConfigName: String,
val docLink: String,
val enableNonAdditiveSchemaEvolution: Boolean = false,
val additionalProperties: Map[String, String] = Map.empty)
extends DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION",
errorClass = if (enableNonAdditiveSchemaEvolution) {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG"
} else {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"
},
messageParameters = Array(
opName,
readSchema.json,
incompatibleSchema.json,
opName,
escapeConfigName)
docLink)
)
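To illustrate the error-class switch above: when the session enables non-additive schema evolution, the exception carries the schemaTrackingLocation hint; otherwise the plain error class is used. A small sketch, assuming the error class is observable through DeltaThrowable's getErrorClass:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val readSchema = StructType(Seq(StructField("id", StringType)))
val renamedSchema = StructType(Seq(StructField("id_renamed", StringType)))

// With the feature flag on, the message points the user at `schemaTrackingLocation`.
val hinted = new DeltaStreamingColumnMappingSchemaIncompatibleException(
  readSchema, renamedSchema, docLink = "",
  enableNonAdditiveSchemaEvolution = true)
assert(hinted.getErrorClass == "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG")
```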
20 changes: 9 additions & 11 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -457,18 +457,19 @@ class DeltaLog private(
* Returns a [[org.apache.spark.sql.DataFrame]] containing the new files within the specified
* version range.
*
* It can optionally take a customReadSchema which consists of the actual read schema to read
* the files. This is used to support non-additive Delta Source streaming schema evolution.
* The customReadSchema requires that its partitionSchema for the Delta table does not change from
* the snapshot's partitionSchema.
* @param customDataSchema Optional data schema that will be used to read the files.
* This is used when reading multiple snapshots using one all-encompassing
* schema, e.g. during streaming.
* This parameter only modifies the data schema. The partition schema is
* not updated, so the caller should ensure that it does not change
* compared to the snapshot.
*/
def createDataFrame(
snapshot: Snapshot,
addFiles: Seq[AddFile],
isStreaming: Boolean = false,
actionTypeOpt: Option[String] = None,
customReadSchema: Option[PersistedSchema] = None
): DataFrame = {
customDataSchema: Option[StructType] = None): DataFrame = {
val actionType = actionTypeOpt.getOrElse(if (isStreaming) "streaming" else "batch")
// It's ok to not pass down the partitionSchema to TahoeBatchFileIndex. Schema evolution will
// ensure any partitionSchema changes will be captured, and upon restart, the new snapshot will
@@ -479,13 +480,10 @@
val partitionSchema = snapshot.metadata.partitionSchema
var metadata = snapshot.metadata

require(customReadSchema.forall(_.partitionSchema == partitionSchema),
"Cannot specify a custom read schema with different partition schema than the Delta table")

// Replace schema inside snapshot metadata so that later `fileFormat()` can generate the correct
// DeltaParquetFormat with the correct schema to reference; the customReadSchema should also
// DeltaParquetFormat with the correct schema to reference; the customDataSchema should also
// contain the correct column mapping metadata if needed after being loaded from schema log.
customReadSchema.map(_.dataSchema).foreach { readSchema =>
customDataSchema.foreach { readSchema =>
metadata = snapshot.metadata.copy(schemaString = readSchema.json)
}

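A usage sketch for the new parameter, with a hypothetical table path and the snapshot's own schema standing in for a schema loaded from the tracking log:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.types.StructType

val spark: SparkSession = SparkSession.active
val deltaLog = DeltaLog.forTable(spark, "/data/my_delta_table")
val snapshot = deltaLog.update()
val addFiles = snapshot.allFiles.collect().toSeq

// Stand-in for a data schema read from the schema tracking log. Note the
// caller must keep the partition schema identical to the snapshot's.
val trackedDataSchema: StructType = snapshot.schema

val df = deltaLog.createDataFrame(
  snapshot,
  addFiles,
  isStreaming = true,
  customDataSchema = Some(trackedDataSchema))
```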
@@ -271,6 +271,10 @@ object DeltaOptions extends DeltaLogging {
* An option to allow column mapping enabled tables to conduct schema evolution during streaming
*/
val SCHEMA_TRACKING_LOCATION = "schemaTrackingLocation"
/**
* Alias for `schemaTrackingLocation`, so users familiar with AutoLoader can migrate easily.
*/
val SCHEMA_TRACKING_LOCATION_ALIAS = "schemaLocation"
/**
* An option to instruct DeltaSource to pick a customized subdirectory for schema log in case of
* rare conflicts such as when a stream needs to do a self-union of two Delta sources from the
@@ -307,6 +311,7 @@
TXN_APP_ID,
TXN_VERSION,
SCHEMA_TRACKING_LOCATION,
SCHEMA_TRACKING_LOCATION_ALIAS,
STREAMING_SOURCE_TRACKING_ID,
"queryName",
"checkpointLocation",
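With the alias registered, either spelling selects the same option. For example, assuming an active `spark` session and hypothetical paths:

```scala
// Auto Loader-style spelling; equivalent to passing `schemaTrackingLocation`.
val df = spark.readStream
  .format("delta")
  .option("schemaLocation", "/checkpoints/job1/_schema_log")
  .load("/data/source_table")
```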
@@ -1017,6 +1017,9 @@ object SingleAction extends Logging {

lazy val nullLitForAddCDCFile: Column =
new Column(Literal(null, ScalaReflection.schemaFor[AddCDCFile].dataType))

lazy val nullLitForMetadataAction: Column =
new Column(Literal(null, ScalaReflection.schemaFor[Metadata].dataType))
}

/** Serializes Maps containing JSON strings without extra escaping. */
@@ -90,9 +90,8 @@ class DeltaDataSource

val (_, snapshot) = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)
val readSchema = {
getSchemaLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
// Use `getSchemaAtLogInit` so it's always consistent between analysis and execution phase
.flatMap(_.getSchemaAtLogInit.map(_.dataSchema))
getSchemaTrackingLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
.flatMap(_.getCurrentTrackedSchema.map(_.dataSchema))
.getOrElse(snapshot.schema)
}

@@ -122,10 +121,12 @@
})
val options = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)
val schemaLogOpt =
getSchemaLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
val readSchema = schemaLogOpt
.flatMap(_.getSchemaAtLogInit.map(_.dataSchema))

val schemaTrackingLogOpt =
getSchemaTrackingLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)

val readSchema = schemaTrackingLogOpt
.flatMap(_.getCurrentTrackedSchema.map(_.dataSchema))
.getOrElse(snapshot.schema)

if (readSchema.isEmpty) {
@@ -136,7 +137,7 @@
deltaLog,
options,
snapshot,
schemaLog = schemaLogOpt
schemaTrackingLog = schemaTrackingLogOpt
)
}

@@ -237,14 +238,26 @@
/**
 * Create a schema tracking log for the Delta streaming source if possible
*/
private def getSchemaLogForDeltaSource(
private def getSchemaTrackingLogForDeltaSource(
spark: SparkSession,
sourceSnapshot: Snapshot,
parameters: Map[String, String]): Option[DeltaSourceSchemaLog] = {
parameters: Map[String, String]): Option[DeltaSourceSchemaTrackingLog] = {
val options = new CaseInsensitiveStringMap(parameters.asJava)
Option(options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION))
.orElse(Option(options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS)))
.map { schemaTrackingLocation =>
DeltaSourceSchemaLog.create(
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION)) {
// TODO: remove once non-additive schema evolution is released
throw new UnsupportedOperationException(
"Schema tracking location is not supported for Delta streaming source")
}
if (Option(options.get(DeltaOptions.CDC_READ_OPTION)).exists(_.toBoolean)) {
// TODO: remove once we support CDC streaming with schema log
throw new UnsupportedOperationException(
"Reading change data feed and streaming is not supported with schema tracking log")
}
DeltaSourceSchemaTrackingLog.create(
spark, schemaTrackingLocation, sourceSnapshot,
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)))
}
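A standalone sketch of the lookup order implemented above: the primary `schemaTrackingLocation` key wins, the Auto Loader-style alias is the fallback, and keys match case-insensitively:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// CaseInsensitiveStringMap lowercases keys internally, so any capitalization resolves.
val parameters = Map("SCHEMALOCATION" -> "/ckpt/_schema_log")
val options = new CaseInsensitiveStringMap(parameters.asJava)
val schemaLocationOpt =
  Option(options.get("schemaTrackingLocation"))
    .orElse(Option(options.get("schemaLocation")))
assert(schemaLocationOpt.contains("/ckpt/_schema_log"))
```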
@@ -901,7 +901,7 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_SATRT =
val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START =
buildConf("streaming.unsafeReadOnIncompatibleSchemaChangesDuringStreamStart.enabled")
.doc(
"""A legacy config to disable schema read-compatibility check on the start version schema
@@ -912,6 +912,16 @@
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION =
buildConf("streaming.nonAdditiveSchemaEvolution.enabled")
.doc(
"""If enabled, Delta streaming source can support non-additive schema evolution for
|operations such as rename or drop column on column mapping enabled tables.
|""".stripMargin)
.internal()
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_ALLOW_SCHEMA_LOCATION_OUTSIDE_CHECKPOINT_LOCATION =
buildConf("streaming.allowSchemaLocationOutsideCheckpointLocation")
.doc(
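Assuming the "spark.databricks.delta." prefix that DeltaSQLConf's buildConf applies elsewhere in this file, the new internal flag from this diff would be toggled as:

```scala
// Internal flag, off by default; the full key prefix is an assumption based
// on the repo's existing buildConf convention.
spark.conf.set(
  "spark.databricks.delta.streaming.nonAdditiveSchemaEvolution.enabled", "true")
```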