Skip to content

Commit 16ce618

Browse files
committed
feat(iceberg): rewrite files action (#47)
* feat(iceberg): introduce rewrite files action * fix(iceberg): add test * fix test
1 parent fefc6cb commit 16ce618

File tree

8 files changed

+758
-55
lines changed

8 files changed

+758
-55
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use tokio::sync::OnceCell;
3737
use typed_builder::TypedBuilder;
3838

3939
use crate::client::{
40-
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
40+
deserialize_catalog_response, deserialize_unexpected_catalog_error, HttpClient,
4141
};
4242
use crate::types::{
4343
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
@@ -619,7 +619,7 @@ impl Catalog for RestCatalog {
619619
.config
620620
.unwrap_or_default()
621621
.into_iter()
622-
.chain(self.user_config.props.clone().into_iter())
622+
.chain(self.user_config.props.clone())
623623
.collect();
624624

625625
let file_io = self
@@ -670,7 +670,7 @@ impl Catalog for RestCatalog {
670670
.config
671671
.unwrap_or_default()
672672
.into_iter()
673-
.chain(self.user_config.props.clone().into_iter())
673+
.chain(self.user_config.props.clone())
674674
.collect();
675675

676676
let file_io = self
@@ -1600,12 +1600,10 @@ mod tests {
16001600

16011601
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
16021602

1603-
assert!(
1604-
catalog
1605-
.namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1606-
.await
1607-
.unwrap()
1608-
);
1603+
assert!(catalog
1604+
.namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1605+
.await
1606+
.unwrap());
16091607

16101608
config_mock.assert_async().await;
16111609
get_ns_mock.assert_async().await;
@@ -1922,15 +1920,13 @@ mod tests {
19221920

19231921
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
19241922

1925-
assert!(
1926-
catalog
1927-
.table_exists(&TableIdent::new(
1928-
NamespaceIdent::new("ns1".to_string()),
1929-
"table1".to_string(),
1930-
))
1931-
.await
1932-
.unwrap()
1933-
);
1923+
assert!(catalog
1924+
.table_exists(&TableIdent::new(
1925+
NamespaceIdent::new("ns1".to_string()),
1926+
"table1".to_string(),
1927+
))
1928+
.await
1929+
.unwrap());
19341930

19351931
config_mock.assert_async().await;
19361932
check_table_exists_mock.assert_async().await;
@@ -2147,13 +2143,11 @@ mod tests {
21472143
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
21482144
.partition_spec(
21492145
UnboundPartitionSpec::builder()
2150-
.add_partition_fields(vec![
2151-
UnboundPartitionField::builder()
2152-
.source_id(1)
2153-
.transform(Transform::Truncate(3))
2154-
.name("id".to_string())
2155-
.build(),
2156-
])
2146+
.add_partition_fields(vec![UnboundPartitionField::builder()
2147+
.source_id(1)
2148+
.transform(Transform::Truncate(3))
2149+
.name("id".to_string())
2150+
.build()])
21572151
.unwrap()
21582152
.build(),
21592153
)
@@ -2298,13 +2292,11 @@ mod tests {
22982292
.await;
22992293

23002294
assert!(table_result.is_err());
2301-
assert!(
2302-
table_result
2303-
.err()
2304-
.unwrap()
2305-
.message()
2306-
.contains("already exists")
2307-
);
2295+
assert!(table_result
2296+
.err()
2297+
.unwrap()
2298+
.message()
2299+
.contains("already exists"));
23082300

23092301
config_mock.assert_async().await;
23102302
create_table_mock.assert_async().await;
@@ -2509,13 +2501,11 @@ mod tests {
25092501
.await;
25102502

25112503
assert!(table_result.is_err());
2512-
assert!(
2513-
table_result
2514-
.err()
2515-
.unwrap()
2516-
.message()
2517-
.contains("does not exist")
2518-
);
2504+
assert!(table_result
2505+
.err()
2506+
.unwrap()
2507+
.message()
2508+
.contains("does not exist"));
25192509

25202510
config_mock.assert_async().await;
25212511
update_table_mock.assert_async().await;

crates/iceberg/src/spec/snapshot_summary.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries(
339339
if summary.operation != Operation::Append
340340
&& summary.operation != Operation::Overwrite
341341
&& summary.operation != Operation::Delete
342+
&& summary.operation != Operation::Replace
342343
{
343344
return Err(Error::new(
344345
ErrorKind::DataInvalid,

crates/iceberg/src/transaction/append.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ use std::sync::Arc;
2121
use async_trait::async_trait;
2222
use uuid::Uuid;
2323

24+
use super::{
25+
MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
26+
MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
27+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
28+
};
2429
use crate::error::Result;
2530
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
2631
use crate::table::Table;
@@ -31,16 +36,6 @@ use crate::transaction::snapshot::{
3136
use crate::transaction::{ActionCommit, TransactionAction};
3237
use crate::{Error, ErrorKind};
3338

34-
/// Target size of manifest file when merging manifests.
35-
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
36-
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
37-
/// Minimum number of manifests to merge.
38-
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
39-
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
40-
/// Whether allow to merge manifests.
41-
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
42-
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
43-
4439
/// FastAppendAction is a transaction action for fast-appending data files to the table.
4540
pub struct FastAppendAction {
4641
check_duplicate: bool,
@@ -138,6 +133,8 @@ impl TransactionAction for FastAppendAction {
138133
self.snapshot_properties.clone(),
139134
self.added_data_files.clone(),
140135
self.added_delete_files.clone(),
136+
vec![],
137+
vec![],
141138
snapshot_id,
142139
);
143140

@@ -178,7 +175,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
178175

179176
async fn existing_manifest(
180177
&self,
181-
snapshot_produce: &SnapshotProducer<'_>,
178+
snapshot_produce: &mut SnapshotProducer<'_>,
182179
) -> Result<Vec<ManifestFile>> {
183180
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
184181
return Ok(vec![]);
@@ -219,7 +216,6 @@ pub struct MergeAppendAction {
219216
}
220217

221218
impl MergeAppendAction {
222-
#[allow(clippy::too_many_arguments)]
223219
pub(crate) fn new() -> Self {
224220
Self {
225221
target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
@@ -305,6 +301,8 @@ impl TransactionAction for MergeAppendAction {
305301
self.snapshot_properties.clone(),
306302
self.added_data_files.clone(),
307303
self.added_delete_files.clone(),
304+
vec![],
305+
vec![],
308306
snapshot_id,
309307
);
310308

crates/iceberg/src/transaction/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use std::collections::HashMap;
5757
pub use action::*;
5858
mod append;
5959
mod remove_snapshots;
60+
mod rewrite_files;
6061
mod snapshot;
6162
mod sort_order;
6263
mod update_location;
@@ -67,9 +68,9 @@ mod upgrade_format_version;
6768
use std::sync::Arc;
6869
use std::time::Duration;
6970

70-
pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
7171
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
7272
use remove_snapshots::RemoveSnapshotAction;
73+
use rewrite_files::RewriteFilesAction;
7374

7475
use crate::error::Result;
7576
use crate::spec::{
@@ -87,6 +88,19 @@ use crate::transaction::update_statistics::UpdateStatisticsAction;
8788
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
8889
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
8990

91+
/// Target size of manifest file when merging manifests.
92+
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
93+
/// This is the default value for `MANIFEST_TARGET_SIZE_BYTES`.
94+
pub const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
95+
/// Minimum number of manifests to merge.
96+
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
97+
/// This is the default value for `MANIFEST_MIN_MERGE_COUNT`.
98+
pub const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
99+
/// Whether to allow merging manifests.
100+
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
101+
/// This is the default value for `MANIFEST_MERGE_ENABLED`.
102+
pub const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
103+
90104
/// Table transaction.
91105
#[derive(Clone)]
92106
pub struct Transaction {
@@ -175,6 +189,11 @@ impl Transaction {
175189
UpdateStatisticsAction::new()
176190
}
177191

192+
/// Creates rewrite files action.
193+
pub fn rewrite_files(self) -> RewriteFilesAction {
194+
RewriteFilesAction::new()
195+
}
196+
178197
/// Commit transaction.
179198
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
180199
if self.actions.is_empty() {

0 commit comments

Comments
 (0)