-
Notifications
You must be signed in to change notification settings - Fork 226
Scan Delete Support Part 2: introduce DeleteFileManager
skeleton. Use in ArrowReader
#950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scan Delete Support Part 2: introduce DeleteFileManager
skeleton. Use in ArrowReader
#950
Conversation
DeleteFileManager
skeleton. Use in ArrowReader
9d47546
to
4c2ef08
Compare
@liurenjie1024, @Xuanwo, @Fokko - this is ready for review when any of you get chance. Thanks! :-) |
DeleteFileManager
skeleton. Use in ArrowReader
DeleteFileManager
skeleton. Use in ArrowReader
Hi @Xuanwo, @liurenjie1024, @Fokko, @ZENOTME - I've now got 5 separate PRs open for delete support on the read side that each extend the previous one. They're ready for review, with only a couple of tests outstanding that I expect to complete over the next day or two. Would you mind reviewing please? It will be a bit of a burden to keep all of these separate PRs up-to-date with main whilst they are all still open. Thanks, and I look forward to your feedback! 😁 |
2246dc3
to
114317d
Compare
crates/iceberg/src/arrow/reader.rs
Outdated
@@ -162,30 +165,24 @@ impl ArrowReader { | |||
file_io: FileIO, | |||
row_group_filtering_enabled: bool, | |||
row_selection_enabled: bool, | |||
concurrency_limit_data_files: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think at some point we want to read this from some kind of configuration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I think this also potentially makes more sense as a semaphore of some kind rather than just a usize so that it can be shared better between tasks that effectively you'd want to share the same combined limit.
114317d
to
4311f89
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @sdd for this greate pr! And sorry for late reply, there are too many prs pending for review. I'll concentrate on the delete support prs in following days so that we could deliver it in next release.
use crate::spec::SchemaRef; | ||
use crate::{Error, ErrorKind, Result}; | ||
|
||
pub(crate) struct DeleteFileManager {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to make this a trait rather a struct, so that compute engines could has different cache policy, this is also inspired by our previous discussion in reader module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course we could provide a simple version without any cache as default choice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've introduced a trait but kept all of the methods that I think would be only used in the in-tree engine, rather than any external ones, in the (renamed) struct. happy to adjust the content of this trait over the follow-up PRs if that works for you.
} | ||
} | ||
|
||
pub(crate) fn build_delete_predicate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should return arrow record batch rather predicate here. Or we could have a method built on the one which returns arrow record batch. The reason is that different engines may evaluate it in different approaches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've indicated within the newly-added trait definition that the basic file-to-record-batch-stream functionality is coming in a follow-up PR that I'll refactor so that it is used either in a struct that implements the trait or in a default implementation for the trait.
Ok(None) | ||
} | ||
|
||
pub(crate) fn get_positional_delete_indexes_for_data_file( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above comment, we should provide a base version with actual data (like arrow record batch) to give advanced users enough flexibility to do that. Also I don't think we should return internal data structure directly, maybe sth like following is better?
struct PositionDeleteIndex {
bitmap: RoaringTreemap
}
Motivated by java version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding wrapping the RoaringBitmap
in a PositionalDeleteIndex
. I can do that, although the Java version's interface precludes us from using advance_to which would be very useful to have access to within the ArrowReader
row selection / page skipping code that I have in the follow-up PR to this one, #951.
Would you be ok with your proposed struct PositionDeleteIndex
exposing an .iter()
method to return a public PositionDeleteIndexIter
iterator, which itself implements advance_to
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you be ok with your proposed struct PositionDeleteIndex exposing an .iter() method to return a public PositionDeleteIndexIter iterator, which itself implements advance_to?
I'm fine with that. My concern is not exposing too much internals data structures to, and I think the advance_to
method is easy to understand as iceberg spec requires that position delete file should be sorted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The third commit in this PR is focussed on changes raised by your original comment here.
#[allow(unused_variables)] | ||
impl DeleteFileManager { | ||
pub(crate) async fn load_deletes( | ||
delete_file_entries: Vec<FileScanTaskDeleteFile>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should pass this in constructor, rather it should be in an argument of load_equality_delete
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, see second commit in this PR.
Thanks for the review @liurenjie1024 - will refactor with your comments in mind. |
…emanager to be constructed prior to use
c8a000b
to
80a0801
Compare
80a0801
to
74e3aa9
Compare
Back to you @liurenjie1024 - only small changes vs when you last looked so it should be pretty quick to re-review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @sdd for this pr, LGTM!
…` implementation (#951) Third part of delete file read support. See #630 **Builds on top of #950 `build_deletes_row_selection` computes a `RowSelection` from a `RoaringTreemap` representing the indexes of rows in a data file that have been marked as deleted by positional delete files that apply to the data file being read (and, in the future, delete vectors). The resulting `RowSelection` will be merged with a `RowSelection` resulting from the scan's filter predicate (if present) and supplied to the `ParquetRecordBatchStreamBuilder` so that deleted rows are omitted from the `RecordBatchStream` returned by the reader. NB: I encountered quite a few edge cases in this method and the logic is quite complex. There is a good chance that a keen-eyed reviewer would be able to conceive of an edge-case that I haven't covered. --------- Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
Second part of delete file read support. See #630.
This PR provides the basis for delete file support within
ArrowReader
.DeleteFileManager
is introduced, in skeleton form. Full implementation of its behaviour will be submitted in follow-up PRs.DeleteFileManager
is responsible for loading and parsing positional and equality delete files fromFileIO
. Once delete files for a task have been loaded and parsed,ArrowReader::process_file_scan_task
uses the resultingDeleteFileManager
in two places:DeleteFileManager::get_delete_vector_for_task
is passed a data file path and will return anOption<Vec<usize>>
Option<RoaringTreeMap>
containing the indices of all rows that are positionally deleted in that data file (orNone
if there are none)DeleteFileManager::build_delete_predicate
is invoked with the schema from the file scan task. It will return anOption<BoundPredicate>
representing the filter predicate derived from all of the applicable equality deletes being transformed into predicates, logically joined into a single predicate and then bound to the schema (orNone
if there are no applicable equality deletes)This PR integrates the skeleton of the
DeleteFileManager
intoArrowReader::process_file_scan_task
, extending theRowFilter
andRowSelection
logic to take into account anyRowFilter
that results from equality deletes and anyRowSelection
that results from positional deletes.Updates:
DeleteFileManager
so thatget_positional_delete_indexes_for_data_file
returns aRoaringTreemap
rather than aVec<usize>
. This was based on @liurenjie1024's recommendation in a comment on the v1 PR, and makes a lot of sense from a performance perspective and made it easier to implementArrowReader::build_deletes_row_selection
in the follow-up PR to this one, Scan Delete Support Part 3:ArrowReader::build_deletes_row_selection
implementation #951DeleteFileManager
is instantiated in theArrowReader
constructor rather than per-scan-task, so that delete files that apply to more than one task don't end up getting loaded and parsed twicePotential further enhancements:
ObjectCache
to ensure that loading and parsing of the same files persists across scans