diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index dad12c1c6bc91..7a207920d7549 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -751,6 +751,11 @@ config_namespace! { /// parquet reader setting. 0 means no caching. pub max_predicate_cache_size: Option, default = None + /// (reading) If true, use Parquet field IDs for column resolution instead of + /// column names. This enables schema evolution with renamed/reordered columns. + /// When field IDs are unavailable, falls back to name-based matching. + pub field_id_enabled: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties @@ -3071,22 +3076,22 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None - /// The JSON format to use when reading files. - /// - /// When `true` (default), expects newline-delimited JSON (NDJSON): - /// ```text - /// {"key1": 1, "key2": "val"} - /// {"key1": 2, "key2": "vals"} - /// ``` - /// - /// When `false`, expects JSON array format: - /// ```text - /// [ - /// {"key1": 1, "key2": "val"}, - /// {"key1": 2, "key2": "vals"} - /// ] - /// ``` - pub newline_delimited: bool, default = true + /// The JSON format to use when reading files. + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, default = true } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f6608d16c1022..0bb2e2edc4b89 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + field_id_enabled: _, // not used for writer props } = self; let mut builder = WriterProperties::builder() @@ -460,6 +461,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + field_id_enabled: defaults.field_id_enabled, } } @@ -574,6 +576,7 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + field_id_enabled: global_options_defaults.field_id_enabled, }, column_specific_options, key_value_metadata, diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index 9d6d7a88566a7..5356e1f930b48 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -21,6 +21,32 @@ use std::str::FromStr; use crate::config::{ConfigField, Visit}; use crate::error::{DataFusionError, Result}; +/// Metadata key for storing Parquet field IDs in Arrow field metadata. +/// +/// Field IDs are stable identifiers for columns in Parquet files that enable +/// schema evolution with renamed or reordered columns. When `field_id_enabled` +/// is true, DataFusion stores field IDs from Parquet files in Arrow field metadata +/// using this key, allowing columns to be matched by ID instead of name. +/// +/// # Example +/// ```rust +/// use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; +/// use arrow::datatypes::Field; +/// use std::collections::HashMap; +/// +/// let mut metadata = HashMap::new(); +/// metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "42".to_string()); +/// let field = Field::new("my_column", arrow::datatypes::DataType::Int32, false) +/// .with_metadata(metadata); +/// +/// // Later, retrieve the field ID +/// let field_id = field.metadata() +/// .get(PARQUET_FIELD_ID_META_KEY) +/// .and_then(|s| s.parse::().ok()); +/// assert_eq!(field_id, Some(42)); +/// ``` +pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; + /// Parquet writer version options for controlling the Parquet file format version /// /// This enum validates parquet writer version values at configuration time, diff --git a/datafusion/core/tests/parquet/field_id.rs b/datafusion/core/tests/parquet/field_id.rs new file mode 100644 index 0000000000000..62497b7155338 --- /dev/null +++ b/datafusion/core/tests/parquet/field_id.rs @@ -0,0 +1,853 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for Parquet field ID support + +use arrow::array::{ + Array, Int32Array, Int64Array, RecordBatch, StringArray, StringViewArray, StructArray, +}; +use arrow::datatypes::{DataType, Field, Fields, Schema}; +use datafusion::prelude::*; +use datafusion_common::Result; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use std::collections::HashMap; +use std::fs::File; +use std::sync::Arc; +use tempfile::TempDir; + +/// Helper to create a test Parquet file with field IDs +fn create_parquet_file_with_field_ids( + path: &str, + schema: Arc, + batches: Vec, +) -> Result<()> { + let file = File::create(path)?; + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props))?; + + for batch in batches { + writer.write(&batch)?; + } + + writer.close()?; + Ok(()) +} + +/// Creates an Arrow schema with field IDs stored in metadata +/// +/// # Arguments +/// * `fields` - Vector of `(name, data_type, field_id)` tuples +/// +/// # Returns +/// Arrow `Schema` (in-memory) with field IDs in field metadata. +/// When written to Parquet, field IDs are transferred to the Parquet schema. +/// +/// # Example +/// ``` +/// schema_with_field_ids(vec![ +/// ("id".to_string(), DataType::Int32, 1), +/// ("name".to_string(), DataType::Utf8, 2), +/// ]); +/// ``` +/// +/// Creates Arrow schema (in-memory): +/// ``` +/// Field("id", Int32, metadata={"PARQUET:field_id": "1"}) +/// Field("name", Utf8, metadata={"PARQUET:field_id": "2"}) +/// ``` +/// +/// When written to Parquet, produces Parquet schema (on-disk): +/// ``` +/// Column[0]: "id" (INT32, field_id=1) +/// Column[1]: "name" (BYTE_ARRAY/UTF8, field_id=2) +/// ``` +fn schema_with_field_ids(fields: Vec<(String, DataType, i32)>) -> Schema { + use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; + + let fields_with_ids: Vec = fields + .into_iter() + .map(|(name, dtype, field_id)| { + let mut metadata = HashMap::new(); + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); + Field::new(name, dtype, false).with_metadata(metadata) + }) + .collect(); + + Schema::new(fields_with_ids) +} + +#[tokio::test] +async fn test_read_parquet_with_field_ids_enabled() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Create schema with field IDs + let schema = Arc::new(schema_with_field_ids(vec![ + ("user_id".to_string(), DataType::Int64, 1), + ("amount".to_string(), DataType::Int64, 2), + ("name".to_string(), DataType::Utf8, 3), + ])); + + // Create test data + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ], + )?; + + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + // Register table and query + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + let df = ctx.sql("SELECT user_id, amount, name FROM test").await?; + let results = df.collect().await?; + + // Verify results + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); + + let user_ids = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(user_ids.value(0), 1); + assert_eq!(user_ids.value(1), 2); + assert_eq!(user_ids.value(2), 3); + + Ok(()) +} + +#[tokio::test] +async fn test_read_parquet_with_field_ids_disabled() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Create schema with field IDs + let schema = Arc::new(schema_with_field_ids(vec![ + ("user_id".to_string(), DataType::Int64, 1), + ("amount".to_string(), DataType::Int64, 2), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![100, 200, 300])), + ], + )?; + + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + // Create context with field ID reading disabled (default) + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + let df = ctx.sql("SELECT user_id, amount FROM test").await?; + let results = df.collect().await?; + + // Should still work with name-based matching + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); + + Ok(()) +} + +#[tokio::test] +async fn test_schema_evolution_renamed_columns() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Write file with original column names and field IDs + let write_schema = Arc::new(schema_with_field_ids(vec![ + ("user_id".to_string(), DataType::Int64, 1), + ("amount".to_string(), DataType::Int64, 2), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&write_schema), + vec![ + Arc::new(Int64Array::from(vec![101, 102, 103])), + Arc::new(Int64Array::from(vec![500, 600, 700])), + ], + )?; + + create_parquet_file_with_field_ids( + file_path.to_str().unwrap(), + write_schema, + vec![batch], + )?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + // Register table with original names + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Query should work with original names + let df = ctx.sql("SELECT user_id, amount FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + let user_ids = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(user_ids.value(0), 101); + + Ok(()) +} + +#[tokio::test] +async fn test_schema_evolution_reordered_columns() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Write file with columns in order: a, b, c + let write_schema = Arc::new(schema_with_field_ids(vec![ + ("a".to_string(), DataType::Int32, 1), + ("b".to_string(), DataType::Int32, 2), + ("c".to_string(), DataType::Int32, 3), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&write_schema), + vec![ + Arc::new(Int32Array::from(vec![10, 20, 30])), + Arc::new(Int32Array::from(vec![40, 50, 60])), + Arc::new(Int32Array::from(vec![70, 80, 90])), + ], + )?; + + create_parquet_file_with_field_ids( + file_path.to_str().unwrap(), + write_schema, + vec![batch], + )?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Query columns in different order: c, a, b + let df = ctx.sql("SELECT c, a, b FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + + let c_vals = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let a_vals = results[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let b_vals = results[0] + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify correct data regardless of order + assert_eq!(c_vals.value(0), 70); + assert_eq!(a_vals.value(0), 10); + assert_eq!(b_vals.value(0), 40); + + Ok(()) +} + +#[tokio::test] +async fn test_projection_with_field_ids() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Create schema with field IDs + let schema = Arc::new(schema_with_field_ids(vec![ + ("a".to_string(), DataType::Int32, 1), + ("b".to_string(), DataType::Int32, 2), + ("c".to_string(), DataType::Int32, 3), + ("d".to_string(), DataType::Int32, 4), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(Int32Array::from(vec![1000, 2000, 3000])), + ], + )?; + + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Project only columns a and c + let df = ctx.sql("SELECT a, c FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_columns(), 2); + + let a_vals = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let c_vals = results[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(a_vals.value(0), 1); + assert_eq!(c_vals.value(0), 100); + + Ok(()) +} + +#[tokio::test] +async fn test_filter_with_field_ids() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + let schema = Arc::new(schema_with_field_ids(vec![ + ("id".to_string(), DataType::Int32, 1), + ("value".to_string(), DataType::Int32, 2), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])), + ], + )?; + + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Filter with field IDs + let df = ctx + .sql("SELECT id, value FROM test WHERE value > 25") + .await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); // Should have rows with values 30, 40, 50 + + let id_vals = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_vals.value(0), 3); + assert_eq!(id_vals.value(1), 4); + assert_eq!(id_vals.value(2), 5); + + Ok(()) +} + +#[tokio::test] +async fn test_aggregation_with_field_ids() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + let schema = Arc::new(schema_with_field_ids(vec![ + ("category".to_string(), DataType::Utf8, 1), + ("value".to_string(), DataType::Int32, 2), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(StringArray::from(vec!["A", "B", "A", "B", "A"])), + Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])), + ], + )?; + + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Aggregate with field IDs + let df = ctx + .sql("SELECT category, SUM(value) as total FROM test GROUP BY category ORDER BY category") + .await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 2); + + // Get category column - it might be StringArray or StringViewArray depending on config + let category_col = results[0].column(0); + let categories: Vec<&str> = match category_col.data_type() { + DataType::Utf8 => category_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + DataType::Utf8View => category_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + _ => panic!( + "Unexpected data type for category column: {:?}", + category_col.data_type() + ), + }; + + let totals = results[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(categories[0], "A"); + assert_eq!(totals.value(0), 90); // 10 + 30 + 50 + + assert_eq!(categories[1], "B"); + assert_eq!(totals.value(1), 60); // 20 + 40 + + Ok(()) +} + +#[tokio::test] +async fn test_schema_evolution_added_column() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Write file with only 2 columns + let write_schema = Arc::new(schema_with_field_ids(vec![ + ("id".to_string(), DataType::Int32, 1), + ("name".to_string(), DataType::Utf8, 2), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&write_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ], + )?; + + create_parquet_file_with_field_ids( + file_path.to_str().unwrap(), + write_schema, + vec![batch], + )?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Query should work - reading only the columns that exist + let df = ctx.sql("SELECT id, name FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); + assert_eq!(results[0].num_columns(), 2); + + let id_vals = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_vals.value(0), 1); + assert_eq!(id_vals.value(1), 2); + assert_eq!(id_vals.value(2), 3); + + // Get name column - might be StringArray or StringViewArray + let name_col = results[0].column(1); + let names: Vec<&str> = match name_col.data_type() { + DataType::Utf8 => name_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + DataType::Utf8View => name_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect(), + _ => panic!("Unexpected data type for name column"), + }; + + assert_eq!(names, vec!["Alice", "Bob", "Charlie"]); + + Ok(()) +} + +#[tokio::test] +async fn test_schema_evolution_missing_column() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + + // Write file with 3 columns + let write_schema = Arc::new(schema_with_field_ids(vec![ + ("id".to_string(), DataType::Int32, 1), + ("name".to_string(), DataType::Utf8, 2), + ("status".to_string(), DataType::Utf8, 3), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&write_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + Arc::new(StringArray::from(vec!["active", "inactive", "active"])), + ], + )?; + + create_parquet_file_with_field_ids( + file_path.to_str().unwrap(), + write_schema, + vec![batch], + )?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Query should work - only reading columns id and name (skipping status) + let df = ctx.sql("SELECT id, name FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); + assert_eq!(results[0].num_columns(), 2); + + let id_vals = results[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_vals.value(0), 1); + assert_eq!(id_vals.value(1), 2); + assert_eq!(id_vals.value(2), 3); + + // Verify we can still read all columns if we want + let df_all = ctx.sql("SELECT id, name, status FROM test").await?; + let results_all = df_all.collect().await?; + + assert_eq!(results_all.len(), 1); + assert_eq!(results_all[0].num_columns(), 3); + + Ok(()) +} + +#[tokio::test] +async fn test_schema_evolution_column_type_changed() -> Result<()> { + let tmp_dir = TempDir::new()?; + let old_file = tmp_dir.path().join("old.parquet"); + let new_file = tmp_dir.path().join("new.parquet"); + + // Write old file with Int32 for amount column (field_id=2) + let old_schema = Arc::new(schema_with_field_ids(vec![ + ("id".to_string(), DataType::Int32, 1), + ("amount".to_string(), DataType::Int32, 2), + ])); + + let old_batch = RecordBatch::try_new( + Arc::clone(&old_schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(Int32Array::from(vec![100, 200])), + ], + )?; + + create_parquet_file_with_field_ids( + old_file.to_str().unwrap(), + old_schema, + vec![old_batch], + )?; + + // Write new file with Int64 for amount column (field_id=2 - SAME field ID, different type) + let new_schema = Arc::new(schema_with_field_ids(vec![ + ("id".to_string(), DataType::Int32, 1), + ("amount".to_string(), DataType::Int64, 2), + ])); + + let new_batch = RecordBatch::try_new( + Arc::clone(&new_schema), + vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(Int64Array::from(vec![300, 400])), + ], + )?; + + create_parquet_file_with_field_ids( + new_file.to_str().unwrap(), + new_schema, + vec![new_batch], + )?; + + // Create context with field ID reading enabled + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + // Register both files with different table names + ctx.register_parquet( + "old_table", + old_file.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "new_table", + new_file.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + // Query old file - should have Int32 amounts + let df_old = ctx.sql("SELECT id, amount FROM old_table").await?; + let results_old = df_old.collect().await?; + + assert_eq!(results_old.len(), 1); + let old_amounts = results_old[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(old_amounts.value(0), 100); + assert_eq!(old_amounts.value(1), 200); + + // Query new file - should have Int64 amounts + let df_new = ctx.sql("SELECT id, amount FROM new_table").await?; + let results_new = df_new.collect().await?; + + assert_eq!(results_new.len(), 1); + let new_amounts = results_new[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(new_amounts.value(0), 300); + assert_eq!(new_amounts.value(1), 400); + + Ok(()) +} + +/// Test to verify that nested struct types are rejected when field_id_enabled = true +#[tokio::test] +async fn test_nested_struct_with_field_ids_fails_with_clear_error() -> Result<()> { + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("nested.parquet"); + + // Create a schema with nested struct + let address_fields = Fields::from(vec![ + Field::new("street", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + ]); + + let schema = Arc::new(Schema::new(vec![ + Field::new("address", DataType::Struct(address_fields.clone()), false), + Field::new("age", DataType::Int32, false), + ])); + + // Create test data + let street_array = Arc::new(StringArray::from(vec!["Main St", "Oak Ave"])); + let city_array = Arc::new(StringArray::from(vec!["NYC", "LA"])); + + let address_struct = + StructArray::new(address_fields, vec![street_array, city_array], None); + + let age_array = Arc::new(Int32Array::from(vec![30, 25])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(address_struct), age_array], + )?; + + // Write to Parquet + create_parquet_file_with_field_ids(file_path.to_str().unwrap(), schema, vec![batch])?; + + // Reading with field_id_enabled = false should work fine + let ctx = SessionContext::new(); + ctx.register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + let df = ctx.sql("SELECT age FROM test").await?; + let results = df.collect().await?; + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 2); + + // Clean up for next test + ctx.deregister_table("test")?; + + // Now enable field_id_enabled + ctx.sql("SET datafusion.execution.parquet.field_id_enabled = true") + .await? + .collect() + .await?; + + // This should now fail with a clear error message about nested types + let result = ctx + .register_parquet( + "test", + file_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await; + + match result { + Err(e) => { + let error_msg = e.to_string(); + assert!( + error_msg.contains("not yet supported for nested/complex types"), + "Expected error about nested types, got: {error_msg}", + ); + assert!( + error_msg.contains("address"), + "Error should mention the problematic field 'address'" + ); + assert!( + error_msg.contains("Struct"), + "Error should mention the Struct type" + ); + } + Ok(_) => { + // Try to query to see if it fails there + ctx.sql("SELECT age FROM test").await?; + + panic!( + "Expected error when registering Parquet file with nested types and field_id_enabled=true" + ); + } + } + + Ok(()) +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 5a05718936509..df99c24629716 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -49,6 +49,7 @@ mod custom_reader; mod encryption; mod expr_adapter; mod external_access_plan; +mod field_id; mod file_statistics; mod filter_pushdown; mod ordering; diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d59b42ed15d15..5c1f57cfe52e9 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -27,6 +27,7 @@ use std::{fmt, vec}; use arrow::array::RecordBatch; use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; +use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; use datafusion_datasource::TableSchema; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; @@ -281,6 +282,11 @@ impl ParquetFormat { self.options.global.coerce_int96 = time_unit; self } + + /// Get whether field ID is enabled from options + pub fn field_id_enabled(&self) -> bool { + self.options.global.field_id_enabled + } } /// Clears all metadata (Schema level and field level) on an iterator @@ -385,6 +391,7 @@ impl FileFormat for ParquetFormat { .with_decryption_properties(file_decryption_properties) .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) .with_coerce_int96(coerce_int96) + .with_enable_field_ids(self.field_id_enabled()) .fetch_schema_with_location() .await?; Ok::<_, DataFusionError>(result) @@ -626,6 +633,8 @@ impl ParquetFormat { /// # Arguments /// * `table_schema` - The table schema containing the desired types /// * `file_schema` - The file schema to be transformed +/// * `enable_field_ids` - If true, matches columns by field ID first, then falls back to name. +/// If false, matches columns by name only /// /// # Returns /// * `Some(Schema)` - If any transformations were applied, returns the transformed schema @@ -633,6 +642,7 @@ impl ParquetFormat { pub fn apply_file_schema_type_coercions( table_schema: &Schema, file_schema: &Schema, + enable_field_ids: bool, ) -> Option { let mut needs_view_transform = false; let mut needs_string_transform = false; @@ -660,6 +670,22 @@ pub fn apply_file_schema_type_coercions( }) .collect(); + // Build field ID to field mapping if field IDs are enabled + let table_field_by_id: HashMap> = if enable_field_ids { + table_schema + .fields() + .iter() + .filter_map(|f| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|id_str| id_str.parse::().ok()) + .map(|id| (id, f)) + }) + .collect() + } else { + HashMap::new() + }; + // Early return if no transformation needed if !needs_view_transform && !needs_string_transform { return None; @@ -672,8 +698,23 @@ pub fn apply_file_schema_type_coercions( let field_name = field.name(); let field_type = field.data_type(); + // Try to find matching table field by field ID or name + let table_type = if enable_field_ids { + // Try field ID matching first + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|id_str| id_str.parse::().ok()) + .and_then(|id| table_field_by_id.get(&id)) + .map(|f| f.data_type()) + .or_else(|| table_fields.get(field_name).copied()) + } else { + // Name-based matching only + table_fields.get(field_name).copied() + }; + // Look up the corresponding field type in the table schema - if let Some(table_type) = table_fields.get(field_name) { + if let Some(table_type) = table_type { match (table_type, field_type) { // table schema uses string type, coerce the file schema to use string type ( diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 5a4c0bcdd514d..269fc4f93b7a4 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -29,7 +29,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ - ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, + ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, not_impl_err, }; use datafusion_execution::cache::cache_manager::{ CachedFileMetadataEntry, FileMetadata, FileMetadataCache, @@ -68,6 +68,72 @@ pub struct DFParquetMetadata<'a> { file_metadata_cache: Option>, /// timeunit to coerce INT96 timestamps to pub coerce_int96: Option, + /// Whether to extract and use Parquet field IDs for column resolution + pub enable_field_ids: bool, +} + +/// Extracts Parquet field IDs and stores them in Arrow field metadata +/// under the key `\[PARQUET_FIELD_ID_META_KEY`\] +/// +/// # Limitations +/// +/// Currently only supports flat schemas (top-level primitive fields). +/// Returns an error if the schema contains nested types (structs, lists, maps). +/// +/// # Errors +/// +/// Returns an error if `arrow_schema` contains any complex/nested types when field IDs +/// are enabled, as these are not yet supported. +/// Nested type support see () +fn add_field_ids_to_arrow_schema( + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, +) -> Result { + use arrow::datatypes::Field; + use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; + + // Validate that schema is flat (no nested types) + // This prevents incorrect field ID assignment for complex types + for (idx, field) in arrow_schema.fields().iter().enumerate() { + if field.data_type().is_nested() { + return not_impl_err!( + "Field ID reading is not yet supported for nested/complex types. \ + Field '{}' at index {} has type {:?}.", + field.name(), + idx, + field.data_type() + ); + } + } + + let fields_with_ids: Vec> = arrow_schema + .fields() + .iter() + .enumerate() + .map(|(idx, field)| { + // Get the corresponding Parquet column descriptor + let col_desc = parquet_schema.column(idx); + + // Extract field ID from the schema type + // Field IDs are optional in Parquet; if not set, they may be 0 or negative + let field_id = col_desc.self_type().get_basic_info().id(); + + if field_id > 0 { + // Add field ID to field metadata + let mut metadata = field.metadata().clone(); + metadata + .insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); + Arc::new(field.as_ref().clone().with_metadata(metadata)) + } else { + Arc::clone(field) + } + }) + .collect(); + + Ok(Schema::new_with_metadata( + fields_with_ids, + arrow_schema.metadata().clone(), + )) } impl<'a> DFParquetMetadata<'a> { @@ -79,6 +145,7 @@ impl<'a> DFParquetMetadata<'a> { decryption_properties: None, file_metadata_cache: None, coerce_int96: None, + enable_field_ids: false, } } @@ -112,6 +179,12 @@ impl<'a> DFParquetMetadata<'a> { self } + /// Set whether to extract and use Parquet field IDs for column resolution + pub fn with_enable_field_ids(mut self, enable: bool) -> Self { + self.enable_field_ids = enable; + self + } + /// Fetch parquet metadata from the remote object store pub async fn fetch_metadata(&self) -> Result> { let Self { @@ -121,6 +194,7 @@ impl<'a> DFParquetMetadata<'a> { decryption_properties, file_metadata_cache, coerce_int96: _, + enable_field_ids: _, } = self; let fetch = ObjectStoreFetch::new(*store, object_meta); @@ -180,10 +254,17 @@ impl<'a> DFParquetMetadata<'a> { let metadata = self.fetch_metadata().await?; let file_metadata = metadata.file_metadata(); - let schema = parquet_to_arrow_schema( + let mut schema = parquet_to_arrow_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + + // Add field IDs if requested + if self.enable_field_ids { + schema = + add_field_ids_to_arrow_schema(&schema, file_metadata.schema_descr())?; + } + let schema = self .coerce_int96 .as_ref() @@ -279,9 +360,12 @@ impl<'a> DFParquetMetadata<'a> { file_metadata.key_value_metadata(), )?; - if let Some(merged) = - apply_file_schema_type_coercions(logical_file_schema, &physical_file_schema) - { + // Apply type coercions without field ID matching (statistics use name-based matching) + if let Some(merged) = apply_file_schema_type_coercions( + logical_file_schema, + &physical_file_schema, + false, + ) { physical_file_schema = merged; } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..87d0a5b67b561 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -27,7 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr::utils::{ + reassign_expr_columns, reassign_expr_columns_with_field_ids, +}; use datafusion_physical_expr_adapter::replace_columns_with_literals; use std::collections::HashMap; use std::pin::Pin; @@ -120,6 +122,8 @@ pub(super) struct ParquetOpener { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Whether to use Parquet field IDs for column resolution + pub field_id_enabled: bool, } /// Represents a prepared access plan with optional row selection @@ -205,6 +209,7 @@ impl FileOpener for ParquetOpener { )?; let batch_size = self.batch_size; + let field_id_enabled = self.field_id_enabled; // Calculate the output schema from the original projection (before literal replacement) // so we get correct field names from column references @@ -378,6 +383,7 @@ impl FileOpener for ParquetOpener { if let Some(merged) = apply_file_schema_type_coercions( &logical_file_schema, &physical_file_schema, + field_id_enabled, ) { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); @@ -624,8 +630,18 @@ impl FileOpener for ParquetOpener { // Rebase column indices to match the narrowed stream schema. // The projection expressions have indices based on physical_file_schema, // but the stream only contains the columns selected by the ProjectionMask. - let projection = projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + let projection = if field_id_enabled { + projection.try_map_exprs(|expr| { + reassign_expr_columns_with_field_ids( + expr, + &physical_file_schema, + &stream_schema, + ) + })? + } else { + projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))? + }; let projector = projection.make_projector(&stream_schema)?; @@ -1064,6 +1080,7 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + field_id_enabled: bool, } impl ParquetOpenerBuilder { @@ -1090,6 +1107,7 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + field_id_enabled: false, } } @@ -1197,6 +1215,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + field_id_enabled: self.field_id_enabled, preserve_order: self.preserve_order, } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 75d87a4cd16fc..bd68f7a9163c9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -568,6 +568,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + field_id_enabled: self.table_parquet_options.global.field_id_enabled, }); Ok(opener) } diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 2cdc326f5dd36..36538ab0ec253 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -33,6 +33,7 @@ use datafusion_common::tree_node::{ use datafusion_common::{HashMap, HashSet, Result}; use datafusion_expr::Operator; +use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; use petgraph::graph::NodeIndex; use petgraph::stable_graph::StableGraph; @@ -265,9 +266,82 @@ pub fn reassign_expr_columns( .data() } +/// Extract Parquet field ID from Arrow field metadata +fn get_field_id(field: &arrow::datatypes::Field) -> Option { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|s| s.parse::().ok()) +} + +/// Find field index by field ID with fallback to name-based matching +fn find_field_index( + column_name: &str, + source_schema: &Schema, + target_schema: &Schema, +) -> Result { + // Try to find the field in source schema + let source_field = source_schema.field_with_name(column_name)?; + + // Check if field has a field ID + if let Some(source_field_id) = get_field_id(source_field) { + // Search target schema for matching field ID + for (idx, target_field) in target_schema.fields().iter().enumerate() { + if let Some(target_field_id) = get_field_id(target_field) + && source_field_id == target_field_id + { + return Ok(idx); + } + } + } + + // Fallback to name-based matching + Ok(target_schema.index_of(column_name)?) +} + +/// Re-assign column indices in expressions using field ID-based matching. +/// +/// This function traverses the expression tree and updates all `Column` references +/// to use field IDs for matching between source and target schemas, falling back +/// to name-based matching when field IDs are unavailable. +/// +/// # Arguments +/// +/// * `expr` - The physical expression to update +/// * `source_schema` - The schema that the expression currently references +/// * `target_schema` - The schema to map columns to +/// +/// # Limitations +/// +/// Currently only supports flat schemas (top-level columns). Nested field +/// references (e.g., "address.city") are not yet supported. +/// For nested schema see: () +/// # Errors +/// +/// This function will return an error if any column in the expression cannot be found +/// in the target schema by either field ID or name. +pub fn reassign_expr_columns_with_field_ids( + expr: Arc, + source_schema: &Schema, + target_schema: &Schema, +) -> Result> { + expr.transform_down(|expr| { + if let Some(column) = expr.as_any().downcast_ref::() { + let index = find_field_index(column.name(), source_schema, target_schema)?; + return Ok(Transformed::yes(Arc::new(Column::new( + column.name(), + index, + )))); + } + Ok(Transformed::no(expr)) + }) + .data() +} + #[cfg(test)] pub(crate) mod tests { use std::any::Any; + use std::collections::HashMap; use std::fmt::{Display, Formatter}; use super::*; @@ -281,6 +355,7 @@ pub(crate) mod tests { ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; + use datafusion_common::parquet_config::PARQUET_FIELD_ID_META_KEY; use petgraph::visit::Bfs; #[derive(Debug, PartialEq, Eq, Hash)] @@ -562,4 +637,333 @@ pub(crate) mod tests { assert_eq!(collect_columns(&expr3), expected); Ok(()) } + + // ======================================================================== + // Field ID Tests + // ======================================================================== + + #[test] + fn test_get_field_id_present() { + let mut metadata = HashMap::new(); + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "42".to_string()); + let field = Field::new("test", DataType::Int64, false).with_metadata(metadata); + + assert_eq!(get_field_id(&field), Some(42)); + } + + #[test] + fn test_get_field_id_absent() { + let field = Field::new("test", DataType::Int64, false); + assert_eq!(get_field_id(&field), None); + } + + #[test] + fn test_get_field_id_invalid() { + let mut metadata = HashMap::new(); + metadata.insert( + PARQUET_FIELD_ID_META_KEY.to_string(), + "not_a_number".to_string(), + ); + let field = Field::new("test", DataType::Int64, false).with_metadata(metadata); + + assert_eq!(get_field_id(&field), None); + } + + #[test] + fn test_find_field_index_by_field_id() -> Result<()> { + // Source schema: field IDs present + let mut metadata1 = HashMap::new(); + metadata1.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + let mut metadata2 = HashMap::new(); + metadata2.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false) + .with_metadata(metadata1.clone()), + Field::new("amount", DataType::Float64, false) + .with_metadata(metadata2.clone()), + ]); + + // Target schema: renamed columns but same field IDs + let target_schema = Schema::new(vec![ + Field::new("customer_id", DataType::Int64, false).with_metadata(metadata1), + Field::new("price", DataType::Float64, false).with_metadata(metadata2), + ]); + + // Should match by field ID, not name + let index = find_field_index("user_id", &source_schema, &target_schema)?; + assert_eq!( + index, 0, + "user_id (field_id=1) should match customer_id at index 0" + ); + + let index = find_field_index("amount", &source_schema, &target_schema)?; + assert_eq!( + index, 1, + "amount (field_id=2) should match price at index 1" + ); + + Ok(()) + } + + #[test] + fn test_find_field_index_by_field_id_reordered() -> Result<()> { + // Source schema: columns in order [a, b, c] + let mut meta_a = HashMap::new(); + meta_a.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + let mut meta_b = HashMap::new(); + meta_b.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); + let mut meta_c = HashMap::new(); + meta_c.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("a", DataType::Int64, false).with_metadata(meta_a.clone()), + Field::new("b", DataType::Int64, false).with_metadata(meta_b.clone()), + Field::new("c", DataType::Int64, false).with_metadata(meta_c.clone()), + ]); + + // Target schema: columns reordered [c, a, b] + let target_schema = Schema::new(vec![ + Field::new("c", DataType::Int64, false).with_metadata(meta_c), + Field::new("a", DataType::Int64, false).with_metadata(meta_a), + Field::new("b", DataType::Int64, false).with_metadata(meta_b), + ]); + + // Should match by field ID + assert_eq!(find_field_index("a", &source_schema, &target_schema)?, 1); + assert_eq!(find_field_index("b", &source_schema, &target_schema)?, 2); + assert_eq!(find_field_index("c", &source_schema, &target_schema)?, 0); + + Ok(()) + } + + #[test] + fn test_find_field_index_fallback_to_name() -> Result<()> { + // Source schema: no field IDs + let source_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + ]); + + // Target schema: no field IDs + let target_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false), + Field::new("amount", DataType::Float64, false), + ]); + + // Should fall back to name-based matching + assert_eq!( + find_field_index("user_id", &source_schema, &target_schema)?, + 0 + ); + assert_eq!( + find_field_index("amount", &source_schema, &target_schema)?, + 1 + ); + + Ok(()) + } + + #[test] + fn test_find_field_index_mixed_field_ids() -> Result<()> { + // Source schema: some fields have IDs, some don't + let mut metadata1 = HashMap::new(); + metadata1.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("a", DataType::Int64, false).with_metadata(metadata1.clone()), + Field::new("b", DataType::Int64, false), // No field ID + ]); + + let target_schema = Schema::new(vec![ + Field::new("renamed_a", DataType::Int64, false).with_metadata(metadata1), + Field::new("b", DataType::Int64, false), + ]); + + // Field with ID should match by ID + assert_eq!(find_field_index("a", &source_schema, &target_schema)?, 0); + + // Field without ID should match by name + assert_eq!(find_field_index("b", &source_schema, &target_schema)?, 1); + + Ok(()) + } + + #[test] + fn test_find_field_index_not_found() { + let source_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let target_schema = Schema::new(vec![Field::new("b", DataType::Int64, false)]); + + // Should fail to find non-existent field + let result = find_field_index("a", &source_schema, &target_schema); + assert!(result.is_err()); + } + + #[test] + fn test_reassign_expr_columns_with_field_ids_simple() -> Result<()> { + // Source schema: full file schema + let mut meta1 = HashMap::new(); + meta1.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + let mut meta2 = HashMap::new(); + meta2.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); + let mut meta3 = HashMap::new(); + meta3.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false).with_metadata(meta1.clone()), + Field::new("name", DataType::Utf8, false).with_metadata(meta2), + Field::new("age", DataType::Int32, false).with_metadata(meta3.clone()), + ]); + + // Target schema: projected schema (only user_id and age) + let target_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false).with_metadata(meta1), + Field::new("age", DataType::Int32, false).with_metadata(meta3), + ]); + + // Expression references age at index 2 in source schema + let expr: Arc = Arc::new(Column::new("age", 2)); + + // After transformation, should reference age at index 1 in target schema + let result = + reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?; + + let column = result.as_any().downcast_ref::().unwrap(); + assert_eq!(column.name(), "age"); + assert_eq!( + column.index(), + 1, + "age should be at index 1 in target schema" + ); + + Ok(()) + } + + #[test] + fn test_reassign_expr_columns_with_field_ids_complex() -> Result<()> { + // Source schema + let mut meta1 = HashMap::new(); + meta1.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + let mut meta2 = HashMap::new(); + meta2.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); + let mut meta3 = HashMap::new(); + meta3.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(meta1.clone()), + Field::new("b", DataType::Int32, false).with_metadata(meta2.clone()), + Field::new("c", DataType::Int32, false).with_metadata(meta3.clone()), + ]); + + // Target schema: only columns a and c (b excluded) + let target_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(meta1), + Field::new("c", DataType::Int32, false).with_metadata(meta3), + ]); + + // Expression: a@0 + c@2 + let expr = binary( + col("a", &source_schema)?, + Operator::Plus, + col("c", &source_schema)?, + &source_schema, + )?; + + // After transformation: a@0 + c@1 + let result = + reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?; + + // Verify it's still a binary expression + let binary_expr = result.as_any().downcast_ref::().unwrap(); + + // Check left side (a) + let left_col = binary_expr + .left() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(left_col.name(), "a"); + assert_eq!(left_col.index(), 0); + + // Check right side (c) + let right_col = binary_expr + .right() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(right_col.name(), "c"); + assert_eq!( + right_col.index(), + 1, + "c should be remapped from index 2 to 1" + ); + + Ok(()) + } + + #[test] + fn test_reassign_expr_columns_with_field_ids_renamed_columns() -> Result<()> { + // Source schema (file schema with old names) + let mut meta1 = HashMap::new(); + meta1.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); + let mut meta2 = HashMap::new(); + meta2.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); + + let source_schema = Schema::new(vec![ + Field::new("user_id", DataType::Int64, false).with_metadata(meta1.clone()), + Field::new("amount", DataType::Float64, false).with_metadata(meta2.clone()), + ]); + + // Target schema (query schema with renamed columns) + let target_schema = Schema::new(vec![ + Field::new("customer_id", DataType::Int64, false).with_metadata(meta1), + Field::new("price", DataType::Float64, false).with_metadata(meta2), + ]); + + // Expression references old names at their source indices + let expr: Arc = Arc::new(Column::new("user_id", 0)); + + // After transformation, should still reference by old name but correct index + let result = + reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?; + + let column = result.as_any().downcast_ref::().unwrap(); + assert_eq!(column.name(), "user_id", "Name should remain user_id"); + assert_eq!( + column.index(), + 0, + "Should match customer_id at index 0 via field_id" + ); + + Ok(()) + } + + #[test] + fn test_reassign_expr_columns_with_field_ids_no_field_ids() -> Result<()> { + // Schemas without field IDs - should fall back to name matching + let source_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let target_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + // Expression: c@2 + let expr: Arc = Arc::new(Column::new("c", 2)); + + // Should fall back to name-based matching + let result = + reassign_expr_columns_with_field_ids(expr, &source_schema, &target_schema)?; + + let column = result.as_any().downcast_ref::().unwrap(); + assert_eq!(column.name(), "c"); + assert_eq!(column.index(), 1, "c should be found by name at index 1"); + + Ok(()) + } } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..487d6c40565da 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1090,6 +1090,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + field_id_enabled: false, // Default value }) } } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..cf7142bc7cfde 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -525,6 +525,7 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + field_id_enabled: false, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b61ceecb24fc0..67a66be0a54af 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.field_id_enabled false datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 @@ -382,6 +383,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.field_id_enabled false (reading) If true, use Parquet field IDs for column resolution instead of column names. This enables schema evolution with renamed/reordered columns. When field IDs are unavailable, falls back to name-based matching. datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e48f0a7c92276..5b21ac9d56ea4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -92,6 +92,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.field_id_enabled | false | (reading) If true, use Parquet field IDs for column resolution instead of column names. This enables schema evolution with renamed/reordered columns. When field IDs are unavailable, falls back to name-based matching. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in rows | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |