Skip to content

Commit ac756a4

Browse files
authored
Scan Delete Support Part 2: introduce DeleteFileManager skeleton. Use in ArrowReader (#950)
Second part of delete file read support. See #630. This PR provides the basis for delete file support within `ArrowReader`. `DeleteFileManager` is introduced, in skeleton form. Full implementation of its behaviour will be submitted in follow-up PRs. `DeleteFileManager` is responsible for loading and parsing positional and equality delete files from `FileIO`. Once delete files for a task have been loaded and parsed, `ArrowReader::process_file_scan_task` uses the resulting `DeleteFileManager` in two places: * `DeleteFileManager::get_delete_vector_for_task` is passed a data file path and will return an ~`Option<Vec<usize>>`~ `Option<RoaringTreeMap>` containing the indices of all rows that are positionally deleted in that data file (or `None` if there are none) * `DeleteFileManager::build_delete_predicate` is invoked with the schema from the file scan task. It will return an `Option<BoundPredicate>` representing the filter predicate derived from all of the applicable equality deletes being transformed into predicates, logically joined into a single predicate and then bound to the schema (or `None` if there are no applicable equality deletes) This PR integrates the skeleton of the `DeleteFileManager` into `ArrowReader::process_file_scan_task`, extending the `RowFilter` and `RowSelection` logic to take into account any `RowFilter` that results from equality deletes and any `RowSelection` that results from positional deletes. ## Updates: * refactored `DeleteFileManager` so that `get_positional_delete_indexes_for_data_file` returns a `RoaringTreemap` rather than a `Vec<usize>`. This was based on @liurenjie1024's recommendation in a comment on the v1 PR, and makes a lot of sense from a performance perspective and made it easier to implement `ArrowReader::build_deletes_row_selection` in the follow-up PR to this one, #951 * `DeleteFileManager` is instantiated in the `ArrowReader` constructor rather than per-scan-task, so that delete files that apply to more than one task don't end up getting loaded and parsed twice ## Potential further enhancements: * Go one step further and move loading of delete files, and parsing of positional delete files, into `ObjectCache` to ensure that loading and parsing of the same files persists across scans
1 parent f0295f8 commit ac756a4

File tree

9 files changed

+342
-44
lines changed

9 files changed

+342
-44
lines changed

Diff for: Cargo.lock

+17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ port_scanner = "0.1.5"
8686
rand = "0.8.5"
8787
regex = "1.10.5"
8888
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
89+
roaring = "0.10"
8990
rust_decimal = "1.31"
9091
serde = { version = "1.0.204", features = ["rc"] }
9192
serde_bytes = "0.11.15"

Diff for: crates/iceberg/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ parquet = { workspace = true, features = ["async"] }
7272
paste = { workspace = true }
7373
rand = { workspace = true }
7474
reqwest = { workspace = true }
75+
roaring = { workspace = true }
7576
rust_decimal = { workspace = true }
7677
serde = { workspace = true }
7778
serde_bytes = { workspace = true }

Diff for: crates/iceberg/src/arrow/delete_file_manager.rs

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::delete_vector::DeleteVector;
19+
use crate::expr::BoundPredicate;
20+
use crate::io::FileIO;
21+
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
22+
use crate::spec::SchemaRef;
23+
use crate::{Error, ErrorKind, Result};
24+
25+
#[allow(unused)]
26+
pub trait DeleteFileManager {
27+
/// Read the delete file referred to in the task
28+
///
29+
/// Returns the raw contents of the delete file as a RecordBatch stream
30+
fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream>;
31+
}
32+
33+
#[allow(unused)]
34+
#[derive(Clone, Debug)]
35+
pub(crate) struct CachingDeleteFileManager {
36+
file_io: FileIO,
37+
concurrency_limit_data_files: usize,
38+
}
39+
40+
impl DeleteFileManager for CachingDeleteFileManager {
41+
fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream> {
42+
// TODO, implementation in https://github.com/apache/iceberg-rust/pull/982
43+
44+
Err(Error::new(
45+
ErrorKind::FeatureUnsupported,
46+
"Reading delete files is not yet supported",
47+
))
48+
}
49+
}
50+
51+
#[allow(unused_variables)]
52+
impl CachingDeleteFileManager {
53+
pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager {
54+
Self {
55+
file_io,
56+
concurrency_limit_data_files,
57+
}
58+
}
59+
60+
pub(crate) async fn load_deletes(
61+
&self,
62+
delete_file_entries: Vec<FileScanTaskDeleteFile>,
63+
) -> Result<()> {
64+
// TODO
65+
66+
if !delete_file_entries.is_empty() {
67+
Err(Error::new(
68+
ErrorKind::FeatureUnsupported,
69+
"Reading delete files is not yet supported",
70+
))
71+
} else {
72+
Ok(())
73+
}
74+
}
75+
76+
pub(crate) fn build_delete_predicate(
77+
&self,
78+
snapshot_schema: SchemaRef,
79+
) -> Result<Option<BoundPredicate>> {
80+
// TODO
81+
82+
Ok(None)
83+
}
84+
85+
pub(crate) fn get_positional_delete_indexes_for_data_file(
86+
&self,
87+
data_file_path: &str,
88+
) -> Option<DeleteVector> {
89+
// TODO
90+
91+
None
92+
}
93+
}

Diff for: crates/iceberg/src/arrow/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
mod schema;
2121
pub use schema::*;
22+
pub(crate) mod delete_file_manager;
2223
mod reader;
2324
pub(crate) mod record_batch_projector;
2425
pub(crate) mod record_batch_transformer;

0 commit comments

Comments
 (0)