Skip to content

Commit 5c1a9e6

Browse files
authored
RecordBatchTransformer: Handle schema migration and column re-ordering in table scans (#602)
* feat: Add skeleton of RecordBatchEvolutionProcessor * feat: Add initial implementation of RecordBatchEvolutionProcessor * feat: support more column types. Improve error handling. Add more comments * feat(wip): address issues with reordered / skipped fields * feat: RecordBatchEvolutionProcessor handles skipped fields in projection * chore: add missing license header * chore: remove unneeded comment * refactor: rename to RecordBatchTransformer. Improve passthrough handling * feat: more performant handling of case where only schema transform is required but columns can remain unmodified * refactor: import arrow_cast rather than arrow
1 parent d09e32e commit 5c1a9e6

File tree

9 files changed

+671
-2
lines changed

9 files changed

+671
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ apache-avro = "0.17"
4141
array-init = "2"
4242
arrow-arith = { version = "53" }
4343
arrow-array = { version = "53" }
44+
arrow-cast = { version = "53" }
4445
arrow-ord = { version = "53" }
4546
arrow-schema = { version = "53" }
4647
arrow-select = { version = "53" }

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ apache-avro = { workspace = true }
4646
array-init = { workspace = true }
4747
arrow-arith = { workspace = true }
4848
arrow-array = { workspace = true }
49+
arrow-cast = { workspace = true }
4950
arrow-ord = { workspace = true }
5051
arrow-schema = { workspace = true }
5152
arrow-select = { workspace = true }

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@
2020
mod schema;
2121
pub use schema::*;
2222
mod reader;
23+
pub(crate) mod record_batch_transformer;
24+
2325
pub use reader::*;

crates/iceberg/src/arrow/reader.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
3838
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3939
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
4040

41+
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
4142
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
4243
use crate::error::Result;
4344
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -209,6 +210,12 @@ impl ArrowReader {
209210
)?;
210211
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
211212

213+
// RecordBatchTransformer performs any required transformations on the RecordBatches
214+
// that come back from the file, such as type promotion, default column insertion
215+
// and column re-ordering
216+
let mut record_batch_transformer =
217+
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
218+
212219
if let Some(batch_size) = batch_size {
213220
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
214221
}
@@ -261,8 +268,10 @@ impl ArrowReader {
261268
// Build the batch stream and send all the RecordBatches that it generates
262269
// to the requester.
263270
let mut record_batch_stream = record_batch_stream_builder.build()?;
271+
264272
while let Some(batch) = record_batch_stream.try_next().await? {
265-
tx.send(Ok(batch)).await?
273+
tx.send(record_batch_transformer.process_record_batch(batch))
274+
.await?
266275
}
267276

268277
Ok(())

0 commit comments

Comments
 (0)