Delete Files in Table Scans #630


Open

sdd opened this issue Sep 13, 2024 · 11 comments

@sdd
Contributor

sdd commented Sep 13, 2024

I'm looking to start work on proper handling of delete files in table scans and so I'd like to open an issue to discuss some of the design decisions.

A core tenet of our approach so far has been to ensure that the tasks produced by the file plan are small, independent and self-contained, so that they can be easily distributed in architectures where the service that generates the file plan could be on a different machine to the service(s) that perform the file reads.

The FileScanTask struct represents these individual units of work at present. Currently, though, its shape is focussed on data files and it does not cater for including information on delete files that are produced by the scan. Here's how it looks now, for reference:

/// A task to scan part of a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
    /// The start offset of the file to scan.
    pub start: u64,
    /// The length of the file to scan.
    pub length: u64,
    /// The number of records in the file to scan.
    ///
    /// This is an optional field, and only available if we are
    /// reading the entire data file.
    pub record_count: Option<u64>,
    /// The data file path corresponding to the task.
    pub data_file_path: String,
    /// The content type of the file to scan.
    pub data_file_content: DataContentType,
    /// The format of the file to scan.
    pub data_file_format: DataFileFormat,
    /// The schema of the file to scan.
    pub schema: SchemaRef,
    /// The field ids to project.
    pub project_field_ids: Vec<i32>,
    /// The predicate to filter.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<BoundPredicate>,
}

In order to properly process delete files as part of executing a scan task, executors will now need to load any applicable delete files along with the data file that they are processing. I'll outline what happens now, and follow that with my proposed approach.

Current TableScan Synopsis

The current structure pushes all manifest file entries from the manifest list into a stream, which we then process concurrently in order to retrieve their associated manifests. Once retrieved, each manifest has each of its manifest entries extracted and pushed onto a channel so that they can be processed in parallel. Each is embedded inside a context object that contains the information needed to process the manifest entry. Tokio tasks listening to the channel then execute TableScan::process_manifest_entry on these objects, where we filter out any entries that do not match the scan filter predicate.
At this point, a FileScanTask is created for each of those entries that match the scan predicate. The FileScanTasks are then pushed into a channel that produces the stream of FileScanTasks that is returned to the original caller of plan_files.

Changes to TableScan

FileScanTask

Each FileScanTask represents a scan to be performed on a single data file. However, multiple delete files may need to be applied to any one data file. Additionally, a delete file applies to any data file within the same partition as the delete file, i.e. the same delete file may need to be applied to multiple data files. Thus an executor needs to know not just the data file that it is processing, but all of the delete files that are applicable to that data file.

The first part of the set of changes that I'm proposing is to refactor FileScanTask so that it represents a single data file and zero or more delete files.

  • The data_file_content property would be removed - each task is implicitly about a file of type Data.
  • A new struct, DeleteFileEntry, would be added. It would look something like this:
    struct DeleteFileEntry {
        path: String,
        format: DataFileFormat
    }
  • A delete_files property of type Vec<DeleteFileEntry> would be added to FileScanTask to represent the delete files that are applicable to its data file (see the sketch below).
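
For illustration, here's a minimal sketch of how the refactored FileScanTask could look. The exact field set and naming are assumptions on my part rather than a final design:

/// A task to scan part of a data file, plus the delete files that apply to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
    pub start: u64,
    pub length: u64,
    pub record_count: Option<u64>,
    pub data_file_path: String,
    // `data_file_content` removed: every task implicitly refers to a Data file.
    pub data_file_format: DataFileFormat,
    pub schema: SchemaRef,
    pub project_field_ids: Vec<i32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<BoundPredicate>,
    /// Delete files that may apply to this task's data file.
    pub delete_files: Vec<DeleteFileEntry>,
}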

TableScan::plan_files and associated methods

We need to update this logic to ensure that we can properly populate this new delete_files property. Each ManifestEntryContext will need the list of delete files so that, if the manifest entry that it encapsulates passes the filtering steps, it can populate the new delete_files property when it constructs the FileScanTask.

A naive approach may be to simply build a list of all of the delete files referred to by the top-level manifest list and give references to this list to all ManifestEntryContexts so that, if any delete files are present then all of them are included in every FileScanTask. This would be a good first step - code that works inefficiently is better than code that does not work at all! It would also permit work to proceed on the execution side.
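
As a rough illustration of that naive first step (the types here are hypothetical simplifications, not the real ManifestEntryContext):

use std::sync::Arc;

// Simplified stand-in for the delete file entry proposed above.
#[derive(Debug, Clone)]
struct DeleteFileEntry {
    path: String,
}

// Simplified stand-in for ManifestEntryContext: every context holds a cheap
// clone of one Arc'd list of all delete files found in the manifest list.
struct ManifestEntryContext {
    delete_files: Arc<Vec<DeleteFileEntry>>,
}

fn build_contexts(
    all_delete_files: Vec<DeleteFileEntry>,
    entry_count: usize,
) -> Vec<ManifestEntryContext> {
    let shared = Arc::new(all_delete_files);
    (0..entry_count)
        .map(|_| ManifestEntryContext {
            delete_files: Arc::clone(&shared),
        })
        .collect()
}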

Improvements could then be made to refine this approach by filtering out inapplicable delete files from each FileScanTask's delete_files property.

How does this sound so far, @liurenjie1024, @Xuanwo, @ZENOTME, @Fokko?

@xxhZs

xxhZs commented Sep 14, 2024

Hi, I've recently implemented merge-on-read in my library using iceberg-rust and submitted a working, simplified version of the code, which looks somewhat similar to the naive approach you proposed! (I have to read the same delete file on my different nodes.)

This PR is #625.

About this issue, I have some doubts.
Regarding FileScanTask { DeleteFileEntry }: as you said, delete files and data files are many-to-many, so even if the list of delete files is saved in the file task, in the optimal case the caller still needs some special handling to make sure that all data files and their delete files are dispatched to the same node, and that the same delete file is not read repeatedly. Most likely, this scheduling result is consistent with the partitioning result.
In this case, I prefer to expose the partitioning result directly in the file task. Please correct me if there is any misunderstanding.

@sdd
Contributor Author

sdd commented Sep 18, 2024

I'm happy to add the partitioning result to the task. This is useful to the executor node when deciding how to distribute tasks, as it enables the use of a few different strategies, the choice of which can be left to the implementer.

It is not necessarily the case that the delete file is read repeatedly if the delete file list is added to the file scan task, since we can store the parsed delete files inside the object cache, preventing them from being read repeatedly on the same node as they'd already be in memory. If the executor ensures that all tasks with the same partition get sent to the same executor, then the files would only be read once.

@liurenjie1024
Contributor

Thanks @sdd for raising this. The general approach looks good to me. The challenging part of deletion file processing is filtering out unnecessary deletion files in each task, which we can introduce as an optimization later.

@sdd
Contributor Author

sdd commented Sep 26, 2024

Thanks - I have some skeleton code for the required changes to reader.rs that I'm going to share over the next few days as well.

@sdd
Contributor Author

sdd commented Sep 27, 2024

Thanks for taking a look at the above, @liurenjie1024. I've just submitted a draft PR which outlines the second part of the approach - how we extend the filtering in the arrow reader to handle delete files. #652

@liurenjie1024, @Xuanwo, @ZENOTME, @xxhZs: if you could take a look at that PR as well when you get a chance and let me know if you think the approach seems sensible, that would be great!

@sdd
Contributor Author

sdd commented Feb 7, 2025

Hi all. I'm resurrecting this issue now that @Fokko has kindly helped get the first part of this over the line by reviewing and merging #652.

I have a branch with an earlier iteration of delete file read support that I'm intending to break up into pieces and submit as separate PRs. There are parts of it that I'm happy with and other parts that I'm less happy with; plus now would be a good opportunity to discuss the higher-level structure of the approach for this again now that I've got a better idea of the different parts of work that are involved.

Outline

TableScan

  • TableScan::plan_files creates a DeleteFileIndex which accumulates an index of all of the equality delete files and positional delete files that could apply to data files within the scan.
  • Each of the FileScanTasks in the stream returned by plan_files is augmented with delete_files, a Vec<FileScanTaskDeleteFile> which contains details of the delete files that could apply to the data file in the FileScanTask.

ArrowReader

Right now, ArrowReader::read takes a stream of FileScanTasks and, for each task, reads record batches from that task's data file, applying filtering, projection, batching, and transformations such as schema migration / column reordering, and returning a stream of the RecordBatches that result from applying these operations to all file scan tasks.

In order to support delete files the filtering logic needs to be extended so that:

  • Equality delete files that are referenced in each task's delete_files list are used to filter out rows that match filter predicates in any applicable delete files;
  • Positional delete files from delete_files are used to filter out rows whose index is specified as deleted in applicable delete files.

Filtering Approach

  • Predicate-based filtering is already being performed in the reader, by transforming the filter predicate present in the FileScanTask into an arrow RowFilter that gets passed to ParquetRecordBatchStreamBuilder::with_row_filter.
  • Equality deletes can be handled by merging the file scan task's filter predicate with predicates built from applicable rows present in any equality delete files, to form a single larger predicate that gets passed to ParquetRecordBatchStreamBuilder as before.
  • Positional deletes can be handled using the RowSelection logic: creating a RowSelection specific to the applicable positional deletes, merging this with any RowSelection generated from the filter predicate (if present), and passing the result to ParquetRecordBatchStreamBuilder::with_row_selection (see the sketch below).
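
To make the positional-delete half of that concrete, here's a rough sketch of turning a sorted list of deleted row indices into a RowSelection, using the parquet crate's RowSelector::select / RowSelector::skip constructors. The function name and merging strategy are my own illustration rather than the final implementation:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Build a RowSelection that skips the given deleted row indices
// (assumed sorted and de-duplicated) out of a file of `total_rows` rows.
fn row_selection_from_deleted_rows(deleted: &[u64], total_rows: u64) -> RowSelection {
    let mut selectors = Vec::new();
    let mut cursor = 0u64;
    for &deleted_row in deleted {
        if deleted_row > cursor {
            // Keep the rows between the previous delete and this one.
            selectors.push(RowSelector::select((deleted_row - cursor) as usize));
        }
        // Skip the deleted row itself.
        selectors.push(RowSelector::skip(1));
        cursor = deleted_row + 1;
    }
    if cursor < total_rows {
        selectors.push(RowSelector::select((total_rows - cursor) as usize));
    }
    RowSelection::from(selectors)
}

The resulting selection could then be merged with whatever RowSelection the predicate-derived row filter produces before being handed to with_row_selection.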

Loading and Processing Delete Files

Whilst I was fairly happy with the approach taken to filtering in my first draft, the approach taken to loading and processing delete files felt like it could be improved.

The first draft took this approach:

// delete_file.rs

// Represents a parsed Delete file that can be safely stored
// in the Object Cache.
// Storing this in the object cache saves us from parsing
// the same file multiple times in the case of delete files that could apply to
// multiple files within the stream (It would be better if we also
// have a way of preventing more than one task from starting to parse the same
// delete file in parallel)
pub(crate) enum Deletes {
    // Positional delete files are parsed into a map of
    // filename to a sorted list of row indices.
    // (I'm ignoring the stored rows that are present in
    //  positional deletes for now. I think they're only used for statistics?)
    Positional(HashMap<String, Vec<u64>>),

    // Equality delete files are initially parsed solely as an
    // unprocessed list of `RecordBatch`es from the equality
    // delete files.
    // Can we do better than this by storing a `Predicate`?
    // The equality deletes refer to fields by name rather than field_id,
    // so if we cache a Predicate rather than just a raw `RecordBatch`
    // then a field name change would invalidate the cached Predicate.
    // Similarly, I don't think we can cache these as `BoundPredicate`s
    // as the column order could be different across different data
    // files and so the accessor in the bound predicate could be invalid?
    Equality(Vec<RecordBatch>),
}

// Processes the RecordBatches of a pos del file into a `HashMap` that
// maps filenames to lists of row indices that are marked as 
// deleted for that file
pub(crate) async fn parse_positional_delete_file(
    mut record_batch_stream: ArrowRecordBatchStream,
) -> Result<Deletes> {
   // ...
}

// Simpler - collects the RecordBatches of an eq del file
// into a `Vec<RecordBatch>`
// and returns them in a `Deletes::Equality`
pub(crate) async fn parse_equality_delete_file(
    mut record_batch_stream: ArrowRecordBatchStream,
) -> Result<Deletes> {
  // ...
}
// arrow/reader.rs
impl ArrowReader {

    // ... rest of ArrowReader impl

    // Spawned at the start of `process_file_scan_task` and then awaited on
    // after the record_batch_stream_builder has been created and the page
    // index loaded.
    // Responsible for loading the delete files through FileIO and parsing
    // them into a list of `Deletes` objects
    async fn get_deletes(
        delete_file_entries: Vec<FileScanTaskDeleteFile>,
        file_io: FileIO,
        concurrency_limit_data_files: usize,
    ) -> Result<Vec<Deletes>> {
        // concurrently physically loads delete files for each
        // `FileScanTaskDeleteFile` via `FileIO` into a
        // `RecordBatchStream`, which gets mapped through
        // `parse_positional_delete_file` or `parse_equality_delete_file`
        // into a combined vec of `Deletes`
    }

    // Maps `Deletes::Positional` objects into a list
    // of indices of rows that need deleting
    // from the data file with the specified path
    fn get_positional_delete_indexes(
        data_file_path: &str,
        deletes: &[Deletes],
    ) -> Option<Vec<usize>> {
        // trivial filter / map
    }

    // Maps applicable `Deletes::Equality` objects into
    // an Iceberg predicate that is bound to the provided snapshot
    fn get_equality_deletes(
        delete_files: &[Deletes],
        snapshot_schema: SchemaRef,
    ) -> Result<Option<BoundPredicate>> {
        // starts with `AlwaysFalse` and builds up a predicate
        // by `OR`ing together predicates derived from applicable
        // rows in the RecordBatches
    }
}
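
To make the positional-delete parsing step above a little more concrete, here's a rough sketch of what parse_positional_delete_file could boil down to, assuming the record batches expose the standard file_path and pos columns of Iceberg positional delete files in that order (a real implementation would resolve the columns by field id; the function name below is illustrative only):

use std::collections::HashMap;

use arrow_array::{Int64Array, RecordBatch, StringArray};

// Collect (file_path, pos) pairs from the batches of a positional delete file
// into a map of data file path -> sorted list of deleted row indices.
fn positional_deletes_from_batches(batches: &[RecordBatch]) -> HashMap<String, Vec<u64>> {
    let mut deletes: HashMap<String, Vec<u64>> = HashMap::new();
    for batch in batches {
        let paths = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("file_path column should be a string array");
        let positions = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("pos column should be an int64 array");
        for row in 0..batch.num_rows() {
            deletes
                .entry(paths.value(row).to_string())
                .or_default()
                .push(positions.value(row) as u64);
        }
    }
    for rows in deletes.values_mut() {
        rows.sort_unstable();
    }
    deletes
}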

@liurenjie1024 commented that this approach could be improved by reusing the logic present in the data file reader rather than reimplementing similar logic. He also mentioned that inspiration could be taken from the Java implementation's GenericReader.

Suggestions that improve upon the approach to loading and caching the delete files above are welcome. I'll refrain from submitting any changes related to this until others have had chance to comment. I'll proceed with the more well-defined aspects relating to orchestrating the filtering logic itself.

@sdd
Contributor Author

sdd commented Feb 8, 2025

OK, I have an improved design for loading of delete files in the read phase that I'll share shortly.

We introduce a DeleteFileManager, constructed when ArrowReader gets built and provided with a FileIO. The reader keeps an Arc of this that it clones and passes to process_file_scan_task. process_file_scan_task calls an async method on DeleteFileManager, passing in the delete file list for its file scan task.

DeleteFileManager loads and processes the delete files, deduplicating between multiple file scan tasks that reference the same delete files.

DeleteFileManager exposes two methods that process_file_scan_task calls later on - one to retrieve the list of positional delete row indices that apply to a specified data file, and another to get a filter predicate derived from the applicable delete files.
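
To illustrate the shape of that interface, here's a hypothetical, heavily simplified sketch (the stand-in types and exact method names are assumptions, not the real API):

use std::collections::HashMap;

// Stand-ins for the real types, purely for illustration.
type DeleteVector = Vec<u64>;   // row indices removed by positional deletes
type DeletePredicate = String;  // stand-in for an Iceberg predicate

// One DeleteFileManager is created alongside the ArrowReader and shared
// (via an Arc) with every process_file_scan_task invocation.
struct DeleteFileManager {
    // data file path -> rows deleted by applicable positional delete files
    positional_deletes: HashMap<String, DeleteVector>,
    // predicates parsed from applicable equality delete files
    equality_deletes: Vec<DeletePredicate>,
}

impl DeleteFileManager {
    // Positional delete row indices that apply to one specific data file.
    fn get_positional_delete_indexes_for_data_file(&self, path: &str) -> Option<&DeleteVector> {
        self.positional_deletes.get(path)
    }

    // A single filter predicate derived from all applicable equality deletes.
    fn build_delete_predicate(&self) -> Option<DeletePredicate> {
        if self.equality_deletes.is_empty() {
            None
        } else {
            // The real implementation would combine Predicates and bind them
            // to the task's schema rather than concatenating strings.
            Some(self.equality_deletes.join(" OR "))
        }
    }
}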

@ZENOTME
Contributor

ZENOTME commented Feb 11, 2025

Thanks for this great work, @sdd! Should we also consider the case where the Deletes enum occupies too much space, so that we need to support spilling it to disk?

@sdd
Contributor Author

sdd commented Feb 19, 2025

@ZENOTME I think that we'll want to do that at some point, but it feels more like a day-two task. We're not touching the disk anywhere in the library so far, as far as I know, and so it would need some careful consideration.

@sdd
Contributor Author

sdd commented Feb 23, 2025

I've worked on an improved design for loading and parsing of delete files by the DeleteFileManager. The code for this can be seen in #982.

  • Create a single stream of all delete file tasks irrespective of type, so that we can respect the combined concurrency limit
  • We then process each in two phases: load and parse.
  • For positional deletes, the load phase instantiates an ArrowRecordBatchStream to stream the file contents out.
  • For equality deletes, we first check whether the file is already loaded, or is being loaded by another concurrently processing data file scan task. If it is, we return a future from this phase. If not, we create such a future for that equality delete file and return an ArrowRecordBatchStream from the load phase as per the positional deletes - only this time it is accompanied by a one-shot channel sender that we will eventually use to resolve the shared future that we stored in the state (a rough sketch of this dedup appears after the diagram below).
  • In a future update to the DeleteFileManager that adds support for delete vectors, the load phase will return a PuffinReader for them.
  • The parse phase parses each record batch stream according to its associated data type. The result of this is a map of data file paths to delete vectors for the positional delete tasks (and in future for the delete vector tasks). For equality delete file tasks, this results in an unbound Predicate.
  • The unbound Predicates resulting from equality deletes are sent to their associated one-shot channel to store them in the right place in the delete file manager's state.
  • The results of all of these futures are awaited on in parallel with the specified level of concurrency and collected into a Vec. We then combine all of the delete vector maps that resulted from any positional delete or delete vector files into a single map and persist it in the state.
Conceptually, the data flow is like this:
                                         FileScanTaskDeleteFile
                                                    |
           Already-loading EQ Delete                |             Everything Else
                    +---------------------------------------------------+
                    |                                                   |
           [get existing future]                         [load recordbatch stream / puffin]
        DeleteFileContext::InProgEqDel                           DeleteFileContext
                    |                                                   |
                    |                                                   |
                    |                     +-----------------------------+--------------------------+
                    |                   Pos Del           Del Vec (Not yet Implemented)         EQ Del
                    |                     |                             |                          |
                    |          [parse pos del stream]         [parse del vec puffin]       [parse eq del]
                    |  HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap>   (Predicate, Sender)
                    |                     |                             |                          |
                    |                     |                             |                 [persist to state]
                    |                     |                             |                          ()
                    |                     |                             |                          |
                    |                     +-----------------------------+--------------------------+
                    |                                                   |
                    |                                            [buffer unordered]
                    |                                                   |
                    |                                            [combine del vectors]
                    |                                      HashMap<String, RoaringTreeMap>
                    |                                                   |
                    |                                      [persist del vectors to state]
                    |                                                   ()
                    |                                                   |
                    +-------------------------+-------------------------+
                                              |
                                           [join!]
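
To sketch the "already-loading EQ delete" branch above in isolation, here's a hypothetical, heavily simplified version of the dedup using a shared one-shot future per equality delete file path (the names and stand-in types are mine, not the code in #982):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};

// Stand-in for the unbound Predicate parsed out of an equality delete file.
type ParsedEqDelete = String;

// A future that resolves once whichever task won the race has parsed the file.
type EqDeleteFuture = Shared<oneshot::Receiver<ParsedEqDelete>>;

#[derive(Default)]
struct EqDeleteState {
    // equality delete file path -> shared future for its parsed form
    in_flight: HashMap<String, EqDeleteFuture>,
}

enum EqDeleteLoad {
    // Another task is already loading this file: await the shared future.
    AlreadyLoading(EqDeleteFuture),
    // This task is responsible for loading and parsing the file; it sends the
    // parsed result through the sender to resolve everyone else's future.
    LoadHere(oneshot::Sender<ParsedEqDelete>),
}

fn register_eq_delete(state: &Arc<Mutex<EqDeleteState>>, path: &str) -> EqDeleteLoad {
    let mut guard = state.lock().unwrap();
    if let Some(fut) = guard.in_flight.get(path) {
        return EqDeleteLoad::AlreadyLoading(fut.clone());
    }
    let (tx, rx) = oneshot::channel();
    guard.in_flight.insert(path.to_string(), rx.shared());
    EqDeleteLoad::LoadHere(tx)
}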

@sdd
Contributor Author

sdd commented Mar 5, 2025

liurenjie1024 pushed a commit that referenced this issue Mar 20, 2025
…se in `ArrowReader` (#950)

Second part of delete file read support. See
#630.

This PR provides the basis for delete file support within `ArrowReader`.

`DeleteFileManager` is introduced, in skeleton form. Full implementation
of its behaviour will be submitted in follow-up PRs.

`DeleteFileManager` is responsible for loading and parsing positional
and equality delete files from `FileIO`. Once delete files for a task
have been loaded and parsed, `ArrowReader::process_file_scan_task` uses
the resulting `DeleteFileManager` in two places:

* `DeleteFileManager::get_delete_vector_for_task` is passed a data file
path and will return an ~`Option<Vec<usize>>`~ `Option<RoaringTreemap>`
containing the indices of all rows that are positionally deleted in that
data file (or `None` if there are none)
* `DeleteFileManager::build_delete_predicate` is invoked with the schema
from the file scan task. It will return an `Option<BoundPredicate>`
representing the filter predicate derived from all of the applicable
equality deletes being transformed into predicates, logically joined
into a single predicate and then bound to the schema (or `None` if there
are no applicable equality deletes)


This PR integrates the skeleton of the `DeleteFileManager` into
`ArrowReader::process_file_scan_task`, extending the `RowFilter` and
`RowSelection` logic to take into account any `RowFilter` that results
from equality deletes and any `RowSelection` that results from
positional deletes.

## Updates:
* refactored `DeleteFileManager` so that
`get_positional_delete_indexes_for_data_file` returns a `RoaringTreemap`
rather than a `Vec<usize>`. This was based on @liurenjie1024's
recommendation in a comment on the v1 PR, and makes a lot of sense from
a performance perspective and made it easier to implement
`ArrowReader::build_deletes_row_selection` in the follow-up PR to this
one, #951
* `DeleteFileManager` is instantiated in the `ArrowReader` constructor
rather than per-scan-task, so that delete files that apply to more than
one task don't end up getting loaded and parsed twice

## Potential further enhancements:
* Go one step further and move loading of delete files, and parsing of
positional delete files, into `ObjectCache` to ensure that loading and
parsing of the same files persists across scans
liurenjie1024 added a commit that referenced this issue Apr 7, 2025
…` implementation (#951)

Third part of delete file read support. See
#630

Builds on top of #950.


`build_deletes_row_selection` computes a `RowSelection` from a
`RoaringTreemap` representing the indexes of rows in a data file that
have been marked as deleted by positional delete files that apply to the
data file being read (and, in the future, delete vectors).

The resulting `RowSelection` will be merged with a `RowSelection`
resulting from the scan's filter predicate (if present) and supplied to
the `ParquetRecordBatchStreamBuilder` so that deleted rows are omitted
from the `RecordBatchStream` returned by the reader.

NB: I encountered quite a few edge cases in this method and the logic is
quite complex. There is a good chance that a keen-eyed reviewer would be
able to conceive of an edge-case that I haven't covered.

---------

Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>