239 changes: 226 additions & 13 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -18,7 +18,7 @@
//! Native Iceberg table scan operator using iceberg-rust

use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
@@ -36,14 +36,15 @@ use datafusion::physical_plan::metrics::{
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{ready, FutureExt, Stream, StreamExt, TryStreamExt};
use iceberg::io::FileIO;

use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
+use futures::future::BoxFuture;
use datafusion_comet_spark_expr::EvalMode;
+use crate::parquet::schema_adapter::ParquetSchemaAdapterStream;

/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
///
@@ -179,20 +180,27 @@ impl IcebergScanExec {
})?;

let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
-let adapter_factory = SparkSchemaAdapterFactory::new(spark_options, None);

-let adapted_stream =
-stream.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)));

let wrapped_stream = IcebergStreamWrapper {
-inner: adapted_stream,
+inner: stream,
schema: output_schema,
-cached_adapter: None,
-adapter_factory,
-baseline_metrics: metrics.baseline,
};

-Ok(Box::pin(wrapped_stream))
+let adapter_stream = ParquetSchemaAdapterStream::new(
+wrapped_stream,
+Arc::clone(&output_schema),
+output_schema.as_ref(),
+spark_options,
+None,
+);

+let adapted_stream =
+adapter_stream.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)));

+Ok(Box::pin(adapted_stream))
}

fn load_file_io(
@@ -229,10 +237,215 @@ impl IcebergScanMetrics {
}
}

-/// Wrapper around iceberg-rust's stream that performs schema adaptation.
-/// Handles batches from multiple files that may have different Arrow schemas
-/// (metadata, field IDs, etc.). Caches schema adapters by source schema to avoid
-/// recreating them for every batch from the same file.
/// State machine for [`IcebergFileStream`].
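/// Transitions: `Idle` → `Opening` → `Reading` (with the next file's open in
/// flight); from `Reading`, a finished file hands off to the prefetched stream,
/// falls back to `Opening` if it is still pending, or ends when no tasks remain.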
enum FileStreamState {
/// Idle state - need to start opening next file
Idle,
/// Opening a file
Opening {
future: BoxFuture<'static, DFResult<SendableRecordBatchStream>>,
},
/// Reading from current file while potentially opening next file
Reading {
current: SendableRecordBatchStream,
next: Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>>,
},
/// Error state
Error,
}

/// Stream that reads Iceberg files with parallel opening optimization.
/// Opens the next file while reading the current file to overlap IO with compute.
///
/// Inspired by DataFusion's [`FileStream`] pattern for overlapping file opening with reading.
///
/// [`FileStream`]: https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs
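///
/// A minimal usage sketch (hypothetical call site; `tasks`, `file_io`,
/// `batch_size`, `schema`, and `metrics` are assumed to come from the
/// surrounding scan setup):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = IcebergFileStream::new(tasks, file_io, batch_size, schema, metrics)?;
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```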
struct IcebergFileStream {
schema: SchemaRef,
file_io: FileIO,
batch_size: usize,
tasks: VecDeque<iceberg::scan::FileScanTask>,
state: FileStreamState,
metrics: IcebergScanMetrics,
}

impl IcebergFileStream {
fn new(
tasks: Vec<iceberg::scan::FileScanTask>,
file_io: FileIO,
batch_size: usize,
schema: SchemaRef,
metrics: IcebergScanMetrics,
) -> DFResult<Self> {
Ok(Self {
schema,
file_io,
batch_size,
tasks: tasks.into_iter().collect(),
state: FileStreamState::Idle,
metrics,
})
}

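/// Pops the next `FileScanTask` off the queue and returns a future that opens
/// it as a fully schema-adapted `SendableRecordBatchStream`; returns `None`
/// once all tasks have been consumed.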
fn start_next_file(
&mut self,
) -> Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>> {
let task = self.tasks.pop_front()?;

self.metrics.num_splits.add(1);

let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let schema = Arc::clone(&self.schema);

Some(Box::pin(async move {
let task_stream = futures::stream::iter(vec![Ok(task)]).boxed();

let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
.with_batch_size(batch_size)
.with_row_selection_enabled(true)
.build();

let stream = reader.read(task_stream).map_err(|e| {
DataFusionError::Execution(format!("Failed to read Iceberg task: {}", e))
})?;

// Convert iceberg stream to SendableRecordBatchStream
let iceberg_stream = stream
.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)));

// Wrap the iceberg stream to make it a RecordBatchStream
let wrapped_stream: SendableRecordBatchStream = Box::pin(IcebergStreamWrapper {
inner: iceberg_stream,
schema: Arc::clone(&schema),
});

// Apply schema adaptation using ParquetSchemaAdapterStream
let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
let file_schema = wrapped_stream.schema();
let adapter_stream = ParquetSchemaAdapterStream::new(
wrapped_stream,
Arc::clone(&schema),
file_schema.as_ref(),
spark_options,
None,
);

Ok(Box::pin(adapter_stream) as SendableRecordBatchStream)
}))
}

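/// Core state-machine loop: each iteration either starts or awaits a file
/// open, forwards a batch from the current file, or rotates to the prefetched
/// next file once the current one is exhausted.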
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<DFResult<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
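// Nothing is open yet: start opening the next file, or end the
// stream if all tasks have been consumed.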
self.metrics.file_stream.time_opening.start();
match self.start_next_file() {
Some(future) => {
self.state = FileStreamState::Opening { future };
}
None => return Poll::Ready(None),
}
}
FileStreamState::Opening { future } => match ready!(future.poll_unpin(cx)) {
Ok(stream) => {
self.metrics.file_stream.time_opening.stop();
self.metrics.file_stream.time_scanning_until_data.start();
self.metrics.file_stream.time_scanning_total.start();
let next = self.start_next_file();
self.state = FileStreamState::Reading {
current: stream,
next,
};
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
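// Reading: forward batches from the current file while driving the
// background open of the next file.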
FileStreamState::Reading { current, next } => {
// Poll next file opening future to drive it forward (background IO)
if let Some(next_future) = next {
if let Poll::Ready(result) = next_future.poll_unpin(cx) {
match result {
Ok(stream) => {
*next = Some(Box::pin(futures::future::ready(Ok(stream))));
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
}

match ready!(current.poll_next_unpin(cx)) {
Some(result) => {
// Stop time_scanning_until_data on first batch (idempotent)
self.metrics.file_stream.time_scanning_until_data.stop();
self.metrics.file_stream.time_scanning_total.stop();
// Restart time_scanning_total for next batch
self.metrics.file_stream.time_scanning_total.start();
return Poll::Ready(Some(result));
}
None => {
self.metrics.file_stream.time_scanning_until_data.stop();
self.metrics.file_stream.time_scanning_total.stop();
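// Current file is exhausted: switch to the prefetched file if its
// open has completed, fall back to Opening if it is still pending,
// or finish when no files remain.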
match next.take() {
Some(mut next_future) => match next_future.poll_unpin(cx) {
Poll::Ready(Ok(stream)) => {
self.metrics.file_stream.time_scanning_until_data.start();
self.metrics.file_stream.time_scanning_total.start();
let next_next = self.start_next_file();
self.state = FileStreamState::Reading {
current: stream,
next: next_next,
};
}
Poll::Ready(Err(e)) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
self.state = FileStreamState::Opening {
future: next_future,
};
}
},
None => {
return Poll::Ready(None);
}
}
}
}
}
FileStreamState::Error => {
return Poll::Ready(None);
}
}
}
}
}

impl Stream for IcebergFileStream {
type Item = DFResult<arrow::array::RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
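// Track wall-clock time spent inside poll; record_poll updates the
// baseline metrics (e.g. output row count) from the poll result.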
self.metrics.file_stream.time_processing.start();
let result = self.poll_inner(cx);
self.metrics.file_stream.time_processing.stop();
self.metrics.baseline.record_poll(result)
}
}

impl RecordBatchStream for IcebergFileStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}

/// Wrapper around iceberg-rust's stream that avoids strict schema checks.
/// Returns the expected output schema to prevent rejection of batches with metadata differences.
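/// Per-file batches may differ in Arrow metadata (e.g. Parquet field IDs) even
/// though the columns match; actual column adaptation happens downstream in
/// `ParquetSchemaAdapterStream`.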
struct IcebergStreamWrapper<S> {
inner: S,
schema: SchemaRef,