Support for Change Data Feed in Delta Lake #1105

Closed
scottsand-db opened this issue Apr 29, 2022 · 1 comment

Comments


scottsand-db commented Apr 29, 2022

Overview

This is the official issue to track interest, feature requests, and progress on Change Data Feed (CDF) in Delta Lake. This feature is part of the Operation Enhancements section of the Delta Lake 2022 H1 Roadmap, with a target of 2022 Q3.

Requirements

The aim of this project is to allow Delta tables to produce change data capture (CDC), a standard pattern for reading changing data from a table. CDC data consists of an underlying row plus metadata indicating whether the row was inserted, deleted, or is the pre- or post-image of an update. That is, an update to a row will produce two CDC events: one containing the row preimage from before the update and one containing the row postimage after the update is applied.
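For illustration, and assuming the change-type metadata is exposed as a `_change_type` column (as in the project plan below), a single-row update would surface roughly as the following two change events:

```python
# Illustrative only: hypothetical change events produced by updating one row.
# The `_change_type` column name and values are assumptions, not a final spec.
update_events = [
    {"id": 1, "name": "Alice", "age": 20, "_change_type": "update_preimage"},   # row before the update
    {"id": 1, "name": "Alice", "age": 21, "_change_type": "update_postimage"},  # row after the update
]
```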

When a CDC read is requested, we need to look at the Delta log entry for every DML operation and output the data as CDC events rather than as raw table data.

Design Sketch

Please see the official design doc here.

API

We’re adding new syntax to existing Spark interfaces for reading Delta tables, so there are no additional considerations involved here: CDC simply reflects a different view of data that was already available through the same interfaces.

Enabling CDC for a Delta table

To enable CDC for a table, set a table property on that table. Versions written after the property is set will start recording change data.

ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

This property can also be set at the point of table creation:

CREATE TABLE student (id INT, name STRING, age INT) USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)

Additionally, this property can be set by default for all new tables:

SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
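Equivalently, as a sketch, the same default could be set as a session configuration from Python or Scala (the SQL statement above is the form given in the design):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sketch: have all newly created Delta tables record change data by default.
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
```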

APIs for accessing Change Data

This section describes the APIs that will allow a user to access change data.

DataFrame API (Scala/Python)

The user provides startingVersion and endingVersion as options, and also specifies readChangeFeed as an option:

spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", startingVersion)
    .option("endingVersion", endingVersion)
    .table("source")

Notes:
- For timestamp variants we would provide startingTimestamp and endingTimestamp instead.
- The starting and ending versions and timestamps are inclusive, in line with the other time travel APIs.
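For example, a timestamp-bounded batch read might look like the following sketch (the timestamp strings are placeholders):

```python
# Sketch: read the change feed between two commit timestamps (inclusive).
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "2022-04-29 00:00:00")
    .option("endingTimestamp", "2022-04-30 00:00:00")
    .table("source")
)
```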

The same options can be used with the DataStreamReader as well:

spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", startingVersion)
    .table("source")

For streaming use cases, endingVersion is not required.
If startingVersion is not provided, the stream should load from the earliest available version. A latest starting version should also be supported (see the sketch below).
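A sketch of a stream that starts from the latest version (assuming "latest" is accepted as the startingVersion value, per the note above):

```python
# Sketch: stream only the changes committed after the stream starts.
changes_stream = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "latest")  # assumption: skip all historical versions
    .table("source")
)
```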

SQL API

Currently we do not plan to support a SQL API due to a limitation in Apache Spark’s TableValueFunctions class. Eventually, we’d like the API to be the following.

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', startingVersion)

-- version as ints or longs
SELECT * FROM table_changes('tableName', startingVersion, endingVersion)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', 'startingTimestamp', 'endingTimestamp')

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', 'startingTimestamp', 'endingTimestamp')

-- path-based tables
SELECT * FROM table_changes_by_path('/path', 'startingTimestamp', 'endingTimestamp')

Project Plan

| Task | Description | Status | PR |
| --- | --- | --- | --- |
| Basic writing | Update TransactionalWrite.scala and DelayedCommitProtocol.scala to support the CDC virtual data partitioning during writes, as described above | DONE | #1116 |
| Batch Reading | Implement CDCReader.scala and CDCReaderSuite.scala. We can mock write operations by explicitly partitioning (e.g. _change_type column) some data. Should be able to read basic writes, like INSERT | DONE | #1116 |
| DataFrame API | Expose the above-mentioned DataFrame APIs as valid options | DONE | #1132 |
| Protocol support | Although Delta Lake does support generated columns (i.e. part of delta protocol writer version 4), it currently fails operations if CDC is discovered in a table. Revert this. | DONE | #1116 |
| DELETE | Update DeleteCommand.scala and implement DeleteCDCSuite.scala. This will be a simple first command to implement and will let us test out our reading/writing. | DONE | #1125 |
| Milestone 0.1 (May 5 2022, or ~1 PW) | Basic prototype with DELETE support and DataFrame API | | |
| MERGE INTO | Implement MERGE INTO support for CDC data. MergeIntoCommand.scala | DONE | #1155 |
| UPDATE | Implement UPDATE support for CDC data. UpdateCommand.scala | DONE | #1146 |
| VACUUM | Implement VACUUM support for CDC data. | DONE | #1177 |
| Partition filtering | Add partition filtering to CDC indexes | DONE | #1178 |
| Stream reading | Enable readChangeFeed streaming API | DONE | #1154 |
| Metrics | Add metrics to provide visibility into usage and metadata about CDC operations (data volumes, types of operations, partitioning, etc.) | NOT STARTED | |
| RESTORE | Figure out RESTORE support | DONE | #1212 |
| Generated Columns | Verify + solve these cases | DONE | #1173 |
| Evolvability tests | Add evolvability tests for CDF. EvolvabilitySuite.scala | DONE | #1172 |
| Checkpoint tests | Ensure that checkpoint does not contain CDC field. CheckpointsSuite.scala | DONE | #1180 |
@scottsand-db scottsand-db added the enhancement New feature or request label Apr 29, 2022
@scottsand-db scottsand-db self-assigned this Apr 29, 2022
This was referenced Apr 29, 2022
@dennyglee dennyglee pinned this issue Apr 29, 2022
scottsand-db added a commit that referenced this issue May 12, 2022
See the project plan at #1105.

This PR adds CDF write functionality to the DELETE command, as well as a test suite `DeleteCDCSuite`. At a high level, during the DELETE command, when we realize that we need to delete some rows in some files (but not the entire file), then instead of creating a new DataFrame which just contains the non-deleted rows (thus, in this new delta version, the previous rows were logically deleted), we instead partition the DataFrame into CDF-deleted rows and non-deleted rows.

Then, when writing out the parquet files, we write the CDF-deleted rows into their own CDF parquet file, and we write the non-deleted rows into a standard main-table parquet file (same as usual). We then also add an extra `AddCDCFile` action to the transaction log.

Closes #1125.

GitOrigin-RevId: 7934de886589bf3d70ce81dcf9d7de598e35fb2e
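A minimal PySpark sketch of that idea (illustrative only; this is not the DeleteCommand.scala implementation, and the helper name and `_change_type` column are assumptions):

```python
from pyspark.sql import functions as F

def split_rows_for_cdf(df, delete_condition):
    """Illustrative sketch: split rows touched by a partial-file DELETE."""
    # Rows matching the predicate become 'delete' change events, destined for a CDF parquet file.
    deleted_cdf = df.filter(delete_condition).withColumn("_change_type", F.lit("delete"))
    # Remaining rows are rewritten as ordinary main-table data, same as before.
    remaining = df.filter(~delete_condition)
    return deleted_cdf, remaining
```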
scottsand-db added a commit that referenced this issue May 18, 2022
See the project plan at #1105.

This PR adds the DataFrame API for CDF as well as a new test suite to test this API. This API includes the following options:

- "startingVersion"
- "startingTimestamp"
- "endingVersion"
- "endingTimestamp"
- "readChangeFeed"

It also includes misc. other CDF improvements, like extra schema checks during OptTxn write and returning a CDF relation in the DeltaLog::createRelation method.

Closes #1132

GitOrigin-RevId: 7ffafc6772fc314064971d65d9e7946b7a01de64

GitOrigin-RevId: b901d21804fe7aaecd6bb2e03cb33c76e19ae2ad
prakharjain09 pushed a commit to prakharjain09/delta that referenced this issue May 19, 2022
scottsand-db added a commit that referenced this issue May 25, 2022
See the project plan at #1105.

This PR adds CDF to the UPDATE command, during which we generate both preimage and postimage CDF data.

This PR also adds UpdateCDCSuite which adds basic tests for these CDF changes.

As a high-level overview of how this CDF-update operation is performed, when we find a row that satisfies the update condition, we `explode` an array containing the pre-image, post-image, and main-table updated rows.

The pre-image and post-image rows are appropriately typed with the corresponding CDF_TYPE, and the main-table updated row has CDF_TYPE `null`. Thus, the first two rows will be written to the CDF parquet file, while the latter is written to a standard main-table data parquet file.

Closes #1146

GitOrigin-RevId: 47413c5345bb97c0e1303a7f4d4d06b89c35ab7a
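A simplified PySpark sketch of the explode approach (illustrative only; the column names, the single updated column, and the change-type literals are assumptions, not the UpdateCommand.scala code):

```python
from pyspark.sql import functions as F

def rows_with_update_cdf(matched_df, new_age):
    """Illustrative sketch: emit pre-image, post-image, and main-table rows for matched rows."""
    packed = matched_df.withColumn(
        "_variants",
        F.array(
            F.struct(F.col("age").alias("age"), F.lit("update_preimage").alias("_change_type")),
            F.struct(F.lit(new_age).alias("age"), F.lit("update_postimage").alias("_change_type")),
            # The main-table row carries a null change type, so it is written to a normal data file.
            F.struct(F.lit(new_age).alias("age"), F.lit(None).cast("string").alias("_change_type")),
        ),
    )
    # explode turns each matched input row into three output rows.
    return packed.select("id", F.explode("_variants").alias("v")).select("id", "v.*")
```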
allisonport-db added a commit that referenced this issue Jun 4, 2022
See the project plan at #1105.

This PR adds CDF to the `MERGE` command.

Merge is implemented in two ways.

- Insert-only merges. For these we don't need to do anything special, since we only write `AddFile`s with the new rows.
    - However, our current implementation of insert-only merges doesn't correctly update the metric `numTargetRowsInserted`, which is used to check for data changes in [CDCReader](https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala#L313). This PR fixes that.

- For all other merges, we generate CDF rows for inserts, updates, and deletions. We do this by generating expression sequences for CDF outputs (i.e. preimage, insert, etc) on a clause-by-clause basis. We apply these to the rows in our joinedDF in addition to our existing main data output sequences.
    - Changes made to `JoinedRowProcessor` make column `ROW_DELETED_COL` unnecessary, so this PR removes it.

Tests are added in `MergeCDCSuite`.

Closes #1155

GitOrigin-RevId: 0386c6ff811abe433644b5f5f46a3c7d51001740
@francescomucio

Hi @scottsand-db,

I noticed that the batch reads are already supported in the Databricks platform. Is the limitation of the TableValueFunctions class mentioned above solved or is that an exclusive Databricks feature?

jbguerraz pushed commits to jbguerraz/delta that referenced this issue Jul 6, 2022
@tdas tdas unpinned this issue Aug 3, 2022