Skip to content

Commit 6abc162

Browse files
authored
Restore custom SchemaAdapter functionality for Parquet (#16791)
This partially reverts #16461 by keeping backward compatibility with the existing SchemaAdapter APIs.
1 parent 38b87bf commit 6abc162

File tree

12 files changed

+928
-89
lines changed

12 files changed

+928
-89
lines changed

datafusion-examples/examples/default_column_values.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ impl TableProvider for DefaultValueTableProvider {
263263
.with_projection(projection.cloned())
264264
.with_limit(limit)
265265
.with_file_group(file_group)
266-
.with_expr_adapter(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _);
266+
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
267267

268268
Ok(Arc::new(DataSourceExec::new(Arc::new(
269269
file_scan_config.build(),

datafusion-examples/examples/json_shredding.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl TableProvider for ExampleTableProvider {
273273
.with_limit(limit)
274274
.with_file_group(file_group)
275275
// if the rewriter needs a reference to the table schema you can bind self.schema() here
276-
.with_expr_adapter(Arc::new(ShreddedJsonRewriterFactory) as _);
276+
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
277277

278278
Ok(Arc::new(DataSourceExec::new(Arc::new(
279279
file_scan_config.build(),

datafusion/core/src/datasource/listing/table.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use datafusion_execution::{
4848
use datafusion_expr::{
4949
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
5050
};
51+
use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
5152
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5253
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
5354
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
@@ -99,6 +100,8 @@ pub struct ListingTableConfig {
99100
schema_source: SchemaSource,
100101
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
101102
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
103+
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
104+
physical_expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
102105
}
103106

104107
impl ListingTableConfig {
@@ -281,6 +284,7 @@ impl ListingTableConfig {
281284
options: Some(listing_options),
282285
schema_source: self.schema_source,
283286
schema_adapter_factory: self.schema_adapter_factory,
287+
physical_expr_adapter_factory: self.physical_expr_adapter_factory,
284288
})
285289
}
286290

@@ -300,6 +304,7 @@ impl ListingTableConfig {
300304
options: _,
301305
schema_source,
302306
schema_adapter_factory,
307+
physical_expr_adapter_factory,
303308
} = self;
304309

305310
let (schema, new_schema_source) = match file_schema {
@@ -322,6 +327,7 @@ impl ListingTableConfig {
322327
options: Some(options),
323328
schema_source: new_schema_source,
324329
schema_adapter_factory,
330+
physical_expr_adapter_factory,
325331
})
326332
}
327333
None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -364,6 +370,7 @@ impl ListingTableConfig {
364370
options: Some(options),
365371
schema_source: self.schema_source,
366372
schema_adapter_factory: self.schema_adapter_factory,
373+
physical_expr_adapter_factory: self.physical_expr_adapter_factory,
367374
})
368375
}
369376
None => config_err!("No `ListingOptions` set for inferring schema"),
@@ -415,6 +422,26 @@ impl ListingTableConfig {
415422
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
416423
self.schema_adapter_factory.as_ref()
417424
}
425+
426+
/// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`]
427+
///
428+
/// The expression adapter factory is used to create physical expression adapters that can
429+
/// handle schema evolution and type conversions when evaluating expressions
430+
/// with different schemas than the table schema.
431+
///
432+
/// If not provided, a default physical expression adapter factory will be used unless a custom
433+
/// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
434+
///
435+
/// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
436+
pub fn with_physical_expr_adapter_factory(
437+
self,
438+
physical_expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
439+
) -> Self {
440+
Self {
441+
physical_expr_adapter_factory: Some(physical_expr_adapter_factory),
442+
..self
443+
}
444+
}
418445
}
419446

420447
/// Options for creating a [`ListingTable`]
@@ -911,6 +938,8 @@ pub struct ListingTable {
911938
column_defaults: HashMap<String, Expr>,
912939
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
913940
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
941+
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
942+
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
914943
}
915944

916945
impl ListingTable {
@@ -952,6 +981,7 @@ impl ListingTable {
952981
constraints: Constraints::default(),
953982
column_defaults: HashMap::new(),
954983
schema_adapter_factory: config.schema_adapter_factory,
984+
expr_adapter_factory: config.physical_expr_adapter_factory,
955985
};
956986

957987
Ok(table)
@@ -1196,6 +1226,7 @@ impl TableProvider for ListingTable {
11961226
.with_limit(limit)
11971227
.with_output_ordering(output_ordering)
11981228
.with_table_partition_cols(table_partition_cols)
1229+
.with_expr_adapter(self.expr_adapter_factory.clone())
11991230
.build(),
12001231
)
12011232
.await

datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,70 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
148148
Ok(())
149149
}
150150

151+
#[cfg(feature = "parquet")]
152+
#[tokio::test]
153+
async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter() -> Result<()> {
154+
// Create a temporary directory for our test file
155+
let tmp_dir = TempDir::new()?;
156+
let file_path = tmp_dir.path().join("test.parquet");
157+
let file_path_str = file_path.to_str().unwrap();
158+
159+
// Create test data
160+
let schema = Arc::new(Schema::new(vec![
161+
Field::new("id", DataType::Int32, false),
162+
Field::new("name", DataType::Utf8, true),
163+
]));
164+
165+
let batch = RecordBatch::try_new(
166+
schema.clone(),
167+
vec![
168+
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
169+
Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
170+
],
171+
)?;
172+
173+
// Write test parquet file
174+
let file = std::fs::File::create(file_path_str)?;
175+
let props = WriterProperties::builder().build();
176+
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
177+
writer.write(&batch)?;
178+
writer.close()?;
179+
180+
// Create a session context
181+
let ctx = SessionContext::new();
182+
183+
// Create a ParquetSource with the adapter factory
184+
let source = ParquetSource::default()
185+
.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
186+
187+
// Create a scan config
188+
let config = FileScanConfigBuilder::new(
189+
ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
190+
schema.clone(),
191+
)
192+
.with_source(source)
193+
.build();
194+
195+
// Create a data source executor
196+
let exec = DataSourceExec::from_data_source(config);
197+
198+
// Collect results
199+
let task_ctx = ctx.task_ctx();
200+
let stream = exec.execute(0, task_ctx)?;
201+
let batches = datafusion::physical_plan::common::collect(stream).await?;
202+
203+
// There should be one batch
204+
assert_eq!(batches.len(), 1);
205+
206+
// Verify the schema has uppercase column names
207+
let result_schema = batches[0].schema();
208+
assert_eq!(result_schema.field(0).name(), "ID");
209+
assert_eq!(result_schema.field(1).name(), "NAME");
210+
211+
Ok(())
212+
}
213+
214+
151215
#[tokio::test]
152216
async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
153217
// This test verifies that the same schema adapter factory can be reused

datafusion/core/tests/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ mod filter_pushdown;
5050
mod page_pruning;
5151
mod row_group_pruning;
5252
mod schema;
53+
mod schema_adapter;
5354
mod schema_coercion;
5455
mod utils;
5556

0 commit comments

Comments
 (0)