diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 9b8893d9..5296b3e4 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -14,7 +14,10 @@ use crate::{ error::Error, table::{ delete_all_table_files, - transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX}, + transaction::{ + operation::{DsnGroup, Operation as TableOperation}, + APPEND_INDEX, REPLACE_INDEX, + }, }, view::transaction::operation::Operation as ViewOperation, }; @@ -105,12 +108,17 @@ impl<'view> Transaction<'view> { if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, - data_files: old, - delete_files: _, + dsn_groups, additional_summary: old_lineage, } = operation { - old.extend_from_slice(&files); + match dsn_groups.last_mut() { + Some(g) => g.data_files.extend_from_slice(&files), + None => dsn_groups.push(DsnGroup { + data_files: files, + delete_files: vec![], + }), + }; *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -119,8 +127,10 @@ impl<'view> Transaction<'view> { } else { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), - data_files: files, - delete_files: Vec::new(), + dsn_groups: vec![DsnGroup { + data_files: files, + delete_files: vec![], + }], additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state, @@ -140,12 +150,17 @@ impl<'view> Transaction<'view> { if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, - data_files: _, - delete_files: old, + dsn_groups, additional_summary: old_lineage, } = operation { - old.extend_from_slice(&files); + match dsn_groups.last_mut() { + Some(g) => g.delete_files.extend_from_slice(&files), + None => 
dsn_groups.push(DsnGroup { + data_files: vec![], + delete_files: files, + }), + }; *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -154,8 +169,10 @@ impl<'view> Transaction<'view> { } else { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), - data_files: Vec::new(), - delete_files: files, + dsn_groups: vec![DsnGroup { + data_files: vec![], + delete_files: files, + }], additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state, diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs index 66654b0a..f0b6072a 100644 --- a/iceberg-rust/src/table/transaction/mod.rs +++ b/iceberg-rust/src/table/transaction/mod.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference}; use crate::table::transaction::append::append_summary; +pub use crate::table::transaction::operation::DsnGroup; use crate::{catalog::commit::CommitTable, error::Error, table::Table}; use self::operation::Operation; @@ -120,17 +121,24 @@ impl<'table> TableTransaction<'table> { let summary = append_summary(&files); if let Some(ref mut operation) = self.operations[APPEND_INDEX] { - if let Operation::Append { - data_files: old, .. - } = operation - { - old.extend_from_slice(&files); + if let Operation::Append { dsn_groups, .. 
} = operation { + match dsn_groups.last_mut() { + Some(g) => g.data_files.extend_from_slice(&files), + None => dsn_groups.push(DsnGroup { + data_files: files, + delete_files: vec![], + }), + }; + } else { + panic!("Operation at APPEND_INDEX should be an Append"); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), - data_files: files, - delete_files: Vec::new(), + dsn_groups: vec![DsnGroup { + data_files: files, + delete_files: vec![], + }], additional_summary: summary, }); } @@ -159,23 +167,47 @@ impl<'table> TableTransaction<'table> { if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { branch: _, - data_files: _, - delete_files: old, - additional_summary: None, + dsn_groups, + .. } = operation { - old.extend_from_slice(&files); + match dsn_groups.last_mut() { + Some(g) => g.delete_files.extend_from_slice(&files), + None => dsn_groups.push(DsnGroup { + data_files: vec![], + delete_files: files, + }), + }; + } else { + panic!("Operation at APPEND_INDEX should be an Append"); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), - data_files: Vec::new(), - delete_files: files, + dsn_groups: vec![DsnGroup { + data_files: vec![], + delete_files: files, + }], additional_summary: None, }); } self } + /// Create a new data sequence number for subsequent appends + pub fn new_data_sequence_number(mut self) -> Self { + if let Some(Operation::Append { + branch: _, + ref mut dsn_groups, + .. 
+ }) = self.operations[APPEND_INDEX] + { + dsn_groups.push(DsnGroup { + data_files: vec![], + delete_files: vec![], + }); + } + self + } /// Overwrites specific data files in the table with new ones /// /// This operation replaces specified existing data files with new ones, rather than diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs index f904f1a0..63c35a2e 100644 --- a/iceberg-rust/src/table/transaction/operation.rs +++ b/iceberg-rust/src/table/transaction/operation.rs @@ -38,6 +38,15 @@ use super::append::split_datafiles; /// The target number of datafiles per manifest is dynamic, but we don't want to go below this number. static MIN_DATAFILES_PER_MANIFEST: usize = 4; +#[derive(Debug, Clone)] +/// Group of writes sharing a Data Sequence Number +pub struct DsnGroup { + /// Delete files. These apply to insert files from previous Data Sequence Groups + pub delete_files: Vec<DataFile>, + /// Insert files + pub data_files: Vec<DataFile>, +} + #[derive(Debug)] ///Table operations pub enum Operation { @@ -56,8 +65,7 @@ pub enum Operation { /// Append new files to the table Append { branch: Option<String>, - data_files: Vec<DataFile>, - delete_files: Vec<DataFile>, + dsn_groups: Vec<DsnGroup>, additional_summary: Option<HashMap<String, String>>, }, // /// Quickly append new files to the table @@ -99,8 +107,7 @@ impl Operation { match self { Operation::Append { branch, - data_files, - delete_files, + dsn_groups, additional_summary, } => { let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?; @@ -110,14 +117,31 @@ impl Operation { FormatVersion::V2 => manifest_list_schema_v2(), }; + let mut dsn_offset = 0; + let mut data_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![]; + let mut delete_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![]; + for dsn_group in dsn_groups.into_iter() { + if !dsn_group.data_files.is_empty() || !dsn_group.delete_files.is_empty() { + dsn_offset += 1; + for data_file in dsn_group.data_files.into_iter() { + data_files.push((data_file,
dsn_offset)); + } + for delete_file in dsn_group.delete_files.into_iter() { + delete_files.push((delete_file, dsn_offset)); + } + } + } + let n_data_files = data_files.len(); let n_delete_files = delete_files.len(); if n_data_files + n_delete_files == 0 { return Ok((None, Vec::new())); } + let largest_dsn_offset = dsn_offset; + assert!(largest_dsn_offset >= 1, "Should have exited early"); - let data_files_iter = delete_files.iter().chain(data_files.iter()); + let data_files_iter = delete_files.iter().chain(data_files.iter()).map(|(x, _)| x); let mut manifest_list_writer = if let Some(manifest_list_bytes) = prefetch_manifest_list(old_snapshot, &object_store) @@ -143,25 +167,41 @@ impl Operation { let n_delete_splits = manifest_list_writer.n_splits(n_delete_files, Content::Deletes); - let new_datafile_iter = data_files.into_iter().map(|data_file| { - ManifestEntry::builder() + let new_datafile_iter = data_files.into_iter().map(|(data_file, dsn_offset)| { + let mut builder = ManifestEntry::builder(); + builder .with_format_version(table_metadata.format_version) .with_status(Status::Added) - .with_data_file(data_file) + .with_data_file(data_file); + // If there is only one data sequence number in this commit, we can just use sequence number inheritance + // If there are multiple data sequence numbers in this commit, we need to set the data sequence number on each manifest + if largest_dsn_offset > 1 { + builder + .with_sequence_number(table_metadata.last_sequence_number + dsn_offset); + } + builder .build() .map_err(crate::spec::error::Error::from) .map_err(Error::from) }); - let new_deletefile_iter = delete_files.into_iter().map(|data_file| { - ManifestEntry::builder() - .with_format_version(table_metadata.format_version) - .with_status(Status::Added) - .with_data_file(data_file) - .build() - .map_err(crate::spec::error::Error::from) - .map_err(Error::from) - }); + let new_deletefile_iter = + delete_files.into_iter().map(|(data_file, dsn_offset)| { + let mut builder = 
ManifestEntry::builder(); + builder + .with_format_version(table_metadata.format_version) + .with_status(Status::Added) + .with_data_file(data_file); + if largest_dsn_offset > 1 { + builder.with_sequence_number( + table_metadata.last_sequence_number + dsn_offset, + ); + } + builder + .build() + .map_err(crate::spec::error::Error::from) + .map_err(Error::from) + }); let snapshot_id = generate_snapshot_id(); @@ -211,11 +251,13 @@ impl Operation { (_, _) => Ok(SnapshotOperation::Overwrite), }?; + let snapshot_sequence_number = + table_metadata.last_sequence_number + largest_dsn_offset; let mut snapshot_builder = SnapshotBuilder::default(); snapshot_builder .with_snapshot_id(snapshot_id) .with_manifest_list(new_manifest_list_location) - .with_sequence_number(table_metadata.last_sequence_number + 1) + .with_sequence_number(snapshot_sequence_number) .with_summary(Summary { operation: snapshot_operation, other: additional_summary.unwrap_or_default(),