Skip to content

Commit

Permalink
CDF evolvability tests
Browse files Browse the repository at this point in the history
This PR adds two evolvability tests for CDF. Specifically, we test that CDF will continue to work even if there is some future column inside the delta log and checkpoint.

Closes delta-io#1172

GitOrigin-RevId: df4705cb1bdbfa6802dfa96d9f0ba0902e0d53a6
  • Loading branch information
scottsand-db authored and jbguerraz committed Jul 6, 2022
1 parent 7ebf10f commit 8ec17ec
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ class EvolvabilitySuite extends EvolvabilitySuiteBase with DeltaSQLCommandTest {
// The modified Delta files and checkpoints with an extra column are generated by
// `EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn()`

test("transaction log schema evolvability - batch change data read") {
  withTempDir { tempDir =>
    withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
      val tablePath = tempDir.getAbsolutePath

      // Lay down the transaction log that carries an extra column, then rewrite every row.
      EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn(spark, tablePath)
      spark.sql(s"UPDATE delta.`$tablePath` SET value = 10")

      // Smoke check: a batch change data read from version 0 must run without throwing.
      // The collected rows are intentionally discarded.
      spark.read
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .load(tablePath)
        .collect()

      // Each of 1..9 is expected twice before the update; all 18 rows hold 10 afterwards.
      val expectedPreimage: Seq[Int] = (1 until 10).flatMap(v => Seq.fill(2)(v))
      val expectedPostimage: Seq[Int] = Seq.fill(18)(10)

      // NOTE(review): 6 is presumably the version committed by the UPDATE above - confirm.
      testCdfUpdate(tablePath, 6, expectedPreimage, expectedPostimage)
    }
  }
}

test("transaction log schema evolvability - streaming change data read") {
  withTempDir { tempDir =>
    withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
      val tablePath = tempDir.getAbsolutePath

      // Lay down the transaction log that carries an extra column, then rewrite every row.
      EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn(spark, tablePath)
      spark.sql(s"UPDATE delta.`$tablePath` SET value = 10")

      // Smoke check: a streaming change data read from version 0 must drain without throwing.
      // The "noop" sink discards the rows.
      val changeFeed = spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .load(tablePath)
      val smokeQuery = changeFeed.writeStream.format("noop").start()
      try {
        smokeQuery.processAllAvailable()
      } finally {
        // Always stop the query so a failure does not leave it running.
        smokeQuery.stop()
      }

      // Each of 1..9 is expected twice before the update; all 18 rows hold 10 afterwards.
      val expectedPreimage: Seq[Int] = (1 until 10).flatMap(v => Seq.fill(2)(v))
      val expectedPostimage: Seq[Int] = Seq.fill(18)(10)

      // NOTE(review): 6 is presumably the version committed by the UPDATE above - confirm.
      testCdfUpdate(tablePath, 6, expectedPreimage, expectedPostimage, streaming = true)
    }
  }
}

test("transaction log schema evolvability - batch read") {
testLogSchemaEvolvability(
(path: String) => { spark.read.format("delta").load(path).collect() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
Expand Down Expand Up @@ -75,6 +75,46 @@ abstract class EvolvabilitySuiteBase extends QueryTest with SharedSparkSession
operation(tempDir.getAbsolutePath)
}
}

/**
 * Read a table's change data feed (CDF) at a single commit and check the `value` column of its
 * `update_preimage` and `update_postimage` rows.
 *
 * With `streaming = true` the feed is first streamed into a table named "streaming", using
 * `tablePath + "-checkpoint"` as the checkpoint location, and that table is then read back.
 * Otherwise the feed is read with a batch query. In the streaming case the sink table and the
 * checkpoint directory are removed before returning, so repeated calls do not observe rows or
 * stream progress left behind by an earlier call.
 *
 * @param tablePath path of the Delta table whose change data feed is read
 * @param commitVersion version passed as both `startingVersion` and `endingVersion`
 * @param expectedPreimage expected `value`s of the `update_preimage` rows
 * @param expectedPostimage expected `value`s of the `update_postimage` rows
 * @param streaming read the feed through a streaming query instead of a batch query
 */
protected def testCdfUpdate(
    tablePath: String,
    commitVersion: Long,
    expectedPreimage: Seq[Int],
    expectedPostimage: Seq[Int],
    streaming: Boolean = false): Unit = {

  val sinkTable = "streaming"
  // This is a sibling of `tablePath`, not a child, so deleting `tablePath` does not remove it.
  val checkpointPath = tablePath + "-checkpoint"

  try {
    val df = if (streaming) {
      val q = spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", commitVersion)
        .option("endingVersion", commitVersion)
        .load(tablePath)
        .writeStream
        .option("checkpointLocation", checkpointPath)
        .toTable(sinkTable)
      try {
        q.processAllAvailable()
      } finally {
        q.stop()
      }
      spark.read.table(sinkTable)
    } else {
      spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", commitVersion)
        .option("endingVersion", commitVersion)
        .load(tablePath)
    }

    val preimage = df.where("_change_type = 'update_preimage'").select("value")
    val postimage = df.where("_change_type = 'update_postimage'").select("value")

    // Both checks run inside the `try` so the sink table still exists when they execute.
    checkAnswer(preimage, expectedPreimage.map(Row(_)))
    checkAnswer(postimage, expectedPostimage.map(Row(_)))
  } finally {
    if (streaming) {
      // Remove what the streaming branch created, even when an assertion above failed.
      try {
        spark.sql(s"DROP TABLE IF EXISTS $sinkTable")
      } finally {
        // `deleteQuietly` does not throw, so it cannot mask a failure from the body above.
        FileUtils.deleteQuietly(new java.io.File(checkpointPath))
      }
    }
  }
}
}


Expand Down

0 comments on commit 8ec17ec

Please sign in to comment.