[1105] Change Data Feed - PR 2 - DELETE command #1125
Conversation
- correctly calculate numCopiedRows metric in DeleteCommand
- add test in SchemaUtilsSuite
- add metrics/stats to OptTxn and DeleteCmd
- added committer.changeFiles to OptTxn
- Add DeleteCDCSuite; add CDC codes in SchemaUtils
- add cdc deletes to DeleteCommand.scala
- perform CDC partitioning in TransactionalWrite

GitOrigin-RevId: 7465d9aee506b29cfce5ed6cd9b8f86288b34f11
core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
```scala
    numFilesToRewrite: Long): Seq[FileAction] = {
  val writeCdc = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(txn.metadata)
  // ...
  val numTouchedRows = metrics("numTouchedRows")
```
what does "touched" mean in this case?
The total number of rows we have seen, i.e. rows that we are either copying or deleting (the sum of both).
Please add an inline comment; this is not obvious.
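Such a comment might look like the following sketch; the wording and the derived-metric helper are illustrative assumptions, not the committed code:

```scala
// Sketch only: "touched" rows are all rows read from the files being
// rewritten, i.e. rows copied unchanged into new files plus rows deleted.
// The copied-row count can therefore be derived from the other two metrics.
object DeleteMetricsSketch {
  def numCopiedRows(numTouchedRows: Long, numDeletedRows: Long): Long =
    numTouchedRows - numDeletedRows
}
```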
core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
respond to PR comments
core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
GitOrigin-RevId: 316cb8b05e9f87c3c4c377b6608f0eb237cd60ca
See the project plan at #1105.

This PR adds CDF write functionality to the DELETE command, along with a test suite, `DeleteCDCSuite`. At a high level, when the DELETE command needs to delete some rows in a file (but not the entire file), then instead of creating a new DataFrame that contains just the non-deleted rows (thus, in the new Delta version, the previous rows are logically deleted), we partition the DataFrame into CDF-deleted rows and non-deleted rows.

Then, when writing out the parquet files, we write the CDF-deleted rows into their own CDF parquet file, and we write the non-deleted rows into a standard main-table parquet file (same as usual). We then also add an extra `AddCDCFile` action to the transaction log; see the sketch after this description.

Closes delta-io#1125.

GitOrigin-RevId: 7934de886589bf3d70ce81dcf9d7de598e35fb2e
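To make that flow concrete, here is a minimal, hedged sketch of both steps, assuming a `_change_type` marker column as in the CDF protocol. The object, the method names, and the stand-in action types below are hypothetical simplifications, not the real Delta classes:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, not}

// Simplified stand-ins for the Delta Lake file actions (the real classes in
// org.apache.spark.sql.delta.actions carry partition values, stats, etc.).
sealed trait FileAction { def path: String }
case class AddFile(path: String, size: Long) extends FileAction
case class AddCDCFile(path: String, size: Long) extends FileAction

object DeleteCdcSketch {
  // Step 1 (hypothetical): split the rows of a file being rewritten into
  // CDF "delete" rows and rows to copy, tagged via a change-type column.
  def partitionForCdc(fileData: DataFrame, deleteCondition: Column): DataFrame = {
    val cdcDeletedRows = fileData
      .where(deleteCondition)
      .withColumn("_change_type", lit("delete"))
    val copiedRows = fileData
      .where(not(deleteCondition))
      .withColumn("_change_type", lit(null).cast("string")) // not change data
    cdcDeletedRows.union(copiedRows)
  }

  // Step 2 (hypothetical): after the writer lands each parquet file, commit
  // change-data files as AddCDCFile so main-table readers never see them.
  def toActions(writtenFiles: Seq[(String, Long, Boolean)]): Seq[FileAction] =
    writtenFiles.map { case (path, size, isCdcFile) =>
      if (isCdcFile) AddCDCFile(path, size) else AddFile(path, size)
    }
}
```

In the actual PR the split happens inside `DeleteCommand` and the CDC-aware partitioning lives in `TransactionalWrite`, per the commit list above; the sketch only illustrates the shape of the data flow.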