Skip to content

Commit bf6d2ef

Browse files
committed
feat: RecordBatchEvolutionProcessor handles skipped fields in projection
1 parent 6d8ad87 commit bf6d2ef

File tree

3 files changed

+274
-198
lines changed

3 files changed

+274
-198
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,8 @@ impl ArrowReader {
197197

198198
// create a RecordBatchEvolutionProcessor if our task schema contains columns
199199
// not present in the parquet file or whose types have been promoted
200-
let record_batch_evolution_processor = RecordBatchEvolutionProcessor::build(
201-
record_batch_stream_builder.schema(),
202-
task.schema(),
203-
task.project_field_ids(),
204-
)?;
200+
let mut record_batch_evolution_processor =
201+
RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids());
205202

206203
if let Some(batch_size) = batch_size {
207204
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -243,15 +240,9 @@ impl ArrowReader {
243240
// to the requester.
244241
let mut record_batch_stream = record_batch_stream_builder.build()?;
245242

246-
if let Some(record_batch_evolution_processor) = record_batch_evolution_processor {
247-
while let Some(batch) = record_batch_stream.try_next().await? {
248-
tx.send(record_batch_evolution_processor.process_record_batch(batch))
249-
.await?
250-
}
251-
} else {
252-
while let Some(batch) = record_batch_stream.try_next().await? {
253-
tx.send(Ok(batch)).await?
254-
}
243+
while let Some(batch) = record_batch_stream.try_next().await? {
244+
tx.send(record_batch_evolution_processor.process_record_batch(batch))
245+
.await?
255246
}
256247

257248
Ok(())

0 commit comments

Comments
 (0)