diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 57c2f5f51bbe..3bc884257dab 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1005,6 +1005,8 @@ message ParquetScanExecNode { reserved 2; PhysicalExprNode predicate = 3; + + datafusion_common.TableParquetOptions parquet_options = 4; } message CsvScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 0cf893cbc534..add72e4f777e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -12129,6 +12129,9 @@ impl serde::Serialize for ParquetScanExecNode { if self.predicate.is_some() { len += 1; } + if self.parquet_options.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.ParquetScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -12136,6 +12139,9 @@ impl serde::Serialize for ParquetScanExecNode { if let Some(v) = self.predicate.as_ref() { struct_ser.serialize_field("predicate", v)?; } + if let Some(v) = self.parquet_options.as_ref() { + struct_ser.serialize_field("parquetOptions", v)?; + } struct_ser.end() } } @@ -12149,12 +12155,15 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "base_conf", "baseConf", "predicate", + "parquet_options", + "parquetOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { BaseConf, Predicate, + ParquetOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -12178,6 +12187,7 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { match value { "baseConf" | "base_conf" => Ok(GeneratedField::BaseConf), "predicate" => Ok(GeneratedField::Predicate), + "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -12199,6 +12209,7 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { { let mut base_conf__ = None; let mut predicate__ = None; + let mut parquet_options__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::BaseConf => { @@ -12213,11 +12224,18 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { } predicate__ = map_.next_value()?; } + GeneratedField::ParquetOptions => { + if parquet_options__.is_some() { + return Err(serde::de::Error::duplicate_field("parquetOptions")); + } + parquet_options__ = map_.next_value()?; + } } } Ok(ParquetScanExecNode { base_conf: base_conf__, predicate: predicate__, + parquet_options: parquet_options__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 26efb617e067..df32c1a70d61 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1517,6 +1517,10 @@ pub struct ParquetScanExecNode { pub base_conf: ::core::option::Option, #[prost(message, optional, tag = "3")] pub predicate: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub parquet_options: ::core::option::Option< + super::datafusion_common::TableParquetOptions, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvScanExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index a266d55b46df..9ded9122b7da 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -258,6 +258,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .transpose()?; let mut builder = ParquetExec::builder(base_config); + + if let Some(options) = scan.parquet_options.as_ref() { + builder = builder.with_table_parquet_options(options.try_into()?) + } + if let Some(predicate) = predicate { builder = builder.with_predicate(predicate) } @@ -1654,6 +1659,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, )?), predicate, + parquet_options: Some(exec.table_parquet_options().try_into()?), }, )), }); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7c34fe068024..50c08024464f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -730,9 +730,14 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { Operator::Eq, lit("1"), )); + + let mut options = TableParquetOptions::new(); + options.global.pushdown_filters = true; + roundtrip_test( ParquetExec::builder(scan_config) .with_predicate(predicate) + .with_table_parquet_options(options) .build_arc(), ) }