
Commit 53a1267

refactor: rename to RecordBatchTransformer. Improve passthrough handling
1 parent 36a00fc commit 53a1267

3 files changed: +98 -108 lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -20,6 +20,6 @@
 mod schema;
 pub use schema::*;
 mod reader;
-pub(crate) mod record_batch_evolution_processor;
+pub(crate) mod record_batch_transformer;

 pub use reader::*;

crates/iceberg/src/arrow/reader.rs

Lines changed: 7 additions & 6 deletions
@@ -38,7 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
 use parquet::file::metadata::ParquetMetaData;
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

-use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};

@@ -195,10 +195,11 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

-        // create a RecordBatchEvolutionProcessor if our task schema contains columns
-        // not present in the parquet file or whose types have been promoted
-        let mut record_batch_evolution_processor =
-            RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids());
+        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // that come back from the file, such as type promotion, default column insertion
+        // and column re-ordering
+        let mut record_batch_transformer =
+            RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);

@@ -241,7 +242,7 @@ impl ArrowReader {
         let mut record_batch_stream = record_batch_stream_builder.build()?;

         while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_evolution_processor.process_record_batch(batch))
+            tx.send(record_batch_transformer.process_record_batch(batch))
                 .await?
         }

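For context, a minimal sketch of the calling pattern introduced here, not the reader's exact code: it assumes `batches` is any iterator of `RecordBatch` results read from a single data file, `task_schema` is the scan's Iceberg schema (`Arc<Schema>`) and `field_ids` holds the projected field ids. The reader itself does this inside an async stream and sends the results over a channel.

    let mut transformer = RecordBatchTransformer::build(task_schema.clone(), &field_ids);

    for batch in batches {
        // The first batch determines whether a PassThrough or Modify transform
        // is cached; every subsequent batch reuses that cached decision.
        let transformed = transformer.process_record_batch(batch?)?;
        // ... hand `transformed` on to the consumer
    }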
crates/iceberg/src/arrow/record_batch_evolution_processor.rs renamed to crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 90 additions & 101 deletions
@@ -32,33 +32,32 @@ use crate::arrow::schema_to_arrow_schema;
 use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
 use crate::{Error, ErrorKind, Result};

-/// Represents an operation that needs to be performed
-/// to transform a RecordBatch coming from a Parquet file record
-/// batch stream so that it conforms to an Iceberg schema that has
-/// evolved from the one that was used when the file was written.
+/// Indicates how a particular column in a processed RecordBatch should
+/// be sourced.
 #[derive(Debug)]
-pub(crate) enum EvolutionAction {
+pub(crate) enum ColumnSource {
     // signifies that a column should be passed through unmodified
+    // from the file's RecordBatch
     PassThrough {
         source_index: usize,
     },

-    // signifies particular column has undergone type promotion, and so
-    // the source column with the given index needs to be promoted to the
-    // specified type
+    // signifies that a column from the file's RecordBatch has undergone
+    // type promotion so the source column with the given index needs
+    // to be promoted to the specified type
     Promote {
         target_type: DataType,
         source_index: usize,
     },

-    // Signifies that a new column has been inserted before the row
+    // Signifies that a new column has been inserted before the column
     // with index `index`. (we choose "before" rather than "after" so
     // that we can use usize; if we insert after, then we need to
-    // be able to store -1 here when we want to indicate that the new
-    // column is to be added at the front of the list).
+    // be able to store -1 here to signify that a new
+    // column is to be added at the front of the column list).
     // If multiple columns need to be inserted at a given
     // location, they should all be given the same index, as the index
-    // here refers to the original record batch, not the interim state after
+    // here refers to the original RecordBatch, not the interim state after
     // a preceding operation.
     Add {
         target_type: DataType,
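A hypothetical illustration of how these variants combine (not part of the commit, and using arrow's `DataType` plus the spec's `PrimitiveLiteral::String` as assumptions): for a file written with columns `(a: Int32, b: Utf8)` and an evolved snapshot schema of `(a: Int64, b: Utf8, c: Utf8 default "x")`, the per-column sources would be roughly:

    let sources = vec![
        // a: Int32 in the file, Int64 in the snapshot => type promotion
        ColumnSource::Promote { target_type: DataType::Int64, source_index: 0 },
        // b: unchanged => pass the file's column through
        ColumnSource::PassThrough { source_index: 1 },
        // c: absent from the file => materialise a constant column from the default
        ColumnSource::Add {
            target_type: DataType::Utf8,
            value: Some(PrimitiveLiteral::String("x".to_string())),
        },
    ];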
@@ -67,43 +66,53 @@ pub(crate) enum EvolutionAction {
     // The iceberg spec refers to other permissible schema evolution actions
     // (see https://iceberg.apache.org/spec/#schema-evolution):
     // renaming fields, deleting fields and reordering fields.
-    // Renames only affect the RecordBatch schema rather than the
+    // Renames only affect the schema of the RecordBatch rather than the
     // columns themselves, so a single updated cached schema can
     // be re-used and no per-column actions are required.
     // Deletion and Reorder can be achieved without needing this
     // post-processing step by using the projection mask.
 }

 #[derive(Debug)]
-struct SchemaAndOps {
-    // Every transformed RecordBatch will have the same schema. We create the
-    // target just once and cache it here. Helpfully, Arc<Schema> is needed in
-    // the constructor for RecordBatch, so we don't need an expensive copy
-    // each time.
-    pub target_schema: Arc<ArrowSchema>,
-
-    // Indicates how each column in the target schema is derived.
-    pub operations: Vec<EvolutionAction>,
+enum BatchTransform {
+    // Indicates that no changes need to be performed to the RecordBatches
+    // coming in from the stream and that they can be passed through
+    // unmodified
+    PassThrough,
+
+    Modify {
+        // Every transformed RecordBatch will have the same schema. We create the
+        // target just once and cache it here. Helpfully, Arc<Schema> is needed in
+        // the constructor for RecordBatch, so we don't need an expensive copy
+        // each time we build a new RecordBatch
+        target_schema: Arc<ArrowSchema>,
+
+        // Indicates how each column in the target schema is derived.
+        operations: Vec<ColumnSource>,
+    },
 }

 #[derive(Debug)]
-pub(crate) struct RecordBatchEvolutionProcessor {
+pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
-    schema_and_ops: Option<SchemaAndOps>,
+
+    // BatchTransform gets lazily constructed based on the schema of
+    // the first RecordBatch we receive from the file
+    batch_transform: Option<BatchTransform>,
 }

-impl RecordBatchEvolutionProcessor {
-    /// Fallibly try to build a RecordBatchEvolutionProcessor for a given parquet file schema
-    /// and Iceberg snapshot schema. Returns Ok(None) if the processor would not be required
-    /// due to the file schema already matching the snapshot schema
+impl RecordBatchTransformer {
+    /// Build a RecordBatchTransformer for a given
+    /// Iceberg snapshot schema and list of projected field ids.
     pub(crate) fn build(
-        // source_schema: &ArrowSchemaRef,
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
     ) -> Self {
         let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() {
-            // project all fields in table schema order
+            // If the list of field ids is empty, this indicates that we
+            // need to select all fields.
+            // Project all fields in table schema order
             snapshot_schema
                 .as_struct()
                 .fields()
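A short sketch of the "empty projection selects everything" rule added above, assuming the same `Schema`/`NestedField` builders this file's tests already use (the ids and names here are made up): when no field ids are supplied, the transformer targets every top-level field of the snapshot schema in table-schema order.

    use crate::spec::{NestedField, PrimitiveType, Schema, Type};

    let snapshot_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(10, "a", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(11, "b", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    // No explicit projection supplied => take every top-level field id in order.
    let projected: Vec<i32> = snapshot_schema
        .as_struct()
        .fields()
        .iter()
        .map(|field| field.id)
        .collect();

    assert_eq!(projected, vec![10, 11]);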
@@ -117,61 +126,47 @@ impl RecordBatchEvolutionProcessor {
         Self {
             snapshot_schema,
             projected_iceberg_field_ids,
-            schema_and_ops: None,
+            batch_transform: None,
         }
-
-        // let (operations, target_schema) = Self::generate_operations_and_schema(
-        //     source_schema,
-        //     snapshot_schema,
-        //     projected_iceberg_field_ids,
-        // )?;
-        //
-        // Ok(if target_schema.as_ref() == source_schema.as_ref() {
-        //     None
-        // } else {
-        //     Some(Self {
-        //         operations,
-        //         target_schema,
-        //     })
-        // })
     }

     pub(crate) fn process_record_batch(
         &mut self,
         record_batch: RecordBatch,
     ) -> Result<RecordBatch> {
-        if self.schema_and_ops.is_none() {
-            self.schema_and_ops = Some(Self::generate_operations_and_schema(
-                record_batch.schema_ref(),
-                self.snapshot_schema.as_ref(),
-                &self.projected_iceberg_field_ids,
-            )?);
-        }
-
-        let Some(SchemaAndOps {
-            ref target_schema, ..
-        }) = self.schema_and_ops
-        else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "schema_and_ops always created at this point",
-            ));
-        };
-
-        Ok(RecordBatch::try_new(
-            target_schema.clone(),
-            self.transform_columns(record_batch.columns())?,
-        )?)
+        Ok(match self.batch_transform {
+            Some(BatchTransform::PassThrough) => record_batch,
+            Some(BatchTransform::Modify {
+                ref target_schema,
+                ref operations,
+            }) => RecordBatch::try_new(
+                target_schema.clone(),
+                self.transform_columns(record_batch.columns(), operations)?,
+            )?,
+            None => {
+                self.batch_transform = Some(Self::generate_batch_transform(
+                    record_batch.schema_ref(),
+                    self.snapshot_schema.as_ref(),
+                    &self.projected_iceberg_field_ids,
+                )?);
+
+                self.process_record_batch(record_batch)?
+            }
+        })
     }

-    // create the (possibly empty) list of `EvolutionOp`s that we need
-    // to apply to the arrays in a record batch with `source_schema` so
-    // that it matches the `snapshot_schema`
-    fn generate_operations_and_schema(
+    // Compare the schema of the incoming RecordBatches to the schema of
+    // the Iceberg snapshot to determine what, if any, transformation
+    // needs to be applied. If the schemas match, we return BatchTransform::PassThrough
+    // to indicate that no changes need to be made. Otherwise, we return a
+    // BatchTransform::Modify containing the target RecordBatch schema and
+    // the list of `ColumnSource`s that indicate how to source each column in
+    // the resulting RecordBatches.
+    fn generate_batch_transform(
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
-    ) -> Result<SchemaAndOps> {
+    ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -190,7 +185,11 @@ impl RecordBatchEvolutionProcessor {
                     .clone())
             })
             .collect();
+
         let target_schema = ArrowSchema::new(fields?);
+        if target_schema == *source_schema.as_ref() {
+            return Ok(BatchTransform::PassThrough);
+        }

         let operations: Result<Vec<_>> = projected_iceberg_field_ids.iter().map(|field_id|{
             let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
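The early return above is what the commit message calls improved passthrough handling: if rebuilding the projected schema from the snapshot reproduces exactly the schema the file already has, no per-batch work is needed. A minimal sketch of that comparison, assuming arrow's `Schema` equality (which compares fields and metadata); the field shown is invented for illustration:

    use arrow_schema::{DataType, Field, Schema};

    let source = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let target = Schema::new(vec![Field::new("a", DataType::Int64, false)]);

    // Equal schemas mean every incoming batch can be handed back unchanged.
    let can_pass_through = target == source;
    assert!(can_pass_through);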
@@ -203,12 +202,12 @@ impl RecordBatchEvolutionProcessor {

                 if source_field.data_type().equals_datatype(target_type) {
                     // no promotion required
-                    EvolutionAction::PassThrough {
+                    ColumnSource::PassThrough {
                         source_index: *source_index
                     }
                 } else {
                     // promotion required
-                    EvolutionAction::Promote {
+                    ColumnSource::Promote {
                         target_type: target_type.clone(),
                         source_index: *source_index,
                     }
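The `Promote` arm above defers the actual work to arrow's cast kernel at batch-processing time. A minimal sketch of that promotion, assuming the `arrow_cast::cast` function (here, an Int32 column written by an older file is widened to the Int64 declared in the evolved schema):

    use std::sync::Arc;
    use arrow_array::{Array, Int32Array};
    use arrow_cast::cast;
    use arrow_schema::DataType;

    // A column as it was written by the old file schema.
    let file_column: Arc<dyn Array> = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Widen it to the type declared by the evolved Iceberg schema.
    let promoted = cast(file_column.as_ref(), &DataType::Int64)
        .expect("Int32 -> Int64 is a supported cast");
    assert_eq!(promoted.data_type(), &DataType::Int64);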
@@ -222,25 +221,25 @@ impl RecordBatchEvolutionProcessor {
                 let default_value = if let Some(ref iceberg_default_value) =
                     &iceberg_field.initial_default
                 {
-                    let Literal::Primitive(prim_value) = iceberg_default_value else {
+                    let Literal::Primitive(primitive_literal) = iceberg_default_value else {
                         return Err(Error::new(
                             ErrorKind::Unexpected,
                             format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value)
                         ));
                     };
-                    Some(prim_value.clone())
+                    Some(primitive_literal.clone())
                 } else {
                     None
                 };

-                EvolutionAction::Add {
+                ColumnSource::Add {
                     value: default_value,
                     target_type: target_type.clone(),
                 }
             })
         }).collect();

-        Ok(SchemaAndOps {
+        Ok(BatchTransform::Modify {
             operations: operations?,
             target_schema: Arc::new(target_schema),
         })
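For the `Add` arm, `create_column` (referenced below and untouched by the hunks in this commit) turns the optional default literal into a constant column of the batch's length. A rough sketch of that idea for an Int64 column whose `initial_default` is 42; the real code dispatches on the target `DataType` and the `PrimitiveLiteral` variant:

    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int64Array};

    let num_rows = 1024;
    // Repeat the default value once per row to fill the missing column.
    let default_column: ArrayRef = Arc::new(Int64Array::from(vec![42_i64; num_rows]));
    assert_eq!(default_column.len(), num_rows);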
@@ -278,37 +277,30 @@ impl RecordBatchEvolutionProcessor {
     fn transform_columns(
         &self,
         columns: &[Arc<dyn ArrowArray>],
+        operations: &[ColumnSource],
     ) -> Result<Vec<Arc<dyn ArrowArray>>> {
         if columns.is_empty() {
             return Ok(columns.to_vec());
         }
         let num_rows = columns[0].len();

-        let Some(ref schema_and_ops) = self.schema_and_ops else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "schema_and_ops was None, but should be present",
-            ));
-        };
-
-        let result: Result<Vec<_>> = schema_and_ops
-            .operations
+        operations
             .iter()
             .map(|op| {
                 Ok(match op {
-                    EvolutionAction::PassThrough { source_index } => columns[*source_index].clone(),
-                    EvolutionAction::Promote {
+                    ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),
+
+                    ColumnSource::Promote {
                         target_type,
                         source_index,
                     } => cast(&*columns[*source_index], target_type)?,
-                    EvolutionAction::Add { target_type, value } => {
+
+                    ColumnSource::Add { target_type, value } => {
                         Self::create_column(target_type, value, num_rows)?
                     }
                 })
             })
-            .collect();
-
-            result
+            .collect()
     }

     fn create_column(
@@ -388,16 +380,15 @@ mod test {
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

-    use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
+    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
     use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};

     #[test]
     fn build_field_id_to_source_schema_map_works() {
         let arrow_schema = arrow_schema_already_same_as_target();

         let result =
-            RecordBatchEvolutionProcessor::build_field_id_to_arrow_schema_map(&arrow_schema)
-                .unwrap();
+            RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();

         let expected = HashMap::from_iter([
             (10, (arrow_schema.fields()[0].clone(), 0)),
@@ -415,8 +406,7 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [13, 14];

-        let mut inst =
-            RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);

         let result = inst
             .process_record_batch(source_record_batch_no_migration_required())
@@ -432,8 +422,7 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f

-        let mut inst =
-            RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);

         let result = inst.process_record_batch(source_record_batch()).unwrap();
