Add support for delta atomic commit read #1079

Open · wants to merge 1 commit into base: master
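A rough usage sketch based on the tests in this diff (the table path and the trigger setting are placeholders): the new readAtomicCommits option is set on a streaming read alongside the existing rate-limit options.

val stream = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "1")    // existing rate limit
  .option("readAtomicCommits", "true")  // option added by this PR
  .load("/path/to/delta/table")         // placeholder path
// With readAtomicCommits enabled, a micro-batch that starts reading a commit
// keeps reading until that commit's last file instead of splitting the commit.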
@@ -112,6 +112,9 @@ trait DeltaReadOptions extends DeltaOptionParser {
}
}

val readAtomicCommits = options.get(READ_ATOMIC_COMMITS_OPTION)
.exists(toBoolean(_, READ_ATOMIC_COMMITS_OPTION))

val ignoreFileDeletion = options.get(IGNORE_FILE_DELETION_OPTION)
.exists(toBoolean(_, IGNORE_FILE_DELETION_OPTION))

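For context on the pattern above: options.get(key).exists(toBoolean(_, key)) yields false when the key is absent, so readAtomicCommits defaults to false unless it is explicitly set. A minimal sketch of the idea with a stand-in parser (the real toBoolean lives in DeltaOptionParser and its error handling may differ):

// Illustrative stand-in for the boolean option parsing used above.
def toBooleanSketch(value: String, key: String): Boolean = value.toLowerCase match {
  case "true"  => true
  case "false" => false
  case other   => throw new IllegalArgumentException(s"Invalid value '$other' for option '$key'")
}

val opts = Map("readAtomicCommits" -> "true")
// Option.exists returns false for None, so an unset option leaves the feature disabled.
val readAtomic = opts.get("readAtomicCommits").exists(toBooleanSketch(_, "readAtomicCommits"))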
@@ -188,6 +191,7 @@ object DeltaOptions extends DeltaLogging {
val DATA_CHANGE_OPTION = "dataChange"
val STARTING_VERSION_OPTION = "startingVersion"
val STARTING_TIMESTAMP_OPTION = "startingTimestamp"
val READ_ATOMIC_COMMITS_OPTION = "readAtomicCommits"

val validOptionKeys : Set[String] = Set(
REPLACE_WHERE_OPTION,
@@ -204,6 +208,7 @@
DATA_CHANGE_OPTION,
STARTING_TIMESTAMP_OPTION,
STARTING_VERSION_OPTION,
READ_ATOMIC_COMMITS_OPTION,
"queryName",
"checkpointLocation",
"path",
@@ -107,13 +107,13 @@ trait DeltaSourceBase extends Source
val changes = getFileChanges(fromVersion, fromIndex, isStartingVersion)
if (limits.isEmpty) return changes

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count towards
// the maxFilesPerTrigger conf.
// Take each change until we've seen the configured number of addFiles, and keep reading until
// the end of the current commit when readAtomicCommits is set. Some changes don't represent
// file additions; we retain them for offset tracking, but they don't count towards the
// maxFilesPerTrigger conf.
var admissionControl = limits.get
changes.withClose { it =>
it.takeWhile { index =>
admissionControl.admit(Option(index.add))
admissionControl.admit(Option(index.add), index.isLast)
}
}
}
@@ -522,7 +522,7 @@ case class DeltaSource(
trait DeltaSourceAdmissionBase { self: AdmissionLimits =>

/** Whether to admit the next file */
def admit(fileAction: Option[FileAction]): Boolean = {
def admit(fileAction: Option[FileAction], isLast: Boolean): Boolean = {
def getSize(action: FileAction): Long = {
action match {
case a: AddFile =>
@@ -534,11 +534,16 @@
}
}

if (fileAction.isEmpty) return true
val shouldAdmit = filesToTake > 0 && bytesToTake > 0
filesToTake -= 1
val shouldAdmit = (filesToTake > 0 && bytesToTake > 0) || (readAtomic && withinCommit)
withinCommit = !isLast

if (fileAction.isEmpty) {
if (!readAtomic) return true
} else {
filesToTake -= 1
bytesToTake -= getSize(fileAction.get)
}

bytesToTake -= getSize(fileAction.get)
shouldAdmit
}
}
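To make the new control flow easier to follow, here is a self-contained sketch of the admission logic (simplified: plain sizes instead of FileAction instances, and names such as AdmissionSketch are illustrative rather than from the codebase):

// Illustrative, simplified version of the admission control above.
class AdmissionSketch(var filesToTake: Long, var bytesToTake: Long, readAtomic: Boolean) {
  private var withinCommit = false

  // fileSize is None for actions that are not file additions;
  // isLast marks the last action of a commit.
  def admit(fileSize: Option[Long], isLast: Boolean): Boolean = {
    // Admit while under both limits, or keep going until the end of the
    // current commit once readAtomic admission has started on it.
    val shouldAdmit = (filesToTake > 0 && bytesToTake > 0) || (readAtomic && withinCommit)
    withinCommit = !isLast
    if (fileSize.isEmpty) {
      if (!readAtomic) return true // non-file actions are always admitted in the legacy path
    } else {
      filesToTake -= 1
      bytesToTake -= fileSize.get
    }
    shouldAdmit
  }
}

// Two commits of two files each, with a limit of 3 files: the limit alone would cut
// the second commit in half, but readAtomic drains it to its last file.
val limits = new AdmissionSketch(filesToTake = 3L, bytesToTake = Long.MaxValue, readAtomic = true)
val admitted = Seq(
  (Some(10L), false), (Some(10L), true), // commit 1
  (Some(10L), false), (Some(10L), true)  // commit 2
).map { case (size, isLast) => limits.admit(size, isLast) }
// admitted == Seq(true, true, true, true)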
@@ -548,7 +553,8 @@
*/
class AdmissionLimits(
maxFiles: Option[Int] = options.maxFilesPerTrigger,
var bytesToTake: Long = options.maxBytesPerTrigger.getOrElse(Long.MaxValue)
var bytesToTake: Long = options.maxBytesPerTrigger.getOrElse(Long.MaxValue),
val readAtomic: Boolean = options.readAtomicCommits
) extends DeltaSourceAdmissionBase {

protected var filesToTake = maxFiles.getOrElse {
@@ -559,6 +565,8 @@
}
}

protected var withinCommit = false

def toReadLimit: ReadLimit = {
if (options.maxFilesPerTrigger.isDefined && options.maxBytesPerTrigger.isDefined) {
CompositeLimit(
141 changes: 139 additions & 2 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -22,14 +22,12 @@ import java.util.UUID

import scala.concurrent.duration._
import scala.language.implicitConversions

import org.apache.spark.sql.delta.actions.{AddFile, InvalidProtocolVersionException, Protocol}
import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.sql.{AnalysisException, Dataset, Row}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.execution.streaming._
@@ -531,6 +529,145 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest {
}
}

test("readAtomicCommits: read complete commits") {
withTempDirs { (inputDir, outputDir, checkpointDir) =>
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))

spark.range(0, 50).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
spark.range(50, 100).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)

val q1 = spark.readStream
.format("delta")
.option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "1")
.option(DeltaOptions.READ_ATOMIC_COMMITS_OPTION, "true")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
q1.processAllAvailable()
val progress = q1.recentProgress.filter(_.numInputRows != 0)
// the whole initial snapshot is read as a single batch
assert(progress.length === 1)
checkAnswer(
spark.read.format("delta").load(outputDir.getAbsolutePath),
(0 until 100).map(_.toString).toDF)
} finally {
q1.stop()
}

spark.range(100, 150).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
spark.range(150, 200).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)

val q2 = spark.readStream
.format("delta")
.option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "1")
.option(DeltaOptions.READ_ATOMIC_COMMITS_OPTION, "true")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
q2.processAllAvailable()
val progress = q2.recentProgress.filter(_.numInputRows != 0)
// one commit is read per batch
assert(progress.length === 2)
checkAnswer(
spark.read.format("delta").load(outputDir.getAbsolutePath),
(0 until 200).map(_.toString).toDF)
} finally {
q2.stop()
}

spark.range(200, 250).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
spark.range(250, 300).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)

val q3 = spark.readStream
.format("delta")
.option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "2")
.option(DeltaOptions.READ_ATOMIC_COMMITS_OPTION, "true")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
q3.processAllAvailable()
val progress = q3.recentProgress.filter(_.numInputRows != 0)
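// maxFilesPerTrigger = 2 matches the commit size, so each of the two new commits is its own batch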
assert(progress.length === 2)
checkAnswer(
spark.read.format("delta").load(outputDir.getAbsolutePath),
(0 until 300).map(_.toString).toDF)
} finally {
q3.stop()
}

spark.range(300, 350).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
spark.range(350, 400).map(_.toString).repartition(2)
.write.mode("append").format("delta").save(deltaLog.dataPath.toString)

val q4 = spark.readStream
.format("delta")
.option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "3")
.option(DeltaOptions.READ_ATOMIC_COMMITS_OPTION, "true")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
q4.processAllAvailable()
val progress = q4.recentProgress.filter(_.numInputRows != 0)
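// maxFilesPerTrigger = 3 spills into the second commit, and readAtomicCommits then drains
// that commit, so both new commits land in a single batch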
assert(progress.length === 1)
checkAnswer(
spark.read.format("delta").load(outputDir.getAbsolutePath),
(0 until 400).map(_.toString).toDF)
} finally {
q4.stop()
}
}
}

test("readAtomicCommits: process at least one commit") {
withTempDirs { (inputDir, outputDir, checkpointDir) =>
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))

(0 until 5).foreach { i =>
val v = Seq(i.toString).toDF
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
}

val q = spark.readStream
.format("delta")
.option(DeltaOptions.MAX_BYTES_PER_TRIGGER_OPTION, "1b")
.option(DeltaOptions.READ_ATOMIC_COMMITS_OPTION, "true")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
q.processAllAvailable()
val progress = q.recentProgress.filter(_.numInputRows != 0)
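// all five commits are written before the stream starts, so they are read as the
// initial snapshot in a single batch despite the 1-byte limit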
assert(progress.length === 1)
checkAnswer(
spark.read.format("delta").load(outputDir.getAbsolutePath),
(0 until 5).map(_.toString).toDF)
} finally {
q.stop()
}
}
}

test("unknown sourceVersion value") {
val json =
s"""