diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index 2f639e9f70..0b1b97d732 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -18,7 +18,7 @@ //! Native Iceberg table scan operator using iceberg-rust use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::pin::Pin; use std::sync::Arc; @@ -36,14 +36,15 @@ use datafusion::physical_plan::metrics::{ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, }; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{ready, FutureExt, Stream, StreamExt, TryStreamExt}; use iceberg::io::FileIO; use crate::execution::operators::ExecutionError; use crate::parquet::parquet_support::SparkParquetOptions; -use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper}; +use futures::future::BoxFuture; use datafusion_comet_spark_expr::EvalMode; +use crate::parquet::schema_adapter::ParquetSchemaAdapterStream; /// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables. 
/// @@ -179,20 +180,27 @@ impl IcebergScanExec { })?; let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); - let adapter_factory = SparkSchemaAdapterFactory::new(spark_options, None); - - let adapted_stream = - stream.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e))); let wrapped_stream = IcebergStreamWrapper { - inner: adapted_stream, + inner: stream, schema: output_schema, cached_adapter: None, adapter_factory, baseline_metrics: metrics.baseline, }; - Ok(Box::pin(wrapped_stream)) + let adapter_stream = ParquetSchemaAdapterStream::new( + wrapped_stream, + Arc::clone(&output_schema), + output_schema.as_ref(), + spark_options, + None, + ); + + let adapted_stream = + adapter_stream.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e))); + + Ok(Box::pin(adapted_stream)) } fn load_file_io( @@ -229,10 +237,215 @@ impl IcebergScanMetrics { } } -/// Wrapper around iceberg-rust's stream that performs schema adaptation. -/// Handles batches from multiple files that may have different Arrow schemas -/// (metadata, field IDs, etc.). Caches schema adapters by source schema to avoid -/// recreating them for every batch from the same file. +/// State machine for IcebergFileStream +enum FileStreamState { + /// Idle state - need to start opening next file + Idle, + /// Opening a file + Opening { + future: BoxFuture<'static, DFResult<SendableRecordBatchStream>>, + }, + /// Reading from current file while potentially opening next file + Reading { + current: SendableRecordBatchStream, + next: Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>>, + }, + /// Error state + Error, +} + +/// Stream that reads Iceberg files with parallel opening optimization. +/// Opens the next file while reading the current file to overlap IO with compute. +/// +/// Inspired by DataFusion's [`FileStream`] pattern for overlapping file opening with reading. 
+/// +/// [`FileStream`]: https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs +struct IcebergFileStream { + schema: SchemaRef, + file_io: FileIO, + batch_size: usize, + tasks: VecDeque<FileScanTask>, + state: FileStreamState, + metrics: IcebergScanMetrics, +} + +impl IcebergFileStream { + fn new( + tasks: Vec<FileScanTask>, + file_io: FileIO, + batch_size: usize, + schema: SchemaRef, + metrics: IcebergScanMetrics, + ) -> DFResult<Self> { + Ok(Self { + schema, + file_io, + batch_size, + tasks: tasks.into_iter().collect(), + state: FileStreamState::Idle, + metrics, + }) + } + + fn start_next_file( + &mut self, + ) -> Option<BoxFuture<'static, DFResult<SendableRecordBatchStream>>> { + let task = self.tasks.pop_front()?; + + self.metrics.num_splits.add(1); + + let file_io = self.file_io.clone(); + let batch_size = self.batch_size; + let schema = Arc::clone(&self.schema); + + Some(Box::pin(async move { + let task_stream = futures::stream::iter(vec![Ok(task)]).boxed(); + + let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io) + .with_batch_size(batch_size) + .with_row_selection_enabled(true) + .build(); + + let stream = reader.read(task_stream).map_err(|e| { + DataFusionError::Execution(format!("Failed to read Iceberg task: {}", e)) + })?; + + // Convert iceberg stream to SendableRecordBatchStream + let iceberg_stream = stream + .map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e))); + + // Wrap the iceberg stream to make it a RecordBatchStream + let wrapped_stream: SendableRecordBatchStream = Box::pin(IcebergStreamWrapper { + inner: iceberg_stream, + schema: Arc::clone(&schema), + }); + + // Apply schema adaptation using ParquetSchemaAdapterStream + let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + let file_schema = wrapped_stream.schema(); + let adapter_stream = ParquetSchemaAdapterStream::new( + wrapped_stream, + Arc::clone(&schema), + file_schema.as_ref(), + spark_options, + None, + ); + + Ok(Box::pin(adapter_stream) as SendableRecordBatchStream) + })) + } + 
+ fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<DFResult<RecordBatch>>> { + loop { + match &mut self.state { + FileStreamState::Idle => { + self.metrics.file_stream.time_opening.start(); + match self.start_next_file() { + Some(future) => { + self.state = FileStreamState::Opening { future }; + } + None => return Poll::Ready(None), + } + } + FileStreamState::Opening { future } => match ready!(future.poll_unpin(cx)) { + Ok(stream) => { + self.metrics.file_stream.time_opening.stop(); + self.metrics.file_stream.time_scanning_until_data.start(); + self.metrics.file_stream.time_scanning_total.start(); + let next = self.start_next_file(); + self.state = FileStreamState::Reading { + current: stream, + next, + }; + } + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }, + FileStreamState::Reading { current, next } => { + // Poll next file opening future to drive it forward (background IO) + if let Some(next_future) = next { + if let Poll::Ready(result) = next_future.poll_unpin(cx) { + match result { + Ok(stream) => { + *next = Some(Box::pin(futures::future::ready(Ok(stream)))); + } + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + } + } + } + + match ready!(current.poll_next_unpin(cx)) { + Some(result) => { + // Stop time_scanning_until_data on first batch (idempotent) + self.metrics.file_stream.time_scanning_until_data.stop(); + self.metrics.file_stream.time_scanning_total.stop(); + // Restart time_scanning_total for next batch + self.metrics.file_stream.time_scanning_total.start(); + return Poll::Ready(Some(result)); + } + None => { + self.metrics.file_stream.time_scanning_until_data.stop(); + self.metrics.file_stream.time_scanning_total.stop(); + match next.take() { + Some(mut next_future) => match next_future.poll_unpin(cx) { + Poll::Ready(Ok(stream)) => { + self.metrics.file_stream.time_scanning_until_data.start(); + self.metrics.file_stream.time_scanning_total.start(); + let next_next = 
self.start_next_file(); + self.state = FileStreamState::Reading { + current: stream, + next: next_next, + }; + } + Poll::Ready(Err(e)) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + Poll::Pending => { + self.state = FileStreamState::Opening { + future: next_future, + }; + } + }, + None => { + return Poll::Ready(None); + } + } + } + } + } + FileStreamState::Error => { + return Poll::Ready(None); + } + } + } + } +} + +impl Stream for IcebergFileStream { + type Item = DFResult<RecordBatch>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.metrics.file_stream.time_processing.start(); + let result = self.poll_inner(cx); + self.metrics.file_stream.time_processing.stop(); + self.metrics.baseline.record_poll(result) + } +} + +impl RecordBatchStream for IcebergFileStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +/// Wrapper around iceberg-rust's stream that avoids strict schema checks. +/// Returns the expected output schema to prevent rejection of batches with metadata differences. 
struct IcebergStreamWrapper { inner: S, schema: SchemaRef, diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 6a4ad97f88..da2671ad9c 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -95,7 +95,7 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; - +use crate::parquet::schema_adapter::ParquetSchemaAdapterExec; use arrow::array::{ new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, @@ -956,12 +956,15 @@ impl PhysicalPlanner { .iter() .map(|offset| *offset as usize) .collect(); + dbg!(&data_schema); + dbg!(&required_schema); + dbg!(&partition_schema); // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions) let partition_files = &scan.file_partitions[self.partition as usize]; if partition_files.partitioned_file.is_empty() { - let empty_exec = Arc::new(EmptyExec::new(required_schema)); + let empty_exec = Arc::new(EmptyExec::new(data_schema)); return Ok(( vec![], Arc::new(SparkPlan::new(spark_plan.plan_id, empty_exec, vec![])), @@ -972,7 +975,7 @@ impl PhysicalPlanner { let data_filters: Result>, ExecutionError> = scan .data_filters .iter() - .map(|expr| self.create_expr(expr, Arc::clone(&required_schema))) + .map(|expr| self.create_expr(expr, Arc::clone(&data_schema))) .collect(); let default_values: Option> = if !scan @@ -1041,8 +1044,8 @@ impl PhysicalPlanner { Field::new(field.name(), field.data_type().clone(), field.is_nullable()) }) .collect_vec(); - let scan = init_datasource_exec( - required_schema, + let scan_exec = init_datasource_exec( + Arc::clone(&data_schema), Some(data_schema), Some(partition_schema), Some(partition_fields), @@ -1050,15 +1053,30 @@ impl PhysicalPlanner { file_groups, 
Some(projection_vector), Some(data_filters?), - default_values, scan.session_timezone.as_str(), scan.case_sensitive, self.session_ctx(), scan.encryption_enabled, )?; + + dbg!("after datasource"); + + // Wrap the scan with ParquetSchemaAdapterExec to handle schema transformations + let parquet_options = crate::parquet::parquet_support::SparkParquetOptions::new( + datafusion_comet_spark_expr::EvalMode::Legacy, + &scan.session_timezone, + false, + ); + let adapter_exec: Arc = Arc::new(ParquetSchemaAdapterExec::new( + scan_exec, + required_schema, + parquet_options, + default_values, + )); + Ok(( vec![], - Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])), + Arc::new(SparkPlan::new(spark_plan.plan_id, adapter_exec, vec![])), )) } OpStruct::CsvScan(scan) => { @@ -3422,9 +3440,7 @@ mod tests { use datafusion::catalog::memory::DataSourceExec; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; - use datafusion::datasource::physical_plan::{ - FileGroup, FileScanConfigBuilder, FileSource, ParquetSource, - }; + use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; use datafusion::error::DataFusionError; use datafusion::logical_expr::ScalarUDF; use datafusion::physical_plan::ExecutionPlan; @@ -3437,7 +3453,7 @@ mod tests { use crate::execution::operators::ExecutionError; use crate::execution::planner::literal_to_array_ref; use crate::parquet::parquet_support::SparkParquetOptions; - use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; + use crate::parquet::schema_adapter::ParquetSchemaAdapterExec; use datafusion_comet_proto::spark_expression::expr::ExprStruct; use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::{ @@ -4022,7 +4038,12 @@ mod tests { .await?; // Write a parquet file into temp folder - session_ctx.write_parquet(plan, test_path, None).await?; + session_ctx + .write_parquet(Arc::clone(&plan), test_path, None) + 
.await?; + + // Get the file schema from the plan that wrote the data + let file_schema = plan.schema(); // Register all parquet with temp data as file groups let mut file_groups: Vec = vec![]; @@ -4039,22 +4060,28 @@ mod tests { } } - let source = ParquetSource::default().with_schema_adapter_factory(Arc::new( - SparkSchemaAdapterFactory::new( - SparkParquetOptions::new(EvalMode::Ansi, "", false), - None, - ), - ))?; + let source = Arc::new(ParquetSource::default()); let object_store_url = ObjectStoreUrl::local_filesystem(); - let file_scan_config = - FileScanConfigBuilder::new(object_store_url, read_schema.into(), source) - .with_file_groups(file_groups) - .build(); + // Use file_schema for FileScanConfigBuilder to avoid schema validation errors + let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_schema, source) + .with_file_groups(file_groups) + .build(); // Run native read let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone()))); - let result: Vec<_> = scan.execute(0, session_ctx.task_ctx())?.collect().await; + // ParquetSchemaAdapterExec will handle the schema transformation from file_schema to read_schema + let adapter_exec: Arc = Arc::new(ParquetSchemaAdapterExec::new( + scan, + read_schema.into(), + SparkParquetOptions::new(EvalMode::Ansi, "", false), + None, + )); + + let result: Vec<_> = adapter_exec + .execute(0, session_ctx.task_ctx())? 
+ .collect() + .await; Ok(result.first().unwrap().as_ref().unwrap().clone()) } diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index c8a480e97a..f66fb34032 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -761,8 +761,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat false }; + dbg!(&data_schema); + dbg!(&required_schema); + let scan = init_datasource_exec( - required_schema, + data_schema.clone(), Some(data_schema), None, None, @@ -770,13 +773,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat file_groups, None, data_filters, - None, session_timezone.as_str(), case_sensitive != JNI_FALSE, session_ctx, encryption_enabled, )?; + dbg!("1"); + + let partition_index: usize = 0; let batch_stream = Some(scan.execute(partition_index, session_ctx.task_ctx())?); diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index ec18d227f5..3ec30629e6 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -18,7 +18,6 @@ use crate::execution::operators::ExecutionError; use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID}; use crate::parquet::parquet_support::SparkParquetOptions; -use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use arrow::datatypes::{Field, SchemaRef}; use datafusion::config::TableParquetOptions; use datafusion::datasource::listing::PartitionedFile; @@ -30,10 +29,8 @@ use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::physical_expr::expressions::BinaryExpr; use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::SessionContext; -use datafusion::scalar::ScalarValue; use datafusion_comet_spark_expr::EvalMode; use itertools::Itertools; -use std::collections::HashMap; use std::sync::Arc; /// Initializes a DataSourceExec plan with a ParquetSource. 
This may be used by either the @@ -65,13 +62,12 @@ pub(crate) fn init_datasource_exec( file_groups: Vec>, projection_vector: Option>, data_filters: Option>>, - default_values: Option>, session_timezone: &str, case_sensitive: bool, session_ctx: &Arc, encryption_enabled: bool, ) -> Result, ExecutionError> { - let (table_parquet_options, spark_parquet_options) = get_options( + let (table_parquet_options, _) = get_options( session_timezone, case_sensitive, &object_store_url, @@ -104,9 +100,7 @@ pub(crate) fn init_datasource_exec( ); } - let file_source = parquet_source.with_schema_adapter_factory(Arc::new( - SparkSchemaAdapterFactory::new(spark_parquet_options, default_values), - ))?; + let file_source = Arc::new(parquet_source); let file_groups = file_groups .iter() diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 1e0d30c835..88082b00c9 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -20,194 +20,108 @@ use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion::common::ColumnStatistics; -use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; -use datafusion::physical_plan::ColumnarValue; +use datafusion::physical_plan::{ + ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; use datafusion::scalar::ScalarValue; use std::collections::HashMap; use std::sync::Arc; -/// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible -/// `cast` implementation. -#[derive(Clone, Debug)] -pub struct SparkSchemaAdapterFactory { +/// A stream wrapper that applies schema adaptation to batches from an underlying stream. 
+/// +/// This stream wraps another `RecordBatchStream` and applies Spark-compatible schema +/// transformation to each batch, including type casting and handling missing columns. +pub struct ParquetSchemaAdapterStream { + /// The underlying stream producing batches + inner: datafusion::execution::SendableRecordBatchStream, + /// The target schema after adaptation + required_schema: SchemaRef, + /// Mapping from field index in `required_schema` to index in source schema + field_mappings: Vec>, /// Spark cast options parquet_options: SparkParquetOptions, + /// Default values for missing columns (keyed by required schema index) default_values: Option>, } -impl SparkSchemaAdapterFactory { +impl ParquetSchemaAdapterStream { + /// Create a new schema adapter stream by mapping from source schema to required schema + /// + /// # Arguments + /// * `inner` - The underlying stream to wrap + /// * `required_schema` - The target schema after adaptation + /// * `source_schema` - The schema of batches from the inner stream + /// * `parquet_options` - Spark-specific parquet options for casting + /// * `default_values` - Optional default values for missing columns pub fn new( - options: SparkParquetOptions, + inner: datafusion::execution::SendableRecordBatchStream, + required_schema: SchemaRef, + source_schema: &Schema, + parquet_options: SparkParquetOptions, default_values: Option>, ) -> Self { + let field_mappings = Self::map_schema(&required_schema, source_schema, &parquet_options); + Self { - parquet_options: options, + inner, + required_schema, + field_mappings, + parquet_options, default_values, } } -} - -impl SchemaAdapterFactory for SparkSchemaAdapterFactory { - /// Create a new factory for mapping batches from a file schema to a table - /// schema. - /// - /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with - /// the same schema for both the projected table schema and the table - /// schema. 
- fn create( - &self, - required_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box { - Box::new(SparkSchemaAdapter { - required_schema, - parquet_options: self.parquet_options.clone(), - default_values: self.default_values.clone(), - }) - } -} - -/// This SchemaAdapter requires both the table schema and the projected table -/// schema. See [`SchemaMapping`] for more details -#[derive(Clone, Debug)] -pub struct SparkSchemaAdapter { - /// The schema for the table, projected to include only the fields being output (projected) by the - /// associated ParquetExec - required_schema: SchemaRef, - /// Spark cast options - parquet_options: SparkParquetOptions, - default_values: Option>, -} -impl SchemaAdapter for SparkSchemaAdapter { - /// Map a column index in the table schema to a column index in a particular - /// file schema + /// Map schema to create field mappings from source to required schema /// - /// Panics if index is not in range for the table schema - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.required_schema.field(index); - Some( - file_schema - .fields - .iter() - .enumerate() - .find(|(_, b)| { - if self.parquet_options.case_sensitive { - b.name() == field.name() - } else { - b.name().to_lowercase() == field.name().to_lowercase() - } - })? - .0, - ) - } - - /// Creates a `SchemaMapping` for casting or mapping the columns from the - /// file schema to the table schema. - /// - /// If the provided `file_schema` contains columns of a different type to - /// the expected `table_schema`, the method will attempt to cast the array - /// data from the file schema to the table schema where possible. 
- /// - /// Returns a [`SchemaMapping`] that can be applied to the output batch - /// along with an ordered list of columns to project from the file + /// Returns a vector where each index corresponds to a field in the required schema, + /// and the value is the index of that field in the source schema (or None if missing) fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion::common::Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; self.required_schema.fields().len()]; - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some((table_idx, _table_field)) = self - .required_schema - .fields() + required_schema: &Schema, + source_schema: &Schema, + parquet_options: &SparkParquetOptions, + ) -> Vec> { + dbg!("map_schema"); + let mut field_mappings = vec![None; required_schema.fields().len()]; + + for (required_idx, required_field) in required_schema.fields().iter().enumerate() { + // Find matching field in source schema + let source_idx = source_schema + .fields .iter() .enumerate() - .find(|(_, b)| { - if self.parquet_options.case_sensitive { - b.name() == file_field.name() + .find(|(_, source_field)| { + if parquet_options.case_sensitive { + source_field.name() == required_field.name() } else { - b.name().to_lowercase() == file_field.name().to_lowercase() + source_field.name().to_lowercase() == required_field.name().to_lowercase() } }) - { - field_mappings[table_idx] = Some(projection.len()); - projection.push(file_idx); - } - } + .map(|(idx, _)| idx); - Ok(( - Arc::new(SchemaMapping { - required_schema: Arc::::clone(&self.required_schema), - field_mappings, - parquet_options: self.parquet_options.clone(), - default_values: self.default_values.clone(), - }), - projection, - )) - } -} + field_mappings[required_idx] = source_idx; + } -// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast -// instead of arrow cast - can we 
reduce the amount of code copied here and make -// the DataFusion version more extensible? + dbg!("end map_schema"); -/// The SchemaMapping struct holds a mapping from the file schema to the table -/// schema and any necessary type conversions. -/// -/// Note, because `map_batch` and `map_partial_batch` functions have different -/// needs, this struct holds two schemas: -/// -/// 1. The projected **table** schema -/// 2. The full table schema -/// -/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which -/// has the projected schema, since that's the schema which is supposed to come -/// out of the execution of this query. Thus `map_batch` uses -/// `projected_table_schema` as it can only operate on the projected fields. -/// -/// [`map_batch`]: Self::map_batch -#[derive(Debug)] -pub struct SchemaMapping { - /// The schema of the table. This is the expected schema after conversion - /// and it should match the schema of the query result. - required_schema: SchemaRef, - /// Mapping from field index in `projected_table_schema` to index in - /// projected file_schema. - /// - /// They are Options instead of just plain `usize`s because the table could - /// have fields that don't exist in the file. - field_mappings: Vec>, - /// Spark cast options - parquet_options: SparkParquetOptions, - default_values: Option>, -} + field_mappings + } -impl SchemaMapper for SchemaMapping { - /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and - /// conversions. 
The produced RecordBatch has a schema that contains only the projected - /// columns, so if one needs a RecordBatch with a schema that references columns which are not - /// in the projected, it would be better to use `map_partial_batch` + /// Map a batch from the source schema to the required schema fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result { + dbg!("map_batch"); + let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); let cols = self .required_schema - // go through each field in the projected schema .fields() .iter() .enumerate() - // and zip it with the index that maps fields from the projected table schema to the - // projected file schema in `batch` .zip(&self.field_mappings) - // and for each one... .map(|((field_idx, field), file_idx)| { file_idx.map_or_else( - // If this field only exists in the table, and not in the file, then we need to - // populate a default value for it. + // Field only exists in required schema, not in source || { if let Some(default_values) = &self.default_values { // We have a map of default values, see if this field is in there. @@ -217,9 +131,6 @@ impl SchemaMapper for SchemaMapping { let cv = if field.data_type() == &value.data_type() { ColumnarValue::Scalar(value.clone()) } else { - // Data types don't match. This can happen when default values - // are stored by Spark in a format different than the column's - // type (e.g., INT32 when the column is DATE32) spark_parquet_convert( ColumnarValue::Scalar(value.clone()), field.data_type(), @@ -229,14 +140,12 @@ impl SchemaMapper for SchemaMapping { return cv.into_array(batch_rows); } } - // Construct an entire column of nulls. We use the Scalar representation - // for better performance. 
+ // No default value - create null column let cv = ColumnarValue::Scalar(ScalarValue::try_new_null(field.data_type())?); cv.into_array(batch_rows) }, - // However, if it does exist in both, then try to cast it to the correct output - // type + // Field exists in both schemas - cast if needed |batch_idx| { spark_parquet_convert( ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])), @@ -249,26 +158,187 @@ impl SchemaMapper for SchemaMapping { }) .collect::, _>>()?; - // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = Arc::::clone(&self.required_schema); - let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; - Ok(record_batch) + dbg!("end map_batch"); + + let x = RecordBatch::try_new_with_options(schema, cols, &options).map_err(|e| e.into()); + dbg!(&x); + x + } +} + +impl futures::Stream for ParquetSchemaAdapterStream { + type Item = datafusion::common::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + use futures::StreamExt; + + match futures::ready!(self.inner.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + // Apply schema mapping to the batch + let adapted_batch = self.map_batch(batch); + std::task::Poll::Ready(Some(adapted_batch)) + } + Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))), + None => std::task::Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl datafusion::execution::RecordBatchStream for ParquetSchemaAdapterStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.required_schema) } +} + +/// An execution plan that wraps another execution plan and applies schema adaptation +/// to transform batches from the source schema to a required schema. +/// +/// This is useful when you need to apply Spark-compatible type conversions and handle +/// missing columns at the execution plan level. 
+#[derive(Debug)] +pub struct ParquetSchemaAdapterExec { + /// The underlying execution plan + input: Arc, + /// The target schema after adaptation + required_schema: SchemaRef, + /// Spark cast options + parquet_options: SparkParquetOptions, + /// Default values for missing columns (keyed by required schema index) + default_values: Option>, + /// Cached plan properties + properties: PlanProperties, +} - fn map_column_statistics( +impl ParquetSchemaAdapterExec { + /// Create a new schema adapter execution plan + /// + /// # Arguments + /// * `input` - The underlying execution plan to wrap + /// * `required_schema` - The target schema after adaptation + /// * `parquet_options` - Spark-specific parquet options for casting + /// * `default_values` - Optional default values for missing columns + pub fn new( + input: Arc, + required_schema: SchemaRef, + parquet_options: SparkParquetOptions, + default_values: Option>, + ) -> Self { + use datafusion::physical_expr::EquivalenceProperties; + use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + + // Create properties with the required schema + let eq_properties = EquivalenceProperties::new(Arc::clone(&required_schema)); + let properties = PlanProperties::new( + eq_properties, + input.properties().output_partitioning().clone(), + EmissionType::Final, + Boundedness::Bounded, + ); + + Self { + input, + required_schema, + parquet_options, + default_values, + properties, + } + } + + /// Get the underlying input plan + pub fn input(&self) -> &Arc { + &self.input + } +} + +impl DisplayAs for ParquetSchemaAdapterExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "ParquetSchemaAdapterExec: required_schema={:?}", + self.required_schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "ParquetSchemaAdapterExec") + } + } + } +} + +impl ExecutionPlan for 
ParquetSchemaAdapterExec { + fn name(&self) -> &str { + "ParquetSchemaAdapterExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + if children.len() != 1 { + return Err(datafusion::common::DataFusionError::Internal( + "ParquetSchemaAdapterExec requires exactly one child".to_string(), + )); + } + + Ok(Arc::new(ParquetSchemaAdapterExec::new( + Arc::clone(&children[0]), + Arc::clone(&self.required_schema), + self.parquet_options.clone(), + self.default_values.clone(), + ))) + } + + fn execute( &self, - _file_col_statistics: &[ColumnStatistics], - ) -> datafusion::common::Result> { - Ok(vec![]) + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + // Execute the input plan to get its stream + let input_stream = self.input.execute(partition, context)?; + dbg!(&input_stream.schema()); + let source_schema = input_stream.schema(); + + // Wrap the input stream with schema adaptation + let adapter_stream = ParquetSchemaAdapterStream::new( + input_stream, + Arc::clone(&self.required_schema), + &source_schema, + self.parquet_options.clone(), + self.default_values.clone(), + ); + + + Ok(Box::pin(adapter_stream)) } } #[cfg(test)] mod test { use crate::parquet::parquet_support::SparkParquetOptions; - use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; + use crate::parquet::schema_adapter::ParquetSchemaAdapterExec; use arrow::array::UInt32Array; use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::SchemaRef; @@ -277,7 +347,6 @@ mod test { use datafusion::common::config::TableParquetOptions; use datafusion::common::DataFusionError; use datafusion::datasource::listing::PartitionedFile; - use datafusion::datasource::physical_plan::FileSource; use datafusion::datasource::physical_plan::{FileGroup, 
FileScanConfigBuilder, ParquetSource}; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::object_store::ObjectStoreUrl; @@ -344,22 +413,235 @@ mod test { let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); spark_parquet_options.allow_cast_unsigned_ints = true; - let parquet_source = - ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory( - Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options, None)), - )?; + let parquet_source = Arc::new(ParquetSource::new(TableParquetOptions::new())); let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]); - let file_scan_config = - FileScanConfigBuilder::new(object_store_url, required_schema, parquet_source) - .with_file_groups(vec![files]) - .build(); + let file_scan_config = FileScanConfigBuilder::new( + object_store_url, + Arc::clone(&required_schema), + parquet_source, + ) + .with_file_groups(vec![files]) + .build(); let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut stream = parquet_exec + let adapter_exec: Arc = Arc::new(ParquetSchemaAdapterExec::new( + Arc::new(parquet_exec), + required_schema, + spark_parquet_options, + None, + )); + + let mut stream = adapter_exec .execute(0, Arc::new(TaskContext::default())) .unwrap(); stream.next().await.unwrap() } + + #[tokio::test] + async fn test_schema_adapter_stream() -> Result<(), DataFusionError> { + use datafusion::execution::RecordBatchStream; + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + + // Create source data with Int32 + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Int32, false), + ])); + + let ids = Arc::new(Int32Array::from(vec![1, 2, 3])); + let values = Arc::new(Int32Array::from(vec![10, 20, 30])); + let batch = RecordBatch::try_new(Arc::clone(&source_schema), vec![ids, values])?; + + // Create target schema with String 
for id + let target_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + + // Create Spark options + let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + + // Create a simple stream that yields one batch + let source_stream = RecordBatchStreamAdapter::new( + Arc::clone(&source_schema), + futures::stream::once(async move { Ok(batch) }), + ); + + // Wrap with schema adapter stream - it will compute field mappings internally + let mut adapter_stream = super::ParquetSchemaAdapterStream::new( + Box::pin(source_stream), + Arc::clone(&target_schema), + &source_schema, + spark_options, + None, + ); + + // Verify the schema + assert_eq!(adapter_stream.schema(), target_schema); + + // Read and verify the adapted batch + let adapted_batch = adapter_stream.next().await.unwrap()?; + assert_eq!(adapted_batch.num_rows(), 3); + assert_eq!(adapted_batch.schema(), target_schema); + + // Verify id column was cast to String + let id_col = adapted_batch.column(0); + assert_eq!(id_col.data_type(), &DataType::Utf8); + let id_array = id_col.as_any().downcast_ref::().unwrap(); + assert_eq!(id_array.value(0), "1"); + assert_eq!(id_array.value(1), "2"); + assert_eq!(id_array.value(2), "3"); + + // Verify value column remained Int32 + let value_col = adapted_batch.column(1); + assert_eq!(value_col.data_type(), &DataType::Int32); + let value_array = value_col.as_any().downcast_ref::().unwrap(); + assert_eq!(value_array.value(0), 10); + assert_eq!(value_array.value(1), 20); + assert_eq!(value_array.value(2), 30); + + Ok(()) + } + + #[tokio::test] + async fn test_schema_adapter_exec() -> Result<(), DataFusionError> { + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; + + // Create source data with Int32 + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + 
Field::new("value", DataType::Int32, false), + ])); + + let ids = Arc::new(Int32Array::from(vec![1, 2, 3])); + let values = Arc::new(Int32Array::from(vec![10, 20, 30])); + let batch = RecordBatch::try_new(Arc::clone(&source_schema), vec![ids, values])?; + + // Create target schema with String for id + let target_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + + // Create a simple mock execution plan that returns our batch + #[derive(Debug)] + struct MockExec { + schema: SchemaRef, + batch: RecordBatch, + properties: datafusion::physical_plan::PlanProperties, + } + + impl MockExec { + fn new(schema: SchemaRef, batch: RecordBatch) -> Self { + use datafusion::physical_expr::EquivalenceProperties; + use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion::physical_plan::Partitioning; + + let eq_properties = EquivalenceProperties::new(Arc::clone(&schema)); + let properties = datafusion::physical_plan::PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + ); + + Self { + schema, + batch, + properties, + } + } + } + + impl DisplayAs for MockExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "MockExec") + } + } + + impl ExecutionPlan for MockExec { + fn name(&self) -> &str { + "MockExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion::common::Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::common::Result + { + let batch = self.batch.clone(); + let schema = Arc::clone(&self.schema); + 
Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { Ok(batch) }), + ))) + } + } + + let mock_exec = Arc::new(MockExec::new(Arc::clone(&source_schema), batch)); + + // Wrap with schema adapter exec + let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + let adapter_exec = super::ParquetSchemaAdapterExec::new( + mock_exec, + Arc::clone(&target_schema), + spark_options, + None, + ); + + // Verify the schema + assert_eq!(adapter_exec.schema(), target_schema); + + // Execute and verify the adapted batch + let mut stream = adapter_exec.execute(0, Arc::new(TaskContext::default()))?; + let adapted_batch = stream.next().await.unwrap()?; + + assert_eq!(adapted_batch.num_rows(), 3); + assert_eq!(adapted_batch.schema(), target_schema); + + // Verify id column was cast to String + let id_col = adapted_batch.column(0); + assert_eq!(id_col.data_type(), &DataType::Utf8); + let id_array = id_col.as_any().downcast_ref::().unwrap(); + assert_eq!(id_array.value(0), "1"); + assert_eq!(id_array.value(1), "2"); + assert_eq!(id_array.value(2), "3"); + + // Verify value column remained Int32 + let value_col = adapted_batch.column(1); + assert_eq!(value_col.data_type(), &DataType::Int32); + let value_array = value_col.as_any().downcast_ref::().unwrap(); + assert_eq!(value_array.value(0), 10); + assert_eq!(value_array.value(1), 20); + assert_eq!(value_array.value(2), 30); + + Ok(()) + } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 84a326c790..35f562eacc 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -240,6 +240,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper |""".stripMargin, "select c0 from tbl") } + test("native reader - read a STRUCT subfield from ARRAY 
of STRUCTS - second field") { testSingleLineQuery( """