Skip to content

Commit 0ed5721

Browse files
committed
feat: more performant handling of the case where only a schema transform is required but columns can remain unmodified
1 parent 0b17465 commit 0ed5721

File tree

2 files changed

+91
-13
lines changed

2 files changed

+91
-13
lines changed

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 90 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,21 @@ enum BatchTransform {
9090
// Indicates how each column in the target schema is derived.
9191
operations: Vec<ColumnSource>,
9292
},
93+
94+
// Sometimes only the schema will need modifying, for example when
95+
// the column names have changed vs the file, but not the column types.
96+
// We can avoid a heap allocation per RecordBatch in this case by retaining
97+
// the existing column Vec.
98+
ModifySchema {
99+
target_schema: Arc<ArrowSchema>,
100+
},
101+
}
102+
103+
#[derive(Debug)]
104+
enum SchemaComparison {
105+
Equivalent,
106+
NameChangesOnly,
107+
Different,
93108
}
94109

95110
#[derive(Debug)]
@@ -134,7 +149,7 @@ impl RecordBatchTransformer {
134149
&mut self,
135150
record_batch: RecordBatch,
136151
) -> Result<RecordBatch> {
137-
Ok(match self.batch_transform {
152+
Ok(match &self.batch_transform {
138153
Some(BatchTransform::PassThrough) => record_batch,
139154
Some(BatchTransform::Modify {
140155
ref target_schema,
@@ -143,6 +158,9 @@ impl RecordBatchTransformer {
143158
target_schema.clone(),
144159
self.transform_columns(record_batch.columns(), operations)?,
145160
)?,
161+
Some(BatchTransform::ModifySchema { target_schema }) => {
162+
record_batch.with_schema(target_schema.clone())?
163+
}
146164
None => {
147165
self.batch_transform = Some(Self::generate_batch_transform(
148166
record_batch.schema_ref(),
@@ -168,8 +186,6 @@ impl RecordBatchTransformer {
168186
projected_iceberg_field_ids: &[i32],
169187
) -> Result<BatchTransform> {
170188
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
171-
let field_id_to_source_schema_map =
172-
Self::build_field_id_to_arrow_schema_map(source_schema)?;
173189
let field_id_to_mapped_schema_map =
174190
Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
175191

@@ -186,12 +202,78 @@ impl RecordBatchTransformer {
186202
})
187203
.collect();
188204

189-
let target_schema = ArrowSchema::new(fields?);
190-
if target_schema == *source_schema.as_ref() {
191-
return Ok(BatchTransform::PassThrough);
205+
let target_schema = Arc::new(ArrowSchema::new(fields?));
206+
207+
match Self::compare_schemas(source_schema, &target_schema) {
208+
SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
209+
SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
210+
SchemaComparison::Different => Ok(BatchTransform::Modify {
211+
operations: Self::generate_transform_operations(
212+
source_schema,
213+
snapshot_schema,
214+
projected_iceberg_field_ids,
215+
field_id_to_mapped_schema_map,
216+
)?,
217+
target_schema,
218+
}),
219+
}
220+
}
221+
222+
/// Compares the source and target schemas.
223+
/// Determines if they have changed in any meaningful way:
224+
/// * If they have different numbers of fields, then we need to modify
225+
/// the incoming RecordBatch schema AND columns
226+
/// * If they have the same number of fields, but some of them differ in
227+
/// either data type or nullability, then we need to modify the
228+
/// incoming RecordBatch schema AND columns
229+
/// * If the schemas differ only in the column names, then we need
230+
/// to modify the RecordBatch schema BUT we can keep the
231+
/// original column data unmodified
232+
/// * If the schemas are identical (or differ only in inconsequential
233+
/// ways) then we can pass through the original RecordBatch unmodified
234+
fn compare_schemas(
235+
source_schema: &ArrowSchemaRef,
236+
target_schema: &ArrowSchemaRef,
237+
) -> SchemaComparison {
238+
if source_schema.fields().len() != target_schema.fields().len() {
239+
return SchemaComparison::Different;
240+
}
241+
242+
let mut names_changed = false;
243+
244+
for (source_field, target_field) in source_schema
245+
.fields()
246+
.iter()
247+
.zip(target_schema.fields().iter())
248+
{
249+
if source_field.data_type() != target_field.data_type()
250+
|| source_field.is_nullable() != target_field.is_nullable()
251+
{
252+
return SchemaComparison::Different;
253+
}
254+
255+
if source_field.name() != target_field.name() {
256+
names_changed = true;
257+
}
192258
}
193259

194-
let operations: Result<Vec<_>> = projected_iceberg_field_ids.iter().map(|field_id|{
260+
if names_changed {
261+
SchemaComparison::NameChangesOnly
262+
} else {
263+
SchemaComparison::Equivalent
264+
}
265+
}
266+
267+
fn generate_transform_operations(
268+
source_schema: &ArrowSchemaRef,
269+
snapshot_schema: &IcebergSchema,
270+
projected_iceberg_field_ids: &[i32],
271+
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
272+
) -> Result<Vec<ColumnSource>> {
273+
let field_id_to_source_schema_map =
274+
Self::build_field_id_to_arrow_schema_map(source_schema)?;
275+
276+
projected_iceberg_field_ids.iter().map(|field_id|{
195277
let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
196278
Error::new(ErrorKind::Unexpected, "could not find field in schema")
197279
)?;
@@ -237,12 +319,7 @@ impl RecordBatchTransformer {
237319
target_type: target_type.clone(),
238320
}
239321
})
240-
}).collect();
241-
242-
Ok(BatchTransform::Modify {
243-
operations: operations?,
244-
target_schema: Arc::new(target_schema),
245-
})
322+
}).collect()
246323
}
247324

248325
fn build_field_id_to_arrow_schema_map(

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
#[macro_use]
5757
extern crate derive_builder;
58+
extern crate core;
5859

5960
mod error;
6061
pub use error::{Error, ErrorKind, Result};

0 commit comments

Comments
 (0)