Skip processing snapshots of type Overwrite during readStream #3517
Conversation
https://github.com/apache/iceberg/projects/2

I stumbled on the exact same change made by @kbendick: #3267

Hi @SreeramGarlapati @kbendick,
| "Cannot process delete snapshot : %s. Set read option %s to allow skipping snapshots of type delete", | ||
| snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); | ||
| return false; | ||
| case DataOperations.OVERWRITE: |
I don't think that this should use the same configuration to skip deletes and overwrites. Overwrites are different and I think that we should at a minimum have a different property. I would also prefer to have some additional clarity on how we plan to eventually handle this. We could skip overwrites, but what about use cases where they are probably upserts? What about when they're created by copy-on-write MERGE operations?
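For concreteness, here is a minimal sketch of what separate handling per snapshot type could look like. The class name and boolean fields are made up for illustration, and `STREAMING_SKIP_OVERWRITE_SNAPSHOTS` is the separate option this thread converges on; this is not the merged implementation:

```java
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkReadOptions;

// Sketch only: one boolean per snapshot type, each parsed from its own
// read option, so opting into skipping deletes does not silently skip
// overwrites as well.
class SnapshotFilterSketch {
  private final boolean skipDelete;     // from STREAMING_SKIP_DELETE_SNAPSHOTS
  private final boolean skipOverwrite;  // from STREAMING_SKIP_OVERWRITE_SNAPSHOTS

  SnapshotFilterSketch(boolean skipDelete, boolean skipOverwrite) {
    this.skipDelete = skipDelete;
    this.skipOverwrite = skipOverwrite;
  }

  boolean shouldProcess(Snapshot snapshot) {
    switch (snapshot.operation()) {
      case DataOperations.APPEND:
        return true;
      case DataOperations.DELETE:
        Preconditions.checkState(skipDelete,
            "Cannot process delete snapshot: %s. Set read option %s to allow skipping snapshots of type delete",
            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
        return false;
      case DataOperations.OVERWRITE:
        Preconditions.checkState(skipOverwrite,
            "Cannot process overwrite snapshot: %s. Set read option %s to allow skipping snapshots of type overwrite",
            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
        return false;
      default:
        throw new IllegalStateException("Unsupported snapshot operation: " + snapshot.operation());
    }
  }
}
```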
+1. I would prefer they be two separate configs, but also that we have a plan for the longer term to handle sending out these row deltas.
I'd be ok with getting a PR in to ignore OVERWRITE, but this isn't something we should ignore in the longer term (or even really the near-to-medium term) as others have mentioned.
Personally, I would consider using a schema similar to the delta.io change data feed: a dataframe with the before image / after image (the row before and after the update) plus the type of operation for each row (insert, delete, update_before, update_after).
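For illustration only, a change-feed schema in that style might look like the following. The column names follow delta.io's convention, and none of this is existing Iceberg API:

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ChangeFeedSchemaSketch {
  public static void main(String[] args) {
    // Hypothetical change-feed schema: the table's own columns plus one
    // metadata column naming the per-row operation (delta.io's change data
    // feed uses insert / delete / update_preimage / update_postimage).
    StructType changeFeedSchema = new StructType()
        .add("id", DataTypes.LongType)        // example table column
        .add("data", DataTypes.StringType)    // example table column
        .add("_change_type", DataTypes.StringType);
    System.out.println(changeFeedSchema.treeString());
  }
}
```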
I connected with @SreeramGarlapati to contribute on this PR.
I have added a separate config to skip overwrites. I will discuss with @SreeramGarlapati and update this thread with the plan for eventually handling upserts.
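For reference, a usage sketch with the two separate opt-ins. The option keys are assumed to be the kebab-case string values behind `SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS` and `STREAMING_SKIP_OVERWRITE_SNAPSHOTS`, and the table name is made up:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SkipSnapshotsReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("skip-snapshots-sketch")
        .getOrCreate();

    // Each snapshot type now has its own opt-in; leaving one unset still
    // fails fast when a snapshot of that type is encountered.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option("streaming-skip-delete-snapshots", "true")
        .option("streaming-skip-overwrite-snapshots", "true")
        .load("db.table");

    stream.printSchema();
  }
}
```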
All in all, there are 2 options for reading upserts:

- For updates written with copy-on-write, a new data file is created that contains both the old rows and the new updated rows. In this case, we can take a Spark option from the user as consent that they are okay with data replay.
- For updates written with merge-on-read, we will expose an option to read a change data feed, which will include a metadata column indicating whether a record is an INSERT vs. a DELETE.

Did this make sense, @rdblue & @kbendick?
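A small sketch of why copy-on-write updates imply data replay for a streaming reader; the table, rows, and predicate here are invented for illustration:

```java
import org.apache.spark.sql.SparkSession;

public class CopyOnWriteReplaySketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Suppose data file A holds rows (1, 'a') and (2, 'b').
    // A copy-on-write update rewrites file A into a new file B holding
    // (1, 'a') and (2, 'b2'), committed as an OVERWRITE snapshot:
    spark.sql("UPDATE db.table SET data = 'b2' WHERE id = 2");

    // A stream that processes that OVERWRITE snapshot re-emits (1, 'a')
    // even though it never changed - this is the "data replay" the user
    // would be consenting to via the proposed read option.
  }
}
```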
| "Cannot process overwrite snapshot : %s. Set read option %s to allow skipping snapshots of type overwrite", | ||
| snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); | ||
| return false; | ||
| default: |
Should add a test for this new conf option
I have added the unit test.
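A minimal sketch of the kind of test meant here, assuming the harness in TestStructuredStreamingRead3. Every helper method below is a hypothetical stand-in, stubbed out so the sketch compiles:

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Assert;
import org.junit.Test;

public class SkipOverwriteOptionTestSketch {

  @Test
  public void testReadStreamSkipsOverwriteSnapshots() throws Exception {
    appendRows();      // commits an APPEND snapshot
    overwriteRows();   // commits an OVERWRITE snapshot
    appendRows();      // commits another APPEND snapshot

    // With the new option set, the stream should return only the appended
    // rows instead of failing on the OVERWRITE snapshot.
    Dataset<Row> stream = startStreamWithOption("streaming-skip-overwrite-snapshots", "true");
    List<Row> actual = collectAvailableRows(stream);
    Assert.assertEquals(expectedAppendedRows(), actual);
  }

  // Hypothetical harness helpers, standing in for the table-writing and
  // stream-collecting utilities in TestStructuredStreamingRead3.
  private void appendRows() { throw new UnsupportedOperationException("sketch"); }
  private void overwriteRows() { throw new UnsupportedOperationException("sketch"); }
  private Dataset<Row> startStreamWithOption(String key, String value) {
    throw new UnsupportedOperationException("sketch");
  }
  private List<Row> collectAvailableRows(Dataset<Row> stream) {
    throw new UnsupportedOperationException("sketch");
  }
  private List<Row> expectedAppendedRows() { throw new UnsupportedOperationException("sketch"); }
}
```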
Hi @rdblue @kbendick @RussellSpitzer, requesting your review.
Merged. Thanks @SreeramGarlapati and @rajarshisarkar!
Co-authored-by: Rajarshi Sarkar <srajars@amazon.com>
Delete operations on an Iceberg table can translate to snapshots of type DataOperations.DELETE or DataOperations.OVERWRITE:

- DataOperations.DELETE - when the delete operation on the table translates to full file deletes (for example, deletes wiping out partitions using partition-key-level predicates).
- DataOperations.OVERWRITE - the most common case of delete.

@rdblue / @aokolnychyi / @RussellSpitzer / @kbendick - please let me know what you folks think; once we achieve principle-level alignment, I will add unit tests.
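To make the distinction concrete, a hedged illustration with an invented table: a partition-aligned delete can drop whole data files, while a row-level delete on a copy-on-write table rewrites the surviving rows into a new file:

```java
import org.apache.spark.sql.SparkSession;

public class DeleteSnapshotTypesSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // A delete aligned with partition boundaries can drop whole data files,
    // committing a DataOperations.DELETE snapshot:
    spark.sql("DELETE FROM db.events WHERE event_date = '2021-11-01'");

    // A row-level delete inside a file forces the remaining rows to be
    // rewritten into a new file, committing a DataOperations.OVERWRITE
    // snapshot instead:
    spark.sql("DELETE FROM db.events WHERE user_id = 42");
  }
}
```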
DataOperations.DELETEorDataOperations.OVERWRITE.DataOperations.DELETE- when the delete operation on the table translates to full file deletes (for ex: deletes wiping out partitions using partitionKey level predicates)DataOperations.OVERWRITE- is the most common case of Delete.@rdblue / @aokolnychyi / @RussellSpitzer / @kbendick - pl. lemme know what you folks think & once we achieve principle level alignment - I will add unittests.