Skip to content

Commit 67918fb

Browse files
committed
- add parse_property to refine parse property logic
- rename FastAppendOperation to AppendOperation - use iterable input for pack to be more memory efficient
1 parent 6042e41 commit 67918fb

File tree

3 files changed

+57
-34
lines changed

3 files changed

+57
-34
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,14 @@ impl TransactionAction for FastAppendAction {
103103
}
104104

105105
snapshot_producer
106-
.commit(FastAppendOperation, DefaultManifestProcess)
106+
.commit(AppendOperation, DefaultManifestProcess)
107107
.await
108108
}
109109
}
110110

111-
pub(crate) struct FastAppendOperation;
111+
pub(crate) struct AppendOperation;
112112

113-
impl SnapshotProduceOperation for FastAppendOperation {
113+
impl SnapshotProduceOperation for AppendOperation {
114114
fn operation(&self) -> Operation {
115115
Operation::Append
116116
}

crates/iceberg/src/transaction/merge_append.rs

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ use crate::Result;
2626
use crate::io::FileIO;
2727
use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter};
2828
use crate::table::Table;
29-
use crate::transaction::append::FastAppendOperation;
29+
use crate::transaction::append::AppendOperation;
3030
use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer};
3131
use crate::transaction::{ActionCommit, TransactionAction};
3232
use crate::utils::bin::ListPacker;
33+
use crate::utils::parse_property;
3334

3435
/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
3536
/// based on the target size.
@@ -57,24 +58,21 @@ const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
5758

