Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct FastAppendAction {
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
}

impl FastAppendAction {
Expand All @@ -47,6 +48,7 @@ impl FastAppendAction {
key_metadata: None,
snapshot_properties: HashMap::default(),
added_data_files: vec![],
added_delete_files: vec![],
}
}

Expand All @@ -58,7 +60,16 @@ impl FastAppendAction {

/// Add data files to the snapshot.
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
self.added_data_files.extend(data_files);
for file in data_files {
match file.content_type() {
crate::spec::DataContentType::Data => self.added_data_files.push(file),
crate::spec::DataContentType::PositionDeletes
| crate::spec::DataContentType::EqualityDeletes => {
self.added_delete_files.push(file)
}
}
}

self
}

Expand Down Expand Up @@ -90,16 +101,22 @@ impl TransactionAction for FastAppendAction {
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
self.added_delete_files.clone(),
);

// validate added files
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
snapshot_producer.validate_added_data_files(&self.added_delete_files)?;

// Checks duplicate files
if self.check_duplicate {
snapshot_producer
.validate_duplicate_files(&self.added_data_files)
.await?;

snapshot_producer
.validate_duplicate_files(&self.added_delete_files)
.await?;
}

snapshot_producer
Expand Down Expand Up @@ -152,7 +169,7 @@ mod tests {
use std::sync::Arc;

use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
};
use crate::transaction::tests::make_v2_minimal_table;
use crate::transaction::{Transaction, TransactionAction};
Expand Down
63 changes: 44 additions & 19 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use uuid::Uuid;

use crate::error::Result;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
update_snapshot_summaries, DataContentType, DataFile, DataFileFormat, FormatVersion,
ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter,
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType,
Summary, update_snapshot_summaries,
};
use crate::table::Table;
use crate::transaction::ActionCommit;
Expand Down Expand Up @@ -74,7 +74,8 @@ pub(crate) struct SnapshotProducer<'a> {
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
pub added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
Expand All @@ -88,6 +89,7 @@ impl<'a> SnapshotProducer<'a> {
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
) -> Self {
Self {
table,
Expand All @@ -96,18 +98,13 @@ impl<'a> SnapshotProducer<'a> {
key_metadata,
snapshot_properties,
added_data_files,
added_delete_files,
manifest_counter: (0..),
}
}

pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
for data_file in added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
// Check if the data file partition spec id matches the table default partition spec id.
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
return Err(Error::new(
Expand Down Expand Up @@ -249,18 +246,42 @@ impl<'a> SnapshotProducer<'a> {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
if added_data_files.is_empty() {
async fn write_added_manifest(&mut self, added_files: Vec<DataFile>) -> Result<ManifestFile> {
if added_files.is_empty() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"No added data files found when write an added manifest file",
));
}

let file_count = added_files.len();

let manifest_content_type = {
let mut data_num = 0;
let mut delete_num = 0;
for f in &added_files {
match f.content_type() {
DataContentType::Data => data_num += 1,
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
delete_num += 1
}
}
}
if data_num == file_count {
ManifestContentType::Data
} else if delete_num == file_count {
ManifestContentType::Deletes
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
"added DataFile for a ManifestFile should be same type (Data or Delete)",
));
}
};

let snapshot_id = self.snapshot_id;
let format_version = self.table.metadata().format_version();
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let manifest_entries = added_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
.data_file(data_file);
Expand All @@ -272,7 +293,7 @@ impl<'a> SnapshotProducer<'a> {
builder.build()
}
});
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
let mut writer = self.new_manifest_writer(manifest_content_type)?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
Expand Down Expand Up @@ -301,12 +322,16 @@ impl<'a> SnapshotProducer<'a> {

// Process added entries.
if !self.added_data_files.is_empty() {
let added_manifest = self.write_added_manifest().await?;
let added_data_files = std::mem::take(&mut self.added_data_files);
let added_manifest = self.write_added_manifest(added_data_files).await?;
manifest_files.push(added_manifest);
}

// # TODO
// Support process delete entries.
if !self.added_delete_files.is_empty() {
let added_delete_files = std::mem::take(&mut self.added_delete_files);
let added_manifest = self.write_added_manifest(added_delete_files).await?;
manifest_files.push(added_manifest);
}

let manifest_files = manifest_process.process_manifests(self, manifest_files);
Ok(manifest_files)
Expand Down
Loading