Skip to content

Commit 16a4a21

Browse files
Li0kxxchan
andauthored
feat: allow specify snapshot id for fast append (#25)(#14) (#73)
Signed-off-by: xxchan <xxchan22f@gmail.com> typo typo Co-authored-by: xxchan <xxchan22f@gmail.com>
1 parent 1535d08 commit 16a4a21

File tree

6 files changed

+77
-30
lines changed

6 files changed

+77
-30
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ use crate::error::Result;
2525
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
2626
use crate::table::Table;
2727
use crate::transaction::snapshot::{
28-
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
28+
generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
2929
};
3030
use crate::transaction::{ActionCommit, TransactionAction};
31+
use crate::{Error, ErrorKind};
3132

3233
/// FastAppendAction is a transaction action for fast append data files to the table.
3334
pub struct FastAppendAction {
@@ -38,6 +39,7 @@ pub struct FastAppendAction {
3839
snapshot_properties: HashMap<String, String>,
3940
added_data_files: Vec<DataFile>,
4041
added_delete_files: Vec<DataFile>,
42+
snapshot_id: Option<i64>,
4143
}
4244

4345
impl FastAppendAction {
@@ -49,6 +51,7 @@ impl FastAppendAction {
4951
snapshot_properties: HashMap::default(),
5052
added_data_files: vec![],
5153
added_delete_files: vec![],
54+
snapshot_id: None,
5255
}
5356
}
5457

@@ -90,18 +93,41 @@ impl FastAppendAction {
9093
self.snapshot_properties = snapshot_properties;
9194
self
9295
}
96+
97+
/// Set snapshot id
98+
pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self {
99+
self.snapshot_id = Some(snapshot_id);
100+
self
101+
}
93102
}
94103

95104
#[async_trait]
96105
impl TransactionAction for FastAppendAction {
97106
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
107+
let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
108+
if table
109+
.metadata()
110+
.snapshots()
111+
.any(|s| s.snapshot_id() == snapshot_id)
112+
{
113+
return Err(Error::new(
114+
ErrorKind::DataInvalid,
115+
format!("Snapshot id {} already exists", snapshot_id),
116+
));
117+
}
118+
snapshot_id
119+
} else {
120+
generate_unique_snapshot_id(table)
121+
};
122+
98123
let snapshot_producer = SnapshotProducer::new(
99124
table,
100125
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
101126
self.key_metadata.clone(),
102127
self.snapshot_properties.clone(),
103128
self.added_data_files.clone(),
104129
self.added_delete_files.clone(),
130+
snapshot_id,
105131
);
106132

107133
// validate added files

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,11 @@ impl<'a> SnapshotProducer<'a> {
9090
snapshot_properties: HashMap<String, String>,
9191
added_data_files: Vec<DataFile>,
9292
added_delete_files: Vec<DataFile>,
93+
snapshot_id: i64,
9394
) -> Self {
9495
Self {
9596
table,
96-
snapshot_id: Self::generate_unique_snapshot_id(table),
97+
snapshot_id,
9798
commit_uuid,
9899
key_metadata,
99100
snapshot_properties,
@@ -161,28 +162,6 @@ impl<'a> SnapshotProducer<'a> {
161162
Ok(())
162163
}
163164

164-
fn generate_unique_snapshot_id(table: &Table) -> i64 {
165-
let generate_random_id = || -> i64 {
166-
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
167-
let snapshot_id = (lhs ^ rhs) as i64;
168-
if snapshot_id < 0 {
169-
-snapshot_id
170-
} else {
171-
snapshot_id
172-
}
173-
};
174-
let mut snapshot_id = generate_random_id();
175-
176-
while table
177-
.metadata()
178-
.snapshots()
179-
.any(|s| s.snapshot_id() == snapshot_id)
180-
{
181-
snapshot_id = generate_random_id();
182-
}
183-
snapshot_id
184-
}
185-
186165
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
187166
let new_manifest_path = format!(
188167
"{}/{}/{}-m{}.{}",
@@ -474,3 +453,25 @@ impl<'a> SnapshotProducer<'a> {
474453
Ok(ActionCommit::new(updates, requirements))
475454
}
476455
}
456+
457+
pub fn generate_unique_snapshot_id(table: &Table) -> i64 {
458+
let generate_random_id = || -> i64 {
459+
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
460+
let snapshot_id = (lhs ^ rhs) as i64;
461+
if snapshot_id < 0 {
462+
-snapshot_id
463+
} else {
464+
snapshot_id
465+
}
466+
};
467+
let mut snapshot_id = generate_random_id();
468+
469+
while table
470+
.metadata()
471+
.snapshots()
472+
.any(|s| s.snapshot_id() == snapshot_id)
473+
{
474+
snapshot_id = generate_random_id();
475+
}
476+
snapshot_id
477+
}

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2323
use futures::TryStreamExt;
2424
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2525
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
26-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2726
use iceberg::writer::file_writer::location_generator::{
2827
DefaultFileNameGenerator, DefaultLocationGenerator,
2928
};
29+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3030
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3131
use iceberg::{Catalog, TableCreation};
3232
use iceberg_catalog_rest::RestCatalog;
@@ -128,4 +128,24 @@ async fn test_append_data_file() {
128128
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
129129
assert_eq!(batches.len(), 1);
130130
assert_eq!(batches[0], batch);
131+
132+
// commit result again
133+
let tx = Transaction::new(&table);
134+
let append_action = tx.fast_append().add_data_files(data_file.clone());
135+
let tx = append_action.apply(tx).unwrap();
136+
let table = tx.commit(&rest_catalog).await.unwrap();
137+
138+
// check result again
139+
let batch_stream = table
140+
.scan()
141+
.select_all()
142+
.build()
143+
.unwrap()
144+
.to_arrow()
145+
.await
146+
.unwrap();
147+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
148+
assert_eq!(batches.len(), 2);
149+
assert_eq!(batches[0], batch);
150+
assert_eq!(batches[1], batch);
131151
}

crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartiti
2525
use iceberg::table::Table;
2626
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2727
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2928
use iceberg::writer::file_writer::location_generator::{
3029
DefaultFileNameGenerator, DefaultLocationGenerator,
3130
};
31+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3232
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3333
use iceberg::{Catalog, TableCreation};
3434
use iceberg_catalog_rest::RestCatalog;

crates/integration_tests/tests/shared_tests/conflict_commit_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2323
use futures::TryStreamExt;
2424
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2525
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
26-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2726
use iceberg::writer::file_writer::location_generator::{
2827
DefaultFileNameGenerator, DefaultLocationGenerator,
2928
};
29+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3030
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3131
use iceberg::{Catalog, TableCreation};
3232
use iceberg_catalog_rest::RestCatalog;

crates/integration_tests/tests/shared_tests/scan_all_type.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ use arrow_schema::{DataType, Field, Fields};
3030
use futures::TryStreamExt;
3131
use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE};
3232
use iceberg::spec::{
33-
LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField,
34-
PrimitiveType, Schema, StructType, Type,
33+
ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, LIST_FIELD_NAME,
34+
MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
3535
};
3636
use iceberg::transaction::{ApplyTransactionAction, Transaction};
3737
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
38-
use iceberg::writer::file_writer::ParquetWriterBuilder;
3938
use iceberg::writer::file_writer::location_generator::{
4039
DefaultFileNameGenerator, DefaultLocationGenerator,
4140
};
41+
use iceberg::writer::file_writer::ParquetWriterBuilder;
4242
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
4343
use iceberg::{Catalog, TableCreation};
4444
use iceberg_catalog_rest::RestCatalog;

0 commit comments

Comments
 (0)