5859
impl MergeAppendAction {
5960
pub(crate) fn new(table: &Table) -> Result<Self> {
60-
let target_size_bytes: u32 = table
61-
.metadata()
62-
.properties()
63-
.get(MANIFEST_TARGET_SIZE_BYTES)
64-
.and_then(|s| s.parse().ok())
65-
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
66-
let min_count_to_merge: u32 = table
67-
.metadata()
68-
.properties()
69-
.get(MANIFEST_MIN_MERGE_COUNT)
70-
.and_then(|s| s.parse().ok())
71-
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
72-
let merge_enabled = table
73-
.metadata()
74-
.properties()
75-
.get(MANIFEST_MERGE_ENABLED)
76-
.and_then(|s| s.parse().ok())
77-
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
61+
let target_size_bytes: u32 = parse_property(
62+
table.metadata().properties(),
63+
MANIFEST_TARGET_SIZE_BYTES,
64+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
65+
)?;
66+
let min_count_to_merge: u32 = parse_property(
67+
table.metadata().properties(),
68+
MANIFEST_MIN_MERGE_COUNT,
69+
MANIFEST_MIN_MERGE_COUNT_DEFAULT,
70+
)?;
71+
let merge_enabled = parse_property(
72+
table.metadata().properties(),
73+
MANIFEST_MERGE_ENABLED,
74+
MANIFEST_MERGE_ENABLED_DEFAULT,
75+
)?;
7876
Ok(Self {
7977
check_duplicate: true,
8078
target_size_bytes,
@@ -140,14 +138,14 @@ impl TransactionAction for MergeAppendAction {
140138

141139
if self.merge_enabled {
142140
snapshot_producer
143-
.commit(FastAppendOperation, MergeManifsetProcess {
141+
.commit(AppendOperation, MergeManifsetProcess {
144142
target_size_bytes: self.target_size_bytes,
145143
min_count_to_merge: self.min_count_to_merge,
146144
})
147145
.await
148146
} else {
149147
snapshot_producer
150-
.commit(FastAppendOperation, DefaultManifestProcess)
148+
.commit(AppendOperation, DefaultManifestProcess)
151149
.await
152150
}
153151
}
@@ -228,7 +226,6 @@ impl MergeManifestManager {
228226
packer.pack(group_manifests, |manifest| manifest.manifest_length as u32);
229227

230228
let manifest_merge_futures = manifest_bins
231-
.into_iter()
232229
.map(|manifest_bin| {
233230
if manifest_bin.len() == 1 {
234231
Ok(Box::pin(async { Ok(manifest_bin) })

crates/iceberg/src/utils.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
1819
use std::num::NonZeroUsize;
20+
use std::str::FromStr;
21+
22+
use crate::{Error, ErrorKind, Result};
1923

2024
// Use a default value of 1 as the safest option.
2125
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
@@ -41,12 +45,31 @@ pub(crate) fn available_parallelism() -> NonZeroUsize {
4145
})
4246
}
4347

48+
pub(crate) fn parse_property<T>(
49+
properties: &HashMap<String, String>,
50+
key: &str,
51+
default_value: T,
52+
) -> Result<T>
53+
where
54+
T: FromStr,
55+
T::Err: std::error::Error + Send + Sync + 'static,
56+
{
57+
match properties.get(key) {
58+
Some(s) => s.parse::<T>().map_err(|e| {
59+
Error::new(
60+
ErrorKind::DataInvalid,
61+
format!("Invalid property value for key: {}", key),
62+
)
63+
.with_source(e)
64+
}),
65+
None => Ok(default_value),
66+
}
67+
}
68+
4469
pub mod bin {
4570
use std::iter::Iterator;
4671
use std::marker::PhantomData;
4772

48-
use itertools::Itertools;
49-
5073
struct Bin<T> {
5174
bin_weight: u32,
5275
target_weight: u32,
@@ -91,8 +114,11 @@ pub mod bin {
91114
}
92115
}
93116

94-
pub fn pack<F>(&self, items: Vec<T>, weight_func: F) -> Vec<Vec<T>>
95-
where F: Fn(&T) -> u32 {
117+
pub fn pack<I, F>(&self, items: I, weight_func: F) -> impl Iterator<Item = Vec<T>>
118+
where
119+
I: IntoIterator<Item = T>,
120+
F: Fn(&T) -> u32,
121+
{
96122
let mut bins: Vec<Bin<T>> = vec![];
97123
for item in items {
98124
let cur_weight = weight_func(&item);
@@ -106,7 +132,7 @@ pub mod bin {
106132
addable_bin.add(item, cur_weight);
107133
}
108134

109-
bins.into_iter().map(|bin| bin.into_vec()).collect_vec()
135+
bins.into_iter().map(|bin| bin.into_vec())
110136
}
111137
}
112138

@@ -119,7 +145,7 @@ pub mod bin {
119145
let packer = ListPacker::new(10);
120146
let items = vec![3, 4, 5, 6, 2, 1];
121147

122-
let packed = packer.pack(items, |&x| x);
148+
let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
123149

124150
assert_eq!(packed.len(), 3);
125151
assert!(packed[0].iter().sum::<u32>() == 10);
@@ -155,7 +181,7 @@ pub mod bin {
155181
},
156182
];
157183

158-
let packed = packer.pack(items, |item| item.size);
184+
let packed: Vec<Vec<Item>> = packer.pack(items, |item| item.size).collect();
159185

160186
assert_eq!(packed.len(), 2);
161187
assert!(packed[0].iter().map(|x| x.size).sum::<u32>() <= 15);
@@ -167,7 +193,7 @@ pub mod bin {
167193
let packer = ListPacker::new(10);
168194
let items = vec![15, 5, 3];
169195

170-
let packed = packer.pack(items, |&x| x);
196+
let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
171197

172198
assert_eq!(packed.len(), 2);
173199
assert!(packed[0].contains(&15));
@@ -179,7 +205,7 @@ pub mod bin {
179205
let packer = ListPacker::new(10);
180206
let items: Vec<u32> = vec![];
181207

182-
let packed = packer.pack(items, |&x| x);
208+
let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
183209

184210
assert_eq!(packed.len(), 0);
185211
}

0 commit comments

Comments
 (0)