Skip to content

Commit 59e52f6

Browse files
committed
- fix test
- refine comment
1 parent af5c4e7 commit 59e52f6

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

crates/iceberg/src/transaction/merge_append.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl TransactionAction for MergeAppendAction {
135135

136136
if self.merge_enabled {
137137
snapshot_producer
138-
.commit(AppendOperation, MergeManifsetProcess {
138+
.commit(AppendOperation, MergeManifestProcess {
139139
target_size_bytes: self.target_size_bytes,
140140
min_count_to_merge: self.min_count_to_merge,
141141
})
@@ -271,6 +271,10 @@ impl MergeManifestManager {
271271
Ok(merged_bins.into_iter().flatten().collect())
272272
}
273273

274+
// Merge Algorithm:
275+
// 1. Split manifests into groups by partition spec id.
276+
// 2. For each group, pack manifests into bins by target size, the sum of manifest length in each bin should be less than target size.
277+
// 3. For the bin contains the first manifest, if the number of manifests in the bin is less than min count, then don't merge it. Otherwise, merge the bin.
274278
async fn merge_manifest(
275279
&self,
276280
snapshot_produce: &mut SnapshotProducer<'_>,
@@ -296,12 +300,12 @@ impl MergeManifestManager {
296300
}
297301
}
298302

299-
struct MergeManifsetProcess {
303+
struct MergeManifestProcess {
300304
target_size_bytes: u32,
301305
min_count_to_merge: u32,
302306
}
303307

304-
impl ManifestProcess for MergeManifsetProcess {
308+
impl ManifestProcess for MergeManifestProcess {
305309
async fn process_manifests(
306310
&self,
307311
snapshot_produce: &mut SnapshotProducer<'_>,

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,17 @@ impl<'a> SnapshotProducer<'a> {
299299
));
300300
}
301301

302-
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
303-
let mut manifest_files = existing_manifests;
304-
305-
// Process added entries.
302+
// # NOTE
303+
// The order of manifest files is matter:
304+
// [added_manifest, ... ]
305+
// # TODO
306+
// Should we use type safe way to guarantee this order?
307+
let mut manifest_files = vec![];
306308
if !self.added_data_files.is_empty() {
307309
let added_manifest = self.write_added_manifest().await?;
308310
manifest_files.push(added_manifest);
309311
}
312+
manifest_files.extend(snapshot_produce_operation.existing_manifest(self).await?);
310313

311314
// # TODO
312315
// Support process delete entries.

crates/integration_tests/tests/shared_tests/merge_append_test.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,7 @@ async fn test_append_data_file() {
111111
.await
112112
.unwrap();
113113

114-
// Enable merge append for table
115-
let tx = Transaction::new(&table);
116-
let update_properties_action = tx
117-
.update_table_properties()
118-
.set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
119-
.set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string())
120-
.set(MANIFEST_TARGET_SIZE_BYTES.to_string(), "8000".to_string());
121-
let tx = update_properties_action.apply(tx).unwrap();
122-
table = tx.commit(&rest_catalog).await.unwrap();
123-
124114
// fast append data file 3 time to create 3 manifest
125-
let mut original_manifest_entries = vec![];
126115
for _ in 0..3 {
127116
let data_file = write_new_data_file(&table).await;
128117
let tx = Transaction::new(&table);
@@ -138,15 +127,31 @@ async fn test_append_data_file() {
138127
.await
139128
.unwrap();
140129
assert_eq!(manifest_list.entries().len(), 3);
130+
131+
// Set the merge size to make sure per two manifest will be packed together.
132+
let manifest_file_len = manifest_list.entries().iter().map(|entry| entry.manifest_length).max().unwrap();
133+
let tx = Transaction::new(&table);
134+
let update_properties_action = tx
135+
.update_table_properties()
136+
.set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
137+
.set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string())
138+
.set(MANIFEST_TARGET_SIZE_BYTES.to_string(), (manifest_file_len * 2 + 2).to_string());
139+
let tx = update_properties_action.apply(tx).unwrap();
140+
table = tx.commit(&rest_catalog).await.unwrap();
141141

142-
// construct test data
142+
// Test case:
143+
// There are 3 manifset, and we append one manifest with merge append.
144+
// Target size is 2 * manifest_file_len + 1, so each two manifest will be packed together.
145+
// The first bin contains the first manifest and the first additional manifest, but it's count is less than min merge count, so it will not be merged.
146+
// Other two manifests will be merged into one manifest.
147+
// Expect three manifest in the new snapshot:
148+
// 1. The new manifest with new added data file.
149+
// 2. Original manifest with one original data file.
150+
// 3. The merged manifest with two original data files.
151+
let mut original_manifest_entries = vec![];
143152
for (idx, entry) in manifest_list.entries().iter().enumerate() {
144153
let manifest = entry.load_manifest(table.file_io()).await.unwrap();
145154
assert!(manifest.entries().len() == 1);
146-
147-
// For this first manifest, it will be pack with the first additional manifest and
148-
// the count(2) is less than the min merge count(4), so these two will not merge.
149-
// See detail: `MergeManifestProcess::merge_group`
150155
if idx == 0 {
151156
original_manifest_entries.push(Arc::new(
152157
ManifestEntry::builder()
@@ -192,7 +197,6 @@ async fn test_append_data_file() {
192197
.unwrap();
193198
assert!(manifest.entries().len() == 1);
194199
original_manifest_entries.retain(|entry| !manifest.entries().contains(entry));
195-
assert!(original_manifest_entries.len() == 2);
196200
}
197201
{
198202
let manifest = manifest_list.entries()[2]

0 commit comments

Comments
 (0)