From 24b637bbf4efa98369d84592c33b7b43b51f9383 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 30 Jun 2025 23:29:32 +0800 Subject: [PATCH] add new_manifest_writer in SnapshotProducer --- crates/iceberg/src/transaction/snapshot.rs | 48 ++++++++++++---------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 16b94b088c..113e6a7673 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -22,12 +22,12 @@ use std::ops::RangeFrom; use uuid::Uuid; use crate::error::Result; -use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, - SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, + DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, + Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, + Summary, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -177,7 +177,11 @@ impl SnapshotProducer { snapshot_id } - fn new_manifest_output(&mut self, table: &Table) -> Result { + fn new_manifest_writer( + &mut self, + content: ManifestContentType, + table: &Table, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", table.metadata().location(), @@ -186,7 +190,22 @@ impl SnapshotProducer { self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - table.file_io().new_output(new_manifest_path) + let output_file = table.file_io().new_output(new_manifest_path)?; + let builder = ManifestWriterBuilder::new( + output_file, + Some(self.snapshot_id), + self.key_metadata.clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), + ); + if table.metadata().format_version() == FormatVersion::V1 { + Ok(builder.build_v1()) + } else { + match content { + ManifestContentType::Data => Ok(builder.build_v2_data()), + ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), + } + } } // Check if the partition value is compatible with the partition type. @@ -244,20 +263,7 @@ impl SnapshotProducer { builder.build() } }); - let mut writer = { - let builder = ManifestWriterBuilder::new( - self.new_manifest_output(table)?, - Some(self.snapshot_id), - self.key_metadata.clone(), - table.metadata().current_schema().clone(), - table.metadata().default_partition_spec().as_ref().clone(), - ); - if table.metadata().format_version() == FormatVersion::V1 { - builder.build_v1() - } else { - builder.build_v2_data() - } - }; + let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?; for entry in manifest_entries { writer.add_entry(entry)?; }