Skip to content

Commit cc00f8e

Browse files
committed
add merge append action
1 parent b6a6f6d commit cc00f8e

File tree

8 files changed

+720
-12
lines changed

8 files changed

+720
-12
lines changed

crates/iceberg/src/spec/manifest/entry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ impl ManifestEntry {
130130
self.snapshot_id
131131
}
132132

133+
/// File sequence number
134+
#[inline]
135+
pub fn file_sequence_number(&self) -> Option<i64> {
136+
self.file_sequence_number
137+
}
138+
133139
/// Data sequence number.
134140
#[inline]
135141
pub fn sequence_number(&self) -> Option<i64> {

crates/iceberg/src/transaction/append.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl TransactionAction for FastAppendAction {
108108
}
109109
}
110110

111-
struct FastAppendOperation;
111+
pub(crate) struct FastAppendOperation;
112112

113113
impl SnapshotProduceOperation for FastAppendOperation {
114114
fn operation(&self) -> Operation {
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::{BTreeMap, HashMap};
19+
use std::pin::Pin;
20+
use std::sync::Arc;
21+
22+
use async_trait::async_trait;
23+
use uuid::Uuid;
24+
25+
use crate::Result;
26+
use crate::io::FileIO;
27+
use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter};
28+
use crate::table::Table;
29+
use crate::transaction::append::FastAppendOperation;
30+
use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer};
31+
use crate::transaction::{ActionCommit, TransactionAction};
32+
use crate::utils::bin::ListPacker;
33+
34+
/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
35+
/// based on the target size.
36+
pub struct MergeAppendAction {
37+
target_size_bytes: u32,
38+
min_count_to_merge: u32,
39+
check_duplicate: bool,
40+
merge_enabled: bool,
41+
// below are properties used to create SnapshotProducer when commit
42+
commit_uuid: Option<Uuid>,
43+
key_metadata: Option<Vec<u8>>,
44+
snapshot_properties: HashMap<String, String>,
45+
added_data_files: Vec<DataFile>,
46+
}
47+
48+
/// Target size of manifest file when merging manifests.
49+
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
50+
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
51+
/// Minimum number of manifests to merge.
52+
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
53+
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
54+
/// Whether allow to merge manifests.
55+
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
56+
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
57+
58+
impl MergeAppendAction {
59+
pub(crate) fn new(table: &Table) -> Result<Self> {
60+
let target_size_bytes: u32 = table
61+
.metadata()
62+
.properties()
63+
.get(MANIFEST_TARGET_SIZE_BYTES)
64+
.and_then(|s| s.parse().ok())
65+
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
66+
let min_count_to_merge: u32 = table
67+
.metadata()
68+
.properties()
69+
.get(MANIFEST_MIN_MERGE_COUNT)
70+
.and_then(|s| s.parse().ok())
71+
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
72+
let merge_enabled = table
73+
.metadata()
74+
.properties()
75+
.get(MANIFEST_MERGE_ENABLED)
76+
.and_then(|s| s.parse().ok())
77+
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
78+
Ok(Self {
79+
check_duplicate: true,
80+
target_size_bytes,
81+
min_count_to_merge,
82+
merge_enabled,
83+
commit_uuid: None,
84+
key_metadata: None,
85+
snapshot_properties: HashMap::default(),
86+
added_data_files: vec![],
87+
})
88+
}
89+
90+
pub fn with_check_duplicate(mut self, v: bool) -> Self {
91+
self.check_duplicate = v;
92+
self
93+
}
94+
95+
pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
96+
self.commit_uuid = Some(commit_uuid);
97+
self
98+
}
99+
100+
pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
101+
self.key_metadata = Some(key_metadata);
102+
self
103+
}
104+
105+
pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
106+
self.snapshot_properties = snapshot_properties;
107+
self
108+
}
109+
110+
/// Add data files to the snapshot.
111+
pub fn add_data_files(
112+
&mut self,
113+
data_files: impl IntoIterator<Item = DataFile>,
114+
) -> Result<&mut Self> {
115+
self.added_data_files.extend(data_files);
116+
Ok(self)
117+
}
118+
}
119+
120+
#[async_trait]
121+
impl TransactionAction for MergeAppendAction {
122+
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
123+
let snapshot_producer = SnapshotProducer::new(
124+
table,
125+
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
126+
self.key_metadata.clone(),
127+
self.snapshot_properties.clone(),
128+
self.added_data_files.clone(),
129+
);
130+
131+
// validate added files
132+
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
133+
134+
// Checks duplicate files
135+
if self.check_duplicate {
136+
snapshot_producer
137+
.validate_duplicate_files(&self.added_data_files)
138+
.await?;
139+
}
140+
141+
if self.merge_enabled {
142+
snapshot_producer
143+
.commit(FastAppendOperation, MergeManifsetProcess {
144+
target_size_bytes: self.target_size_bytes,
145+
min_count_to_merge: self.min_count_to_merge,
146+
})
147+
.await
148+
} else {
149+
snapshot_producer
150+
.commit(FastAppendOperation, DefaultManifestProcess)
151+
.await
152+
}
153+
}
154+
}
155+
156+
struct MergeManifestManager {
157+
target_size_bytes: u32,
158+
min_count_to_merge: u32,
159+
content: ManifestContentType,
160+
}
161+
162+
impl MergeManifestManager {
163+
pub fn new(
164+
target_size_bytes: u32,
165+
min_count_to_merge: u32,
166+
content: ManifestContentType,
167+
) -> Self {
168+
Self {
169+
target_size_bytes,
170+
min_count_to_merge,
171+
content,
172+
}
173+
}
174+
175+
fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> {
176+
let mut grouped_manifests = BTreeMap::new();
177+
for manifest in manifests {
178+
grouped_manifests
179+
.entry(manifest.partition_spec_id)
180+
.or_insert_with(Vec::new)
181+
.push(manifest);
182+
}
183+
grouped_manifests
184+
}
185+
186+
async fn merge_bin(
187+
&self,
188+
snapshot_id: i64,
189+
file_io: FileIO,
190+
manifest_bin: Vec<ManifestFile>,
191+
mut writer: ManifestWriter,
192+
) -> Result<ManifestFile> {
193+
for manifest_file in manifest_bin {
194+
let manifest_file = manifest_file.load_manifest(&file_io).await?;
195+
for manifest_entry in manifest_file.entries() {
196+
if manifest_entry.status() == ManifestStatus::Deleted
197+
&& manifest_entry
198+
.snapshot_id()
199+
.is_some_and(|id| id == snapshot_id)
200+
{
201+
//only files deleted by this snapshot should be added to the new manifest
202+
writer.add_delete_entry(manifest_entry.as_ref().clone())?;
203+
} else if manifest_entry.status() == ManifestStatus::Added
204+
&& manifest_entry
205+
.snapshot_id()
206+
.is_some_and(|id| id == snapshot_id)
207+
{
208+
//added entries from this snapshot are still added, otherwise they should be existing
209+
writer.add_entry(manifest_entry.as_ref().clone())?;
210+
} else if manifest_entry.status() != ManifestStatus::Deleted {
211+
// add all non-deleted files from the old manifest as existing files
212+
writer.add_existing_entry(manifest_entry.as_ref().clone())?;
213+
}
214+
}
215+
}
216+
217+
writer.write_manifest_file().await
218+
}
219+
220+
async fn merge_group(
221+
&self,
222+
snapshot_produce: &mut SnapshotProducer<'_>,
223+
first_manifest: &ManifestFile,
224+
group_manifests: Vec<ManifestFile>,
225+
) -> Result<Vec<ManifestFile>> {
226+
let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes);
227+
let manifest_bins =
228+
packer.pack(group_manifests, |manifest| manifest.manifest_length as u32);
229+
230+
let manifest_merge_futures = manifest_bins
231+
.into_iter()
232+
.map(|manifest_bin| {
233+
if manifest_bin.len() == 1 {
234+
Ok(Box::pin(async { Ok(manifest_bin) })
235+
as Pin<
236+
Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
237+
>)
238+
}
239+
// if the bin has the first manifest (the new data files or an appended manifest file) then only
240+
// merge it if the number of manifests is above the minimum count. this is applied only to bins
241+
// with an in-memory manifest so that large manifests don't prevent merging older groups.
242+
else if manifest_bin
243+
.iter()
244+
.any(|manifest| manifest == first_manifest)
245+
&& manifest_bin.len() < self.min_count_to_merge as usize
246+
{
247+
Ok(Box::pin(async { Ok(manifest_bin) })
248+
as Pin<
249+
Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
250+
>)
251+
} else {
252+
let writer = snapshot_produce.new_manifest_writer(self.content)?;
253+
let snapshot_id = snapshot_produce.snapshot_id;
254+
let file_io = snapshot_produce.table.file_io().clone();
255+
Ok((Box::pin(async move {
256+
Ok(vec![
257+
self.merge_bin(
258+
snapshot_id,
259+
file_io,
260+
manifest_bin,
261+
writer,
262+
)
263+
.await?,
264+
])
265+
}))
266+
as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
267+
}
268+
})
269+
.collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?;
270+
271+
let merged_bins: Vec<Vec<ManifestFile>> =
272+
futures::future::join_all(manifest_merge_futures.into_iter())
273+
.await
274+
.into_iter()
275+
.collect::<Result<Vec<_>>>()?;
276+
277+
Ok(merged_bins.into_iter().flatten().collect())
278+
}
279+
280+
async fn merge_manifest(
281+
&self,
282+
snapshot_produce: &mut SnapshotProducer<'_>,
283+
manifests: Vec<ManifestFile>,
284+
) -> Result<Vec<ManifestFile>> {
285+
if manifests.is_empty() {
286+
return Ok(manifests);
287+
}
288+
289+
let first_manifest = manifests[0].clone();
290+
291+
let group_manifests = self.group_by_spec(manifests);
292+
293+
let mut merge_manifests = vec![];
294+
for (_spec_id, manifests) in group_manifests.into_iter().rev() {
295+
merge_manifests.extend(
296+
self.merge_group(snapshot_produce, &first_manifest, manifests)
297+
.await?,
298+
);
299+
}
300+
301+
Ok(merge_manifests)
302+
}
303+
}
304+
305+
struct MergeManifsetProcess {
306+
target_size_bytes: u32,
307+
min_count_to_merge: u32,
308+
}
309+
310+
impl ManifestProcess for MergeManifsetProcess {
311+
async fn process_manifests(
312+
&self,
313+
snapshot_produce: &mut SnapshotProducer<'_>,
314+
manifests: Vec<ManifestFile>,
315+
) -> Result<Vec<ManifestFile>> {
316+
let (unmerg_data_manifests, unmerge_delete_manifest) = manifests
317+
.into_iter()
318+
.partition(|m| m.content == ManifestContentType::Data);
319+
let mut data_manifests = {
320+
let merge_manifest_manager = MergeManifestManager::new(
321+
self.target_size_bytes,
322+
self.min_count_to_merge,
323+
ManifestContentType::Data,
324+
);
325+
merge_manifest_manager
326+
.merge_manifest(snapshot_produce, unmerg_data_manifests)
327+
.await?
328+
};
329+
data_manifests.extend(unmerge_delete_manifest);
330+
Ok(data_manifests)
331+
}
332+
}

crates/iceberg/src/transaction/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
mod action;
5454
pub use action::*;
5555
mod append;
56+
mod merge_append;
5657
mod snapshot;
5758
mod sort_order;
5859
mod update_location;
@@ -66,6 +67,10 @@ use crate::error::Result;
6667
use crate::table::Table;
6768
use crate::transaction::action::BoxedTransactionAction;
6869
use crate::transaction::append::FastAppendAction;
70+
use crate::transaction::merge_append::MergeAppendAction;
71+
pub use crate::transaction::merge_append::{
72+
MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES,
73+
};
6974
use crate::transaction::sort_order::ReplaceSortOrderAction;
7075
use crate::transaction::update_location::UpdateLocationAction;
7176
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -136,6 +141,11 @@ impl Transaction {
136141
FastAppendAction::new()
137142
}
138143

144+
/// Creates a merge append action.
145+
pub fn merge_append(&self) -> Result<MergeAppendAction> {
146+
MergeAppendAction::new(&self.table)
147+
}
148+
139149
/// Creates replace sort order action.
140150
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
141151
ReplaceSortOrderAction::new()

0 commit comments

Comments
 (0)