Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions iceberg-rust/src/materialized_view/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use crate::{
error::Error,
table::{
delete_all_table_files,
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
transaction::{
operation::{DsnGroup, Operation as TableOperation},
APPEND_INDEX, REPLACE_INDEX,
},
},
view::transaction::operation::Operation as ViewOperation,
};
Expand Down Expand Up @@ -105,12 +108,17 @@ impl<'view> Transaction<'view> {
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
data_files: old,
delete_files: _,
dsn_groups,
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
match dsn_groups.last_mut() {
Some(g) => g.data_files.extend_from_slice(&files),
None => dsn_groups.push(DsnGroup {
data_files: files,
delete_files: vec![],
}),
};
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
Expand All @@ -119,8 +127,10 @@ impl<'view> Transaction<'view> {
} else {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: files,
delete_files: Vec::new(),
dsn_groups: vec![DsnGroup {
data_files: files,
delete_files: vec![],
}],
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
Expand All @@ -140,12 +150,17 @@ impl<'view> Transaction<'view> {
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
data_files: _,
delete_files: old,
dsn_groups,
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
match dsn_groups.last_mut() {
Some(g) => g.delete_files.extend_from_slice(&files),
None => dsn_groups.push(DsnGroup {
data_files: vec![],
delete_files: files,
}),
};
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
Expand All @@ -154,8 +169,10 @@ impl<'view> Transaction<'view> {
} else {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: Vec::new(),
delete_files: files,
dsn_groups: vec![DsnGroup {
data_files: vec![],
delete_files: files,
}],
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
Expand Down
58 changes: 45 additions & 13 deletions iceberg-rust/src/table/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::HashMap;
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};

use crate::table::transaction::append::append_summary;
pub use crate::table::transaction::operation::DsnGroup;
use crate::{catalog::commit::CommitTable, error::Error, table::Table};

use self::operation::Operation;
Expand Down Expand Up @@ -120,17 +121,24 @@ impl<'table> TableTransaction<'table> {
let summary = append_summary(&files);

if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
if let Operation::Append {
data_files: old, ..
} = operation
{
old.extend_from_slice(&files);
if let Operation::Append { dsn_groups, .. } = operation {
match dsn_groups.last_mut() {
Some(g) => g.data_files.extend_from_slice(&files),
None => dsn_groups.push(DsnGroup {
data_files: files,
delete_files: vec![],
}),
};
} else {
panic!("Operation at APPEND_INDEX should be an Append");
}
} else {
self.operations[APPEND_INDEX] = Some(Operation::Append {
branch: self.branch.clone(),
data_files: files,
delete_files: Vec::new(),
dsn_groups: vec![DsnGroup {
data_files: files,
delete_files: vec![],
}],
additional_summary: summary,
});
}
Expand Down Expand Up @@ -159,23 +167,47 @@ impl<'table> TableTransaction<'table> {
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
if let Operation::Append {
branch: _,
data_files: _,
delete_files: old,
additional_summary: None,
dsn_groups,
..
} = operation
{
old.extend_from_slice(&files);
match dsn_groups.last_mut() {
Some(g) => g.delete_files.extend_from_slice(&files),
None => dsn_groups.push(DsnGroup {
data_files: vec![],
delete_files: files,
}),
};
} else {
panic!("Operation at APPEND_INDEX should be an Append");
}
} else {
self.operations[APPEND_INDEX] = Some(Operation::Append {
branch: self.branch.clone(),
data_files: Vec::new(),
delete_files: files,
dsn_groups: vec![DsnGroup {
data_files: vec![],
delete_files: files,
}],
additional_summary: None,
});
}
self
}
/// Create a new data sequence number for subsequent appends
///
/// Starts a fresh, empty [`DsnGroup`] on the pending Append operation so
/// that files added afterwards are grouped under a new data sequence
/// number. If no Append operation has been queued yet, this is a no-op
/// (the first append will create its own group anyway).
pub fn new_data_sequence_number(mut self) -> Self {
    match self.operations[APPEND_INDEX] {
        Some(Operation::Append {
            ref mut dsn_groups, ..
        }) => {
            let empty_group = DsnGroup {
                data_files: Vec::new(),
                delete_files: Vec::new(),
            };
            dsn_groups.push(empty_group);
        }
        _ => {}
    }
    self
}
/// Overwrites specific data files in the table with new ones
///
/// This operation replaces specified existing data files with new ones, rather than
Expand Down
78 changes: 60 additions & 18 deletions iceberg-rust/src/table/transaction/operation.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw there will be a (minor) conflict here with JanKaul#227 or JanKaul#228, depending on which one gets merged.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ use super::append::split_datafiles;
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
static MIN_DATAFILES_PER_MANIFEST: usize = 4;

#[derive(Debug, Clone)]
/// A group of writes sharing a single Data Sequence Number (DSN).
///
/// An append commit may carry several DSN groups; each non-empty group is
/// assigned its own sequence-number offset, so delete files in one group
/// can target data files written by earlier groups in the same commit.
pub struct DsnGroup {
    /// Delete files. These apply to data (insert) files from previous DSN groups.
    pub delete_files: Vec<DataFile>,
    /// Newly inserted data files for this DSN group.
    pub data_files: Vec<DataFile>,
}

#[derive(Debug)]
///Table operations
pub enum Operation {
Expand All @@ -56,8 +65,7 @@ pub enum Operation {
/// Append new files to the table
Append {
branch: Option<String>,
data_files: Vec<DataFile>,
delete_files: Vec<DataFile>,
dsn_groups: Vec<DsnGroup>,
additional_summary: Option<HashMap<String, String>>,
},
// /// Quickly append new files to the table
Expand Down Expand Up @@ -99,8 +107,7 @@ impl Operation {
match self {
Operation::Append {
branch,
data_files,
delete_files,
dsn_groups,
additional_summary,
} => {
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
Expand All @@ -110,14 +117,31 @@ impl Operation {
FormatVersion::V2 => manifest_list_schema_v2(),
};

let mut dsn_offset = 0;
let mut data_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
let mut delete_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
for dsn_group in dsn_groups.into_iter() {
if !dsn_group.data_files.is_empty() || !dsn_group.delete_files.is_empty() {
dsn_offset += 1;
for data_file in dsn_group.data_files.into_iter() {
data_files.push((data_file, dsn_offset));
}
for delete_file in dsn_group.delete_files.into_iter() {
delete_files.push((delete_file, dsn_offset));
}
}
}

let n_data_files = data_files.len();
let n_delete_files = delete_files.len();

if n_data_files + n_delete_files == 0 {
return Ok((None, Vec::new()));
}
let largest_dsn_offset = dsn_offset;
assert!(largest_dsn_offset >= 1, "Should have exited early");

let data_files_iter = delete_files.iter().chain(data_files.iter());
let data_files_iter = delete_files.iter().chain(data_files.iter()).map(|(x, _)| x);

let mut manifest_list_writer = if let Some(manifest_list_bytes) =
prefetch_manifest_list(old_snapshot, &object_store)
Expand All @@ -143,25 +167,41 @@ impl Operation {
let n_delete_splits =
manifest_list_writer.n_splits(n_delete_files, Content::Deletes);

let new_datafile_iter = data_files.into_iter().map(|data_file| {
ManifestEntry::builder()
let new_datafile_iter = data_files.into_iter().map(|(data_file, dsn_offset)| {
let mut builder = ManifestEntry::builder();
builder
.with_format_version(table_metadata.format_version)
.with_status(Status::Added)
.with_data_file(data_file)
.with_data_file(data_file);
// If there is only one data sequence number in this commit, we can just use sequence number inheritance
// If there are multiple data sequence numbers in this commit, we need to set the data sequence number on each manifest
if largest_dsn_offset > 1 {
builder
.with_sequence_number(table_metadata.last_sequence_number + dsn_offset);
}
builder
.build()
.map_err(crate::spec::error::Error::from)
.map_err(Error::from)
});

let new_deletefile_iter = delete_files.into_iter().map(|data_file| {
ManifestEntry::builder()
.with_format_version(table_metadata.format_version)
.with_status(Status::Added)
.with_data_file(data_file)
.build()
.map_err(crate::spec::error::Error::from)
.map_err(Error::from)
});
let new_deletefile_iter =
delete_files.into_iter().map(|(data_file, dsn_offset)| {
let mut builder = ManifestEntry::builder();
builder
.with_format_version(table_metadata.format_version)
.with_status(Status::Added)
.with_data_file(data_file);
if largest_dsn_offset > 1 {
builder.with_sequence_number(
table_metadata.last_sequence_number + dsn_offset,
);
}
builder
.build()
.map_err(crate::spec::error::Error::from)
.map_err(Error::from)
});

let snapshot_id = generate_snapshot_id();

Expand Down Expand Up @@ -211,11 +251,13 @@ impl Operation {
(_, _) => Ok(SnapshotOperation::Overwrite),
}?;

let snapshot_sequence_number =
table_metadata.last_sequence_number + largest_dsn_offset;
let mut snapshot_builder = SnapshotBuilder::default();
snapshot_builder
.with_snapshot_id(snapshot_id)
.with_manifest_list(new_manifest_list_location)
.with_sequence_number(table_metadata.last_sequence_number + 1)
.with_sequence_number(snapshot_sequence_number)
.with_summary(Summary {
operation: snapshot_operation,
other: additional_summary.unwrap_or_default(),
Expand Down