Skip to content

Commit f4d3827

Browse files
authored
feat: optimize plan files memory consumption (#64)
* optimize file scan task deletes by arc
* fmt
* fix
* fmt
* fmt
1 parent 1d4b1d7 commit f4d3827

File tree

2 files changed: +22 -21 lines changed

crates/iceberg/src/delete_file_index.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ pub(crate) struct DeleteFileIndex {
3434
ready_notify: Arc<Notify>,
3535
}
3636

37+
type DeleteFileContextAndTask = (Arc<DeleteFileContext>, Arc<FileScanTask>);
38+
3739
#[derive(Debug)]
3840
struct PopulatedDeleteFileIndex {
3941
#[allow(dead_code)]
40-
global_deletes: Vec<Arc<DeleteFileContext>>,
41-
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
42-
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
42+
global_deletes: Vec<DeleteFileContextAndTask>,
43+
eq_deletes_by_partition: HashMap<Struct, Vec<DeleteFileContextAndTask>>,
44+
pos_deletes_by_partition: HashMap<Struct, Vec<DeleteFileContextAndTask>>,
4345
// TODO: do we need this?
4446
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
4547

@@ -84,7 +86,7 @@ impl DeleteFileIndex {
8486
&self,
8587
data_file: &DataFile,
8688
seq_num: Option<i64>,
87-
) -> Result<Vec<FileScanTask>> {
89+
) -> Result<Vec<Arc<FileScanTask>>> {
8890
match self.index.get() {
8991
Some(idx) => Ok(idx.get_deletes_for_data_file(data_file, seq_num)),
9092
None => {
@@ -109,15 +111,14 @@ impl PopulatedDeleteFileIndex {
109111
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
110112
111113
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
112-
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
113-
HashMap::default();
114-
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
115-
HashMap::default();
114+
let mut eq_deletes_by_partition = HashMap::default();
115+
let mut pos_deletes_by_partition = HashMap::default();
116116

117-
let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
117+
let mut global_deletes: Vec<(Arc<DeleteFileContext>, Arc<FileScanTask>)> = vec![];
118118

119119
files.into_iter().for_each(|ctx| {
120120
let arc_ctx = Arc::new(ctx);
121+
let file_scan_task: Arc<FileScanTask> = Arc::new(arc_ctx.as_ref().into());
121122

122123
let partition = arc_ctx.manifest_entry.data_file().partition();
123124

@@ -126,7 +127,7 @@ impl PopulatedDeleteFileIndex {
126127
// TODO: confirm we're good to skip here if we encounter a pos del
127128
// FIXME(Dylan): allow putting position delete to global deletes.
128129
// if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
129-
global_deletes.push(arc_ctx);
130+
global_deletes.push((arc_ctx, file_scan_task.clone()));
130131
return;
131132
// }
132133
}
@@ -139,10 +140,10 @@ impl PopulatedDeleteFileIndex {
139140

140141
destination_map
141142
.entry(partition.clone())
142-
.and_modify(|entry| {
143-
entry.push(arc_ctx.clone());
143+
.and_modify(|entry: &mut Vec<DeleteFileContextAndTask>| {
144+
entry.push((arc_ctx.clone(), file_scan_task.clone()));
144145
})
145-
.or_insert(vec![arc_ctx.clone()]);
146+
.or_insert(vec![(arc_ctx.clone(), file_scan_task)]);
146147
});
147148

148149
PopulatedDeleteFileIndex {
@@ -157,29 +158,29 @@ impl PopulatedDeleteFileIndex {
157158
&self,
158159
data_file: &DataFile,
159160
seq_num: Option<i64>,
160-
) -> Vec<FileScanTask> {
161+
) -> Vec<Arc<FileScanTask>> {
161162
let mut results = vec![];
162163

163164
self.global_deletes
164165
.iter()
165166
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
166-
.filter(|&delete| {
167+
.filter(|&(delete, _)| {
167168
seq_num
168169
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
169170
.unwrap_or_else(|| true)
170171
})
171-
.for_each(|delete| results.push(delete.as_ref().into()));
172+
.for_each(|(_, task)| results.push(task.clone()));
172173

173174
if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
174175
deletes
175176
.iter()
176177
// filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
177-
.filter(|&delete| {
178+
.filter(|&(delete, _)| {
178179
seq_num
179180
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
180181
.unwrap_or_else(|| true)
181182
})
182-
.for_each(|delete| results.push(delete.as_ref().into()));
183+
.for_each(|(_, task)| results.push(task.clone()));
183184
}
184185

185186
// TODO: the spec states that:
@@ -190,12 +191,12 @@ impl PopulatedDeleteFileIndex {
190191
deletes
191192
.iter()
192193
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
193-
.filter(|&delete| {
194+
.filter(|&(delete, _)| {
194195
seq_num
195196
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
196197
.unwrap_or_else(|| true)
197198
})
198-
.for_each(|delete| results.push(delete.as_ref().into()));
199+
.for_each(|(_, task)| results.push(task.clone()));
199200
}
200201

201202
results

crates/iceberg/src/scan/task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct FileScanTask {
5858
pub predicate: Option<BoundPredicate>,
5959

6060
/// The list of delete files that may need to be applied to this data file
61-
pub deletes: Vec<FileScanTask>,
61+
pub deletes: Vec<Arc<FileScanTask>>,
6262
/// sequence number
6363
pub sequence_number: i64,
6464
/// equality ids

0 commit comments

Comments
 (0)