6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/manifest/entry.rs
@@ -130,6 +130,12 @@ impl ManifestEntry {
self.snapshot_id
}

/// File sequence number.
#[inline]
pub fn file_sequence_number(&self) -> Option<i64> {
self.file_sequence_number
}

Contributor: I don't see this used anywhere?

Contributor Author: In merge_append_test.rs
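
For context, a minimal sketch of the kind of check that test can perform with the new accessor (names like `manifest_file` and `table` stand in for the test's setup; only API shown in this diff is used):

let manifest = manifest_file.load_manifest(table.file_io()).await?;
for entry in manifest.entries() {
    // The accessor returns `Option<i64>`; `None` means no file sequence
    // number has been materialized on the entry yet.
    println!(
        "{:?}: file_sequence_number = {:?}",
        entry.status(),
        entry.file_sequence_number()
    );
}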

/// Data sequence number.
#[inline]
pub fn sequence_number(&self) -> Option<i64> {
6 changes: 3 additions & 3 deletions crates/iceberg/src/transaction/append.rs
@@ -103,14 +103,14 @@ impl TransactionAction for FastAppendAction {
}

snapshot_producer
-    .commit(FastAppendOperation, DefaultManifestProcess)
+    .commit(AppendOperation, DefaultManifestProcess)
.await
}
}

-struct FastAppendOperation;
+pub(crate) struct AppendOperation;

-impl SnapshotProduceOperation for FastAppendOperation {
+impl SnapshotProduceOperation for AppendOperation {
fn operation(&self) -> Operation {
Operation::Append
}
330 changes: 330 additions & 0 deletions crates/iceberg/src/transaction/merge_append.rs
@@ -0,0 +1,330 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::{BTreeMap, HashMap};
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use uuid::Uuid;

use crate::Result;
use crate::io::FileIO;
use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter};
use crate::table::Table;
use crate::transaction::append::AppendOperation;
use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer};
use crate::transaction::{ActionCommit, TransactionAction};
use crate::utils::bin::ListPacker;
use crate::utils::parse_property;

/// MergeAppendAction is a transaction action, similar to fast append, except
/// that it merges manifests based on the target size.
pub struct MergeAppendAction {
target_size_bytes: u32,
min_count_to_merge: u32,
check_duplicate: bool,
merge_enabled: bool,
// Below: properties used to build the SnapshotProducer at commit time.
commit_uuid: Option<Uuid>,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
}

/// Target size of manifest file when merging manifests.
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
/// Minimum number of manifests to merge.
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
/// Whether merging manifests is enabled.
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
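// Note: merging is disabled by default (`MANIFEST_MERGE_ENABLED_DEFAULT` is
// false), so a table must set `commit.manifest-merge.enabled` to "true" in its
// properties to opt in; the target size and min merge count can be tuned with
// the two properties above.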

impl MergeAppendAction {
pub(crate) fn new(table: &Table) -> Result<Self> {
let target_size_bytes: u32 = parse_property(
table.metadata().properties(),
MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
)?;
let min_count_to_merge: u32 = parse_property(
table.metadata().properties(),
MANIFEST_MIN_MERGE_COUNT,
MANIFEST_MIN_MERGE_COUNT_DEFAULT,
)?;
let merge_enabled = parse_property(
table.metadata().properties(),
MANIFEST_MERGE_ENABLED,
MANIFEST_MERGE_ENABLED_DEFAULT,
)?;
Ok(Self {
check_duplicate: true,
target_size_bytes,
min_count_to_merge,
merge_enabled,
commit_uuid: None,
key_metadata: None,
snapshot_properties: HashMap::default(),
added_data_files: vec![],
})
}

/// Set whether to check for duplicate files when committing.
pub fn with_check_duplicate(mut self, v: bool) -> Self {
self.check_duplicate = v;
self
}

/// Set the commit UUID for the new snapshot.
pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
self.commit_uuid = Some(commit_uuid);
self
}

/// Set the key metadata used when writing manifest files.
pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
self.key_metadata = Some(key_metadata);
self
}

/// Set properties to attach to the snapshot summary.
pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
self.snapshot_properties = snapshot_properties;
self
}

/// Add data files to the snapshot.
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
self.added_data_files.extend(data_files);
self
}
}

#[async_trait]
impl TransactionAction for MergeAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
);

// validate added files
snapshot_producer.validate_added_data_files(&self.added_data_files)?;

// check for duplicate files
if self.check_duplicate {
snapshot_producer
.validate_duplicate_files(&self.added_data_files)
.await?;
}

if self.merge_enabled {
snapshot_producer
.commit(AppendOperation, MergeManifestProcess {
target_size_bytes: self.target_size_bytes,
min_count_to_merge: self.min_count_to_merge,
})
.await
} else {
snapshot_producer
.commit(AppendOperation, DefaultManifestProcess)
.await
}
}
}

struct MergeManifestManager {
target_size_bytes: u32,
min_count_to_merge: u32,
content: ManifestContentType,
}

impl MergeManifestManager {
pub fn new(
target_size_bytes: u32,
min_count_to_merge: u32,
content: ManifestContentType,
) -> Self {
Self {
target_size_bytes,
min_count_to_merge,
content,
}
}

fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> {
let mut grouped_manifests = BTreeMap::new();
for manifest in manifests {
grouped_manifests
.entry(manifest.partition_spec_id)
.or_insert_with(Vec::new)
.push(manifest);
}
grouped_manifests
}

async fn merge_bin(
&self,
snapshot_id: i64,
file_io: FileIO,
manifest_bin: Vec<ManifestFile>,
mut writer: ManifestWriter,
) -> Result<ManifestFile> {
for manifest_file in manifest_bin {
let manifest_file = manifest_file.load_manifest(&file_io).await?;
for manifest_entry in manifest_file.entries() {
if manifest_entry.status() == ManifestStatus::Deleted
&& manifest_entry
.snapshot_id()
.is_some_and(|id| id == snapshot_id)
{
// only files deleted by this snapshot should be added to the new manifest
writer.add_delete_entry(manifest_entry.as_ref().clone())?;
} else if manifest_entry.status() == ManifestStatus::Added
&& manifest_entry
.snapshot_id()
.is_some_and(|id| id == snapshot_id)
{
// added entries from this snapshot are still added; otherwise they should be existing
writer.add_entry(manifest_entry.as_ref().clone())?;
} else if manifest_entry.status() != ManifestStatus::Deleted {
// add all non-deleted files from the old manifest as existing files
writer.add_existing_entry(manifest_entry.as_ref().clone())?;
}
}
}

writer.write_manifest_file().await
}

async fn merge_group(
&self,
snapshot_produce: &mut SnapshotProducer<'_>,
first_manifest: &ManifestFile,
group_manifests: Vec<ManifestFile>,
) -> Result<Vec<ManifestFile>> {
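// Pack the group into bins whose summed `manifest_length` stays under
// `target_size_bytes`; each bin is then merged into at most one manifest.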
let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes);
let manifest_bins =
packer.pack(group_manifests, |manifest| manifest.manifest_length as u32);

let manifest_merge_futures = manifest_bins
    .map(|manifest_bin| {
        if manifest_bin.len() == 1 {
            Ok(Box::pin(async { Ok(manifest_bin) })
                as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
        }
        // If the bin has the first manifest (the new data files or an appended
        // manifest file), then only merge it if the number of manifests reaches
        // the minimum count. This is applied only to bins with an in-memory
        // manifest so that large manifests don't prevent merging older groups.
        else if manifest_bin
            .iter()
            .any(|manifest| manifest == first_manifest)
            && manifest_bin.len() < self.min_count_to_merge as usize
        {
            Ok(Box::pin(async { Ok(manifest_bin) })
                as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
        } else {
            let writer = snapshot_produce.new_manifest_writer(self.content)?;
            let snapshot_id = snapshot_produce.snapshot_id;
            let file_io = snapshot_produce.table.file_io().clone();
            Ok(Box::pin(async move {
                Ok(vec![
                    self.merge_bin(snapshot_id, file_io, manifest_bin, writer)
                        .await?,
                ])
            })
                as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
        }
    })
    .collect::<Result<Vec<_>>>()?;

let merged_bins: Vec<Vec<ManifestFile>> =
futures::future::join_all(manifest_merge_futures.into_iter())
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(merged_bins.into_iter().flatten().collect())
}

// Merge Algorithm:
// 1. Split manifests into groups by partition spec id.
// 2. For each group, pack manifests into bins by target size; the sum of manifest lengths in each bin should stay below the target size.
// 3. For the bin that contains the first manifest, merge it only if the number of manifests in the bin reaches the min count; otherwise leave it as-is.
async fn merge_manifest(
&self,
snapshot_produce: &mut SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Result<Vec<ManifestFile>> {
if manifests.is_empty() {
return Ok(manifests);
}

let first_manifest = manifests[0].clone();

let group_manifests = self.group_by_spec(manifests);

let mut merge_manifests = vec![];
for (_spec_id, manifests) in group_manifests.into_iter().rev() {
merge_manifests.extend(
self.merge_group(snapshot_produce, &first_manifest, manifests)
.await?,
);
}

Ok(merge_manifests)
}
}

struct MergeManifestProcess {
target_size_bytes: u32,
min_count_to_merge: u32,
}

impl ManifestProcess for MergeManifestProcess {
async fn process_manifests(
&self,
snapshot_produce: &mut SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Result<Vec<ManifestFile>> {
let (unmerged_data_manifests, unmerged_delete_manifests): (Vec<_>, Vec<_>) = manifests
    .into_iter()
    .partition(|m| m.content == ManifestContentType::Data);
let mut data_manifests = {
    let merge_manifest_manager = MergeManifestManager::new(
        self.target_size_bytes,
        self.min_count_to_merge,
        ManifestContentType::Data,
    );
    merge_manifest_manager
        .merge_manifest(snapshot_produce, unmerged_data_manifests)
        .await?
};
data_manifests.extend(unmerged_delete_manifests);
Contributor:
Sorry, I'm a bit confused reading this. Could you please help me understand:

  • Why do we care about delete manifests? I think MergeAppend should not introduce delete manifests, but maybe I'm wrong.
  • Why should we leave delete manifests unmerged?

Contributor Author:

> Why do we care about delete manifests? I think MergeAppend should not introduce delete manifests, but maybe I'm wrong.

Yes, we do not process delete manifests. Here, we:

  1. filter out all data manifests and merge them
  2. keep the delete manifests but do not process them (we can't drop them)
  3. concatenate the processed data manifests with the delete manifests and return them

> Why should we leave delete manifests unmerged?

This implementation follows pyiceberg: https://github.com/apache/iceberg-python/blob/e9c025318787bfd34b98a3fc41544e0f168904ba/pyiceberg/table/update/snapshot.py#L553. I guess the reason is that MergeAppend only appends data files, so we assume the delete files will not change and don't need to be processed.

Note that this differs from iceberg-java: there, the merge append action uses MergeSnapshotProducer, which merges both data manifests and delete manifests.

Ok(data_manifests)
}
}
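
For reviewers, a minimal sketch of how the action is assembled from the API in this diff (hypothetical: `tx` is a `Transaction` and `data_files` is a `Vec<DataFile>`; table setup and the final commit are elided):

let action = tx
    .merge_append()?
    .with_check_duplicate(true)
    .add_data_files(data_files)
    .set_snapshot_properties(HashMap::from([(
        "added-by".to_string(),
        "merge-append-example".to_string(),
    )]));
// When `commit.manifest-merge.enabled` is true on the table, committing this
// action produces an Append snapshot whose data manifests are bin-packed and
// merged; otherwise it behaves like a fast append.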
10 changes: 10 additions & 0 deletions crates/iceberg/src/transaction/mod.rs
@@ -56,6 +56,7 @@ use std::collections::HashMap;

pub use action::*;
mod append;
mod merge_append;
mod snapshot;
mod sort_order;
mod update_location;
@@ -78,6 +79,10 @@ use crate::spec::{
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::merge_append::MergeAppendAction;
pub use crate::transaction::merge_append::{
MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES,
};
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -148,6 +153,11 @@ impl Transaction {
FastAppendAction::new()
}

/// Creates a merge append action.
pub fn merge_append(&self) -> Result<MergeAppendAction> {
MergeAppendAction::new(&self.table)
}

/// Creates replace sort order action.
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
ReplaceSortOrderAction::new()
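
A note on `ListPacker` (from `crate::utils::bin`, not shown in this diff): the merge path only relies on it grouping items so that each bin's total weight stays under the target. A rough, hypothetical sketch of that behavior, not the crate's actual implementation:

fn pack_by_weight<T>(items: Vec<T>, target_weight: u32, weight: impl Fn(&T) -> u32) -> Vec<Vec<T>> {
    // Greedy packing: start a new bin whenever adding the next item would
    // push the current bin past `target_weight`.
    let mut bins: Vec<Vec<T>> = vec![];
    let mut current: Vec<T> = vec![];
    let mut current_weight: u32 = 0;
    for item in items {
        let w = weight(&item);
        if !current.is_empty() && current_weight + w > target_weight {
            bins.push(std::mem::take(&mut current));
            current_weight = 0;
        }
        current.push(item);
        current_weight += w;
    }
    if !current.is_empty() {
        bins.push(current);
    }
    bins
}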