[1105] Delta Lake Change Data Feed support - streaming reads #1154

Closed
@@ -122,6 +122,8 @@ trait DeltaReadOptions extends DeltaOptionParser {
val failOnDataLoss = options.get(FAIL_ON_DATA_LOSS_OPTION)
.forall(toBoolean(_, FAIL_ON_DATA_LOSS_OPTION)) // thanks to forall: by default true

val readChangeFeed = options.get(CDC_READ_OPTION).exists(toBoolean(_, CDC_READ_OPTION)) ||
options.get(CDC_READ_OPTION_LEGACY).exists(toBoolean(_, CDC_READ_OPTION_LEGACY))

val excludeRegex: Option[Regex] = try options.get(EXCLUDE_REGEX_OPTION).map(_.r) catch {
case e: PatternSyntaxException =>
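For context on how the new option is used: `exists` makes `readChangeFeed` default to false when unset (unlike `failOnDataLoss`, whose `forall` defaults it to true). A minimal usage sketch of a CDC streaming read, assuming a SparkSession named `spark` and placeholder paths:

// Minimal sketch (not part of this diff): starting a CDC streaming read.
// The table path and checkpoint location are placeholders.
val cdc = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")   // parsed into `readChangeFeed` above
  .option("startingVersion", 0L)
  .load("/tmp/delta/events")

cdc.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/cdc")
  .start()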
@@ -24,6 +24,7 @@ import scala.util.matching.Regex

import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaErrors, DeltaLog, DeltaOptions, DeltaTimeTravelSpec, GeneratedColumn, StartingVersion, StartingVersionLatest}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files.DeltaSourceSnapshot
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -93,8 +94,15 @@ trait DeltaSourceBase extends Source
with SupportsAdmissionControl
with DeltaLogging { self: DeltaSource =>

override val schema: StructType =
ColumnWithDefaultExprUtils.removeDefaultExpressions(deltaLog.snapshot.metadata.schema)
override val schema: StructType = {
val schemaWithoutCDC =
ColumnWithDefaultExprUtils.removeDefaultExpressions(deltaLog.snapshot.metadata.schema)

Collaborator: nit: add a comment on why we're removing some fields from the schema
if (options.readChangeFeed) {
CDCReader.cdcReadSchema(schemaWithoutCDC)
} else {
schemaWithoutCDC
}
}
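For reference, a rough sketch of what the CDC branch of the schema above produces, assuming the change-feed metadata columns documented for Delta CDF; the authoritative list comes from `CDCReader.cdcReadSchema`:

// Sketch only: the CDC read schema is the (default-expression-free) data schema
// plus the change-tracking columns appended by cdcReadSchema.
// Documented _change_type values include "insert", "update_preimage",
// "update_postimage", and "delete".
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("value", StringType)))

val cdcSchema = StructType(dataSchema.fields ++ Seq(
  StructField("_change_type", StringType),
  StructField("_commit_version", LongType),
  StructField("_commit_timestamp", TimestampType)))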

protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = _

@@ -104,20 +112,30 @@ trait DeltaSourceBase extends Source
isStartingVersion: Boolean,
limits: Option[AdmissionLimits] = Some(new AdmissionLimits())):
ClosableIterator[IndexedFile] = {
val changes = getFileChanges(fromVersion, fromIndex, isStartingVersion)
if (limits.isEmpty) return changes

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count towards
// the maxFilesPerTrigger conf.
var admissionControl = limits.get
changes.withClose { it =>
it.takeWhile { index =>
admissionControl.admit(Option(index.add))
if (options.readChangeFeed) {
// in this CDC use case, we need to consider RemoveFile and AddCDCFiles when getting the
// offset.

// This method is only used to get the offset so we need to return an iterator of IndexedFile.
getFileChangesForCDC(fromVersion, fromIndex, isStartingVersion, limits, None).flatMap(_._2)
.toClosable
} else {
val changes = getFileChanges(fromVersion, fromIndex, isStartingVersion)
if (limits.isEmpty) return changes

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count towards
// the maxFilesPerTrigger conf.
var admissionControl = limits.get
changes.withClose { it =>
it.takeWhile { index =>
admissionControl.admit(Option(index.add))
}
}
}
}
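To make the CDC branch above easier to follow: `getFileChangesForCDC` is assumed to yield one (commit version, files) group per commit, and only the flat stream of IndexedFile entries is needed for offset computation, hence the `flatMap(_._2)`. A simplified, type-generic sketch of that flattening step:

// Simplified sketch of the flatMap(_._2) step: drop the per-commit grouping and
// keep only the ordered stream of entries (generic in the element type).
def flattenPerCommit[A](perCommit: Iterator[(Long, Iterator[A])]): Iterator[A] =
  perCommit.flatMap { case (_, entries) => entries }

// flattenPerCommit(Iterator(1L -> Iterator("a", "b"), 2L -> Iterator("c")))
//   => Iterator("a", "b", "c")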


Collaborator (author): nit: delete

/**
* get the changes from startVersion, startIndex to the end
* @param startVersion - calculated starting version
@@ -131,21 +149,25 @@ trait DeltaSourceBase extends Source
startIndex: Long,
isStartingVersion: Boolean,
endOffset: DeltaSourceOffset): DataFrame = {
val changes = getFileChanges(startVersion, startIndex, isStartingVersion)
try {
val fileActionsIter = changes.takeWhile { case IndexedFile(version, index, _, _, _, _) =>
version < endOffset.reservoirVersion ||
(version == endOffset.reservoirVersion && index <= endOffset.index)
}
if (options.readChangeFeed) {
getCDCFileChangesAndCreateDataFrame(startVersion, startIndex, isStartingVersion, endOffset)
} else {
val changes = getFileChanges(startVersion, startIndex, isStartingVersion)
try {
val fileActionsIter = changes.takeWhile { case IndexedFile(version, index, _, _, _, _) =>
version < endOffset.reservoirVersion ||
(version == endOffset.reservoirVersion && index <= endOffset.index)
}

val filteredIndexedFiles = fileActionsIter.filter { indexedFile =>
indexedFile.getFileAction != null &&
excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
}
val filteredIndexedFiles = fileActionsIter.filter { indexedFile =>
indexedFile.getFileAction != null &&
excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
}

createDataFrame(filteredIndexedFiles)
} finally {
changes.close()
createDataFrame(filteredIndexedFiles)
} finally {
changes.close()
}
}
}
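The `takeWhile` bound in the non-CDC branch amounts to a small predicate on (version, index) pairs. A standalone sketch with simplified types (the field names mirror DeltaSourceOffset, but nothing here is the real class):

// Standalone sketch of the end-offset bound used by takeWhile above.
final case class EndOffsetSketch(reservoirVersion: Long, index: Long)

def atOrBeforeEnd(version: Long, index: Long, end: EndOffsetSketch): Boolean =
  version < end.reservoirVersion ||
    (version == end.reservoirVersion && index <= end.index)

// atOrBeforeEnd(4, 10, EndOffsetSketch(5, 2))  => true  (earlier version)
// atOrBeforeEnd(5, 3,  EndOffsetSketch(5, 2))  => false (past the end index)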

@@ -183,7 +205,8 @@ case class DeltaSource(
deltaLog: DeltaLog,
options: DeltaOptions,
filters: Seq[Expression] = Nil)
extends DeltaSourceBase {
extends DeltaSourceBase
with DeltaSourceCDCSupport {

// Deprecated. Please use `ignoreDeletes` or `ignoreChanges` from now on.
private val ignoreFileDeletion = {
@@ -520,9 +543,36 @@
override def toString(): String = s"DeltaSource[${deltaLog.dataPath}]"

trait DeltaSourceAdmissionBase { self: AdmissionLimits =>
// This variable indicates whether a commit has already been processed by a batch or not.
var commitProcessedInBatch = false

/**
* This overloaded method checks if all the AddCDCFiles for a commit can be accommodated by
* the rate limit.
*/
def admit(fileActions: Seq[AddCDCFile]): Boolean = {
def getSize(actions: Seq[AddCDCFile]): Long = {
actions.foldLeft(0L) { (l, r) => l + r.size }
}
if (fileActions.isEmpty) {
true
} else {
// if no files have been admitted, then admit all to avoid deadlock
// else check if all of the files together satisfy the limit, only then admit
val shouldAdmit = !commitProcessedInBatch ||
(filesToTake - fileActions.size >= 0 && bytesToTake - getSize(fileActions) >= 0)

commitProcessedInBatch = true
filesToTake -= fileActions.size
bytesToTake -= getSize(fileActions)
shouldAdmit
}
}
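A standalone sketch of the admission rule implemented above: the first commit in a batch is always admitted in full (so a single large commit cannot deadlock the stream), and a later commit is admitted only if all of its files still fit within the remaining file and byte budget.

// Standalone sketch (simplified state, not the real AdmissionLimits class).
final class CdcAdmissionSketch(var filesToTake: Long, var bytesToTake: Long) {
  private var commitProcessedInBatch = false

  /** Admit all of a commit's files at once, or none of them. */
  def admit(fileSizes: Seq[Long]): Boolean = {
    if (fileSizes.isEmpty) return true
    val bytes = fileSizes.sum
    val shouldAdmit = !commitProcessedInBatch ||
      (filesToTake - fileSizes.size >= 0 && bytesToTake - bytes >= 0)
    commitProcessedInBatch = true
    filesToTake -= fileSizes.size
    bytesToTake -= bytes
    shouldAdmit
  }
}

// val limits = new CdcAdmissionSketch(filesToTake = 2L, bytesToTake = 1000L)
// limits.admit(Seq(100L, 100L, 100L))  => true  (first commit, admitted to avoid deadlock)
// limits.admit(Seq(100L))              => false (file budget already exhausted)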

/** Whether to admit the next file */
def admit(fileAction: Option[FileAction]): Boolean = {
commitProcessedInBatch = true

def getSize(action: FileAction): Long = {
action match {
case a: AddFile =>