diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index 03ef3d66f9d7..efaee23366a1 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider {
             ParquetSource::default()
                 // provide the predicate so the DataSourceExec can try and prune
                 // row groups internally
-                .with_predicate(Arc::clone(&schema), predicate)
+                .with_predicate(predicate)
                 // provide the factory to create parquet reader without re-reading metadata
                 .with_parquet_file_reader_factory(Arc::new(reader_factory)),
         );
diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs
index 165d82627061..ac1e64351768 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_file_format.rs
@@ -22,7 +22,6 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
 };
 use datafusion::physical_expr::LexRequirement;
-use datafusion::physical_expr::PhysicalExpr;
 use datafusion::{
     catalog::Session,
     common::{GetExt, Statistics},
@@ -112,11 +111,8 @@ impl FileFormat for TSVFileFormat {
         &self,
         state: &dyn Session,
         conf: FileScanConfig,
-        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        self.csv_file_format
-            .create_physical_plan(state, conf, filters)
-            .await
+        self.csv_file_format.create_physical_plan(state, conf).await
     }
 
     async fn create_writer_physical_plan(
diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index 7d6ce4d86af1..c19fc2561d5f 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;
 
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let source =
-            Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
+        let source = Arc::new(ParquetSource::default().with_predicate(predicate));
         let mut file_scan_config_builder =
             FileScanConfigBuilder::new(object_store_url, self.schema(), source)
                 .with_projection(projection.cloned())
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 12d86a471eee..b620ff62d9a6 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -54,7 +54,6 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_datasource::write::ObjectWriterBuilder;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
 use async_trait::async_trait;
@@ -174,7 +173,6 @@ impl FileFormat for ArrowFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(ArrowSource::default());
         let config = FileScanConfigBuilder::from(conf)
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 3a098301f14e..e165707c2eb0 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -93,7 +93,6 @@ pub(crate) mod test_util {
                 .with_projection(projection)
                 .with_limit(limit)
                 .build(),
-                None,
             )
             .await?;
         Ok(exec)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 84a63faffbbd..9d1afdafadd7 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc};
 
 use crate::datasource::{
     create_ordering,
-    file_format::{
-        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
-    },
+    file_format::{file_compression_type::FileCompressionType, FileFormat},
     physical_plan::FileSinkConfig,
 };
 use crate::execution::context::SessionState;
@@ -35,22 +33,19 @@ use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::{ExecutionPlan, Statistics};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
 use datafusion_common::{
-    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
-    SchemaExt, ToDFSchema,
+    config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt,
 };
 use datafusion_execution::cache::{
     cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
 };
-use datafusion_physical_expr::{
-    create_physical_expr, LexOrdering, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
@@ -941,19 +936,6 @@ impl TableProvider for ListingTable {
             None => {} // no ordering required
         };
 
-        let filters = match conjunction(filters.to_vec()) {
-            Some(expr) => {
-                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
-                let filters = create_physical_expr(
-                    &expr,
-                    &table_df_schema,
-                    state.execution_props(),
-                )?;
-                Some(filters)
-            }
-            None => None,
-        };
-
         let Some(object_store_url) =
             self.table_paths.first().map(ListingTableUrl::object_store)
         else {
@@ -978,7 +960,6 @@ impl TableProvider for ListingTable {
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
                 .build(),
-                filters.as_ref(),
             )
             .await
     }
@@ -1002,18 +983,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
 
-                // if we can't push it down completely with only the filename-based/path-based
-                // column names, then we should check if we can do parquet predicate pushdown
-                let supports_pushdown = self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }
-
                 Ok(TableProviderFilterPushDown::Inexact)
             })
             .collect()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e9bb8b0db368..e4d5060e065c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -54,6 +54,7 @@ mod tests {
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::source::DataSourceExec;
 
+    use datafusion_datasource::file::FileSource;
     use datafusion_datasource::{FileRange, PartitionedFile};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
@@ -139,7 +140,7 @@ mod tests {
             self.round_trip(batches).await.batches
         }
 
-        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
+        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
             // set up predicate (this is normally done by a layer higher up)
             let predicate = self
                 .predicate
@@ -148,7 +149,7 @@ mod tests {
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
-                source = source.with_predicate(Arc::clone(&file_schema), predicate);
+                source = source.with_predicate(predicate);
             }
 
             if self.pushdown_predicate {
@@ -161,14 +162,14 @@ mod tests {
                 source = source.with_enable_page_index(true);
             }
 
-            Arc::new(source)
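+            // `with_schema` attaches the file schema and returns `Arc<dyn FileSource>`,
+            // which is why this helper now returns the trait object type.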
+            source.with_schema(Arc::clone(&file_schema))
         }
 
         fn build_parquet_exec(
             &self,
             file_schema: SchemaRef,
             file_group: FileGroup,
-            source: Arc<ParquetSource>,
+            source: Arc<dyn FileSource>,
         ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index f5753af64d93..511f378f42e2 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
+use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
 use object_store::path::Path;
@@ -182,10 +183,11 @@ impl TestParquetFile {
             let physical_filter_expr =
                 create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
 
-            let source = Arc::new(ParquetSource::new(parquet_options).with_predicate(
-                Arc::clone(&self.schema),
-                Arc::clone(&physical_filter_expr),
-            ));
+            let source = Arc::new(
+                ParquetSource::new(parquet_options)
+                    .with_predicate(Arc::clone(&physical_filter_expr)),
+            )
+            .with_schema(Arc::clone(&self.schema));
             let config = scan_config_builder.with_source(source).build();
             let parquet_exec = DataSourceExec::from_data_source(config);
 
diff --git a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs
index 11dd961a54ee..5202d8e5f452 100644
--- a/datafusion/core/tests/fuzz_cases/pruning.rs
+++ b/datafusion/core/tests/fuzz_cases/pruning.rs
@@ -276,7 +276,7 @@ async fn execute_with_predicate(
     ctx: &SessionContext,
 ) -> Vec<String> {
     let parquet_source = if prune_stats {
-        ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone())
+        ParquetSource::default().with_predicate(predicate.clone())
     } else {
         ParquetSource::default()
     };
diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs
index bbef073345b7..a5397c5a397c 100644
--- a/datafusion/core/tests/parquet/external_access_plan.rs
+++ b/datafusion/core/tests/parquet/external_access_plan.rs
@@ -346,7 +346,7 @@ impl TestFull {
         let source = if let Some(predicate) = predicate {
             let df_schema = DFSchema::try_from(schema.clone())?;
             let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
-            Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate))
+            Arc::new(ParquetSource::default().with_predicate(predicate))
         } else {
             Arc::new(ParquetSource::default())
         };
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index 8b87d59d8c46..0b76a677927f 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -28,6 +28,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::prelude::SessionContext;
 use datafusion_common::stats::Precision;
+use datafusion_common::DFSchema;
 use datafusion_execution::cache::cache_manager::CacheManagerConfig;
 use datafusion_execution::cache::cache_unit::{
     DefaultFileStatisticsCache, DefaultListFilesCache,
@@ -37,6 +38,10 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 use datafusion_expr::{col, lit, Expr};
 
 use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::ExecutionPlan;
 use tempfile::tempdir;
 
 #[tokio::test]
@@ -47,21 +52,49 @@ async fn check_stats_precision_with_filter_pushdown() {
 
     let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
     let table = get_listing_table(&table_path, None, &opt).await;
+
     let (_, _, state) = get_cache_runtime_state();
+    let mut options = state.config().options().clone();
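+    // Enable row-filter pushdown so the FilterPushdown optimizer rule (applied
+    // below) can push the predicate all the way into the DataSourceExec.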
+    options.execution.parquet.pushdown_filters = true;
+
     // Scan without filter, stats are exact
     let exec = table.scan(&state, None, &[], None).await.unwrap();
     assert_eq!(
         exec.partition_statistics(None).unwrap().num_rows,
-        Precision::Exact(8)
+        Precision::Exact(8),
+        "Stats without filter should be exact"
     );
 
-    // Scan with filter pushdown, stats are inexact
-    let filter = Expr::gt(col("id"), lit(1));
+    // This filter cannot be evaluated during the table provider's scan
+    // (it is not a partition filter), so it will be pushed down to the
+    // source operator by the appropriate optimizer pass.
+    let filter_expr = Expr::gt(col("id"), lit(1));
+    let exec_with_filter = table
+        .scan(&state, None, &[filter_expr.clone()], None)
+        .await
+        .unwrap();
+
+    let ctx = SessionContext::new();
+    let df_schema = DFSchema::try_from(table.schema()).unwrap();
+    let physical_filter = ctx.create_physical_expr(filter_expr, &df_schema).unwrap();
 
-    let exec = table.scan(&state, None, &[filter], None).await.unwrap();
+    let filtered_exec =
+        Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
+            as Arc<dyn ExecutionPlan>;
+
+    let optimized_exec = FilterPushdown::new()
+        .optimize(filtered_exec, &options)
+        .unwrap();
+
+    assert!(
+        optimized_exec.as_any().is::<DataSourceExec>(),
+        "Sanity check that the pushdown did what we expected"
+    );
+    // Scan with filter pushdown, stats are inexact
     assert_eq!(
-        exec.partition_statistics(None).unwrap().num_rows,
-        Precision::Inexact(8)
+        optimized_exec.partition_statistics(None).unwrap().num_rows,
+        Precision::Inexact(8),
+        "Stats after filter pushdown should be inexact"
     );
 }
 
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index f693485cbe01..9da879a32f6b 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
 
     let source = Arc::new(
         ParquetSource::default()
-            .with_predicate(Arc::clone(&schema), predicate)
+            .with_predicate(predicate)
             .with_enable_page_index(true),
     );
     let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)
diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index 160084213c7c..131a396ccb9a 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -25,8 +25,6 @@ use std::sync::Arc;
 
 use arrow::datatypes::DataType;
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::{
     datasource::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
@@ -42,8 +40,6 @@ use datafusion_common::stats::Precision;
 use datafusion_common::test_util::batches_to_sort_string;
 use datafusion_common::ScalarValue;
 use datafusion_execution::config::SessionConfig;
-use datafusion_expr::{col, lit, Expr, Operator};
-use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -57,55 +53,6 @@ use object_store::{
 use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload};
 use url::Url;
 
-#[tokio::test]
-async fn parquet_partition_pruning_filter() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let table = create_partitioned_alltypes_parquet_table(
-        &ctx,
-        &[
-            "year=2021/month=09/day=09/file.parquet",
-            "year=2021/month=10/day=09/file.parquet",
-            "year=2021/month=10/day=28/file.parquet",
-        ],
-        &[
-            ("year", DataType::Int32),
-            ("month", DataType::Int32),
-            ("day", DataType::Int32),
-        ],
-        "mirror:///",
-        "alltypes_plain.parquet",
-    )
-    .await;
-
-    // The first three filters can be resolved using only the partition columns.
-    let filters = [
-        Expr::eq(col("year"), lit(2021)),
-        Expr::eq(col("month"), lit(10)),
-        Expr::eq(col("day"), lit(28)),
-        Expr::gt(col("id"), lit(1)),
-    ];
-    let exec = table.scan(&ctx.state(), None, &filters, None).await?;
-    let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    if let Some((_, parquet_config)) =
-        data_source_exec.downcast_to_file_source::<ParquetSource>()
-    {
-        let pred = parquet_config.predicate().unwrap();
-        // Only the last filter should be pushdown to TableScan
-        let expected = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
-            Operator::Gt,
-            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
-        ));
-
-        assert!(pred.as_any().is::<BinaryExpr>());
-        let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
-
-        assert_eq!(pred, expected.as_ref());
-    }
-    Ok(())
-}
-
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/datasource-avro/src/file_format.rs b/datafusion/datasource-avro/src/file_format.rs
index 4b50fee1d326..47f8d9daca0a 100644
--- a/datafusion/datasource-avro/src/file_format.rs
+++ b/datafusion/datasource-avro/src/file_format.rs
@@ -37,7 +37,6 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::source::DataSourceExec;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 
@@ -150,7 +149,6 @@ impl FileFormat for AvroFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let config = FileScanConfigBuilder::from(conf)
             .with_source(self.file_source())
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index 76f3c50a70a7..1deb2b2edd10 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -50,7 +50,6 @@ use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
@@ -408,7 +407,6 @@ impl FileFormat for CsvFormat {
         &self,
         state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Consult configuration options for default values
         let has_header = self
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index 8d0515804fc7..f6b758b5bc51 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -52,7 +52,6 @@ use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
@@ -249,7 +248,6 @@ impl FileFormat for JsonFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(JsonSource::new());
         let conf = FileScanConfigBuilder::from(conf)
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index e1d393caa8f3..bc8f84b87454 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -31,9 +31,7 @@ use datafusion_datasource::write::{
     get_writer_schema, ObjectWriterBuilder, SharedBuffer,
 };
 
-use datafusion_datasource::file_format::{
-    FileFormat, FileFormatFactory, FilePushdownSupport,
-};
+use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
 use datafusion_datasource::write::demux::DemuxedStreamReceiver;
 
 use arrow::compute::sum;
@@ -54,15 +52,12 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::Expr;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::Accumulator;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
-use crate::can_expr_be_pushed_down_with_schemas;
 use crate::source::{parse_coerce_int96_string, ParquetSource};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -413,28 +408,15 @@ impl FileFormat for ParquetFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut predicate = None;
         let mut metadata_size_hint = None;
 
-        // If enable pruning then combine the filters to build the predicate.
-        // If disable pruning then set the predicate to None, thus readers
-        // will not prune data based on the statistics.
-        if self.enable_pruning() {
-            if let Some(pred) = filters.cloned() {
-                predicate = Some(pred);
-            }
-        }
         if let Some(metadata) = self.metadata_size_hint() {
             metadata_size_hint = Some(metadata);
         }
 
         let mut source = ParquetSource::new(self.options.clone());
 
-        if let Some(predicate) = predicate {
-            source = source.with_predicate(Arc::clone(&conf.file_schema), predicate);
-        }
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
@@ -461,27 +443,6 @@ impl FileFormat for ParquetFormat {
         Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
     }
 
-    fn supports_filters_pushdown(
-        &self,
-        file_schema: &Schema,
-        _table_schema: &Schema,
-        filters: &[&Expr],
-    ) -> Result<FilePushdownSupport> {
-        if !self.options().global.pushdown_filters {
-            return Ok(FilePushdownSupport::NoSupport);
-        }
-
-        let all_supported = filters
-            .iter()
-            .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema));
-
-        Ok(if all_supported {
-            FilePushdownSupport::Supported
-        } else {
-            FilePushdownSupport::NotSupportedForFilter
-        })
-    }
-
     fn file_source(&self) -> Arc<dyn FileSource> {
         Arc::new(ParquetSource::default())
     }
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 516b13792189..aed0d7f27561 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -59,7 +59,6 @@ pub use metrics::ParquetFileMetrics;
 pub use page_filter::PagePruningAccessPlanFilter;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
 pub use row_filter::build_row_filter;
-pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;
 use source::ParquetSource;
 pub use writer::plan_to_parquet;
@@ -223,8 +222,7 @@ impl ParquetExecBuilder {
         } = self;
         let mut parquet = ParquetSource::new(table_parquet_options);
         if let Some(predicate) = predicate.clone() {
-            parquet = parquet
-                .with_predicate(Arc::clone(&file_scan_config.file_schema), predicate);
+            parquet = parquet.with_predicate(predicate);
         }
         if let Some(metadata_size_hint) = metadata_size_hint {
             parquet = parquet.with_metadata_size_hint(metadata_size_hint)
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
             inner: DataSourceExec::new(Arc::new(base_config.clone())),
             base_config,
             predicate,
-            pruning_predicate: parquet.pruning_predicate,
+            pruning_predicate: None, // no longer computed; `ParquetExec` itself exists only for backwards compatibility
             schema_adapter_factory: parquet.schema_adapter_factory,
             parquet_file_reader_factory: parquet.parquet_file_reader_factory,
             table_parquet_options: parquet.table_parquet_options,
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index cfe8213f86e4..555822d71534 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -178,7 +178,7 @@ impl FileOpener for ParquetOpener {
 
             // Build predicates for this specific file
             let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
-                &predicate,
+                predicate.as_ref(),
                 &physical_file_schema,
                 &predicate_creation_errors,
             );
@@ -390,8 +390,8 @@ pub(crate) fn build_page_pruning_predicate(
     ))
 }
 
-fn build_pruning_predicates(
-    predicate: &Option<Arc<dyn PhysicalExpr>>,
+pub(crate) fn build_pruning_predicates(
+    predicate: Option<&Arc<dyn PhysicalExpr>>,
     file_schema: &SchemaRef,
     predicate_creation_errors: &Count,
 ) -> (
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index d7bbe30c8943..801f41faacf2 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -367,43 +367,19 @@ fn pushdown_columns(
         .then_some(checker.required_columns.into_iter().collect()))
 }
 
-/// creates a PushdownChecker for a single use to check a given column with the given schemes. Used
-/// to check preemptively if a column name would prevent pushdowning.
-/// effectively does the inverse of [`pushdown_columns`] does, but with a single given column
-/// (instead of traversing the entire tree to determine this)
-fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool {
-    let mut checker = PushdownChecker::new(table_schema);
-
-    // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore
-    // it here. I'm just verifying we know the return type of this so nobody accidentally changes
-    // the return type of this fn and it gets implicitly ignored here.
-    let _: Option<TreeNodeRecursion> = checker.check_single_column(column_name);
-
-    // and then return a value based on the state of the checker
-    checker.prevents_pushdown()
-}
-
 /// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent
 /// this expression from being predicate pushed down. If any of them would, this returns false.
 /// Otherwise, true.
-pub fn can_expr_be_pushed_down_with_schemas(
-    expr: &datafusion_expr::Expr,
+/// Note that the schema passed in here is *not* the physical file schema (which is not available at this point);
+/// it is the table schema that this expression will be evaluated against, minus any projected columns and partition columns.
+pub(crate) fn can_expr_be_pushed_down_with_schemas(
+    expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
 ) -> bool {
-    let mut can_be_pushed = true;
-    expr.apply(|expr| match expr {
-        datafusion_expr::Expr::Column(column) => {
-            can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema);
-            Ok(if can_be_pushed {
-                TreeNodeRecursion::Jump
-            } else {
-                TreeNodeRecursion::Stop
-            })
-        }
-        _ => Ok(TreeNodeRecursion::Continue),
-    })
-    .unwrap(); // we never return an Err, so we can safely unwrap this
-    can_be_pushed
+    match pushdown_columns(expr, file_schema) {
+        Ok(Some(_)) => true,
+        Ok(None) | Err(_) => false,
+    }
 }
 
 /// Calculate the total compressed size of all `Column`'s required for
@@ -516,7 +492,7 @@ mod test {
     use super::*;
     use datafusion_common::ScalarValue;
 
-    use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond};
+    use arrow::datatypes::{Field, TimeUnit::Nanosecond};
     use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
     use datafusion_expr::{col, Expr};
     use datafusion_physical_expr::planner::logical2physical;
@@ -649,48 +625,80 @@ mod test {
 
     #[test]
     fn nested_data_structures_prevent_pushdown() {
-        let file_schema = Schema::new(vec![Field::new(
-            "list_col",
-            DataType::Struct(Fields::empty()),
-            true,
-        )]);
+        let table_schema = Arc::new(get_lists_table_schema());
 
-        let expr = col("list_col").is_not_null();
+        let expr = col("utf8_list").is_not_null();
+        let expr = logical2physical(&expr, &table_schema);
+        assert!(check_expression_can_evaluate_against_schema(&expr, &table_schema));
 
-        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }
 
     #[test]
-    fn projected_or_partition_columns_prevent_pushdown() {
-        let file_schema =
-            Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]);
+    fn projected_columns_prevent_pushdown() {
+        let table_schema = get_basic_table_schema();
 
-        let expr = col("nonexistent_column").is_null();
+        let expr =
+            Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;
 
-        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }
 
     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let file_schema =
-            Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
+        let table_schema = get_basic_table_schema();
 
         let expr = col("string_col").is_null();
+        let expr = logical2physical(&expr, &table_schema);
 
-        assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }
 
     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let file_schema = Schema::new(vec![
-            Field::new("string_col", DataType::Utf8, true),
-            Field::new("bigint_col", DataType::Int64, true),
-        ]);
+        let table_schema = get_basic_table_schema();
 
         let expr = col("string_col")
             .is_not_null()
             .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)))));
+        let expr = logical2physical(&expr, &table_schema);
+
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+    }
+
+    fn get_basic_table_schema() -> Schema {
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+            .expect("parsing schema")
+    }
+
+    fn get_lists_table_schema() -> Schema {
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+            .expect("parsing schema")
+    }
 
-        assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
+    /// Sanity check that the given expression could be evaluated against the given schema without any errors.
+    /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
+    fn check_expression_can_evaluate_against_schema(
+        expr: &Arc<dyn PhysicalExpr>,
+        table_schema: &Arc<Schema>,
+    ) -> bool {
+        let batch = RecordBatch::new_empty(Arc::clone(table_schema));
+        expr.evaluate(&batch).is_ok()
     }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index e15f5243cd27..13518562ca61 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -21,26 +21,30 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-use crate::opener::build_page_pruning_predicate;
-use crate::opener::build_pruning_predicate;
+use crate::opener::build_pruning_predicates;
 use crate::opener::ParquetOpener;
-use crate::page_filter::PagePruningAccessPlanFilter;
+use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
+use datafusion_common::config::ConfigOptions;
 use datafusion_datasource::file_stream::FileOpener;
 use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
 
-use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
+use arrow::datatypes::{SchemaRef, TimeUnit};
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_optimizer::pruning::PruningPredicate;
-use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
+use datafusion_physical_plan::filter_pushdown::PredicateSupport;
+use datafusion_physical_plan::filter_pushdown::PredicateSupports;
+use datafusion_physical_plan::metrics::Count;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
 use itertools::Itertools;
@@ -92,7 +96,7 @@ use object_store::ObjectStore;
 /// # let predicate = lit(true);
 /// let source = Arc::new(
 ///     ParquetSource::default()
-///     .with_predicate(Arc::clone(&file_schema), predicate)
+///     .with_predicate(predicate)
 /// );
 /// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
 /// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
@@ -259,12 +263,12 @@ pub struct ParquetSource {
     pub(crate) table_parquet_options: TableParquetOptions,
     /// Optional metrics
     pub(crate) metrics: ExecutionPlanMetricsSet,
+    /// The logical schema of this scan: the table schema with any partition
+    /// columns removed.
+    /// Note this is *not* the physical schema of any individual file.
+    pub(crate) file_schema: Option<SchemaRef>,
     /// Optional predicate for row filtering during parquet scan
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups (derived from `predicate`)
-    pub(crate) pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages (derived from `predicate`)
-    pub(crate) page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
     /// Optional user defined parquet file reader factory
     pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
     /// Optional user defined schema adapter
@@ -303,26 +307,12 @@ impl ParquetSource {
         self
     }
 
-    /// Set predicate information, also sets pruning_predicate and page_pruning_predicate attributes
-    pub fn with_predicate(
-        &self,
-        file_schema: Arc<Schema>,
-        predicate: Arc<dyn PhysicalExpr>,
-    ) -> Self {
+    /// Set predicate information
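+    ///
+    /// Note that pruning predicates are no longer built eagerly here; they are
+    /// derived per file, against each file's physical schema, in the `ParquetOpener`.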
+    pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
         let mut conf = self.clone();
-
         let metrics = ExecutionPlanMetricsSet::new();
-        let predicate_creation_errors =
-            MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
-
         conf = conf.with_metrics(metrics);
         conf.predicate = Some(Arc::clone(&predicate));
-
-        conf.page_pruning_predicate =
-            Some(build_page_pruning_predicate(&predicate, &file_schema));
-        conf.pruning_predicate =
-            build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors);
-
         conf
     }
 
@@ -515,8 +505,11 @@ impl FileSource for ParquetSource {
         Arc::new(conf)
     }
 
-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        Arc::new(Self { ..self.clone() })
+    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            file_schema: Some(schema),
+            ..self.clone()
+        })
     }
 
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
@@ -561,25 +554,41 @@ impl FileSource for ParquetSource {
                     .predicate()
                     .map(|p| format!(", predicate={p}"))
                     .unwrap_or_default();
-                let pruning_predicate_string = self
-                    .pruning_predicate
-                    .as_ref()
-                    .map(|pre| {
-                        let mut guarantees = pre
+
+                write!(f, "{}", predicate_string)?;
+
+                // Try to build the pruning predicates.
+                // These are only generated here because it's useful to have *some*
+                // idea of what pushdown is happening when viewing plans.
+                // However, it is important to note that these predicates are *not*
+                // necessarily the predicates that are actually evaluated:
+                // the actual predicates are built in reference to the physical schema of
+                // each file, which we do not have at this point and hence cannot use.
+                // Instead we use the logical schema of the file (the table schema without partition columns).
+                if let (Some(file_schema), Some(predicate)) =
+                    (&self.file_schema, &self.predicate)
+                {
+                    let predicate_creation_errors = Count::new();
+                    if let (Some(pruning_predicate), _) = build_pruning_predicates(
+                        Some(predicate),
+                        file_schema,
+                        &predicate_creation_errors,
+                    ) {
+                        let mut guarantees = pruning_predicate
                             .literal_guarantees()
                             .iter()
                             .map(|item| format!("{}", item))
                             .collect_vec();
                         guarantees.sort();
-                        format!(
+                        writeln!(
+                            f,
                             ", pruning_predicate={}, required_guarantees=[{}]",
-                            pre.predicate_expr(),
+                            pruning_predicate.predicate_expr(),
                             guarantees.join(", ")
-                        )
-                    })
-                    .unwrap_or_default();
-
-                write!(f, "{}{}", predicate_string, pruning_predicate_string)
+                        )?;
+                    }
+                };
+                Ok(())
             }
             DisplayFormatType::TreeRender => {
                 if let Some(predicate) = self.predicate() {
@@ -589,4 +598,62 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
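+        // The file schema is attached by `FileScanConfigBuilder::build` via
+        // `with_schema`; without it we cannot tell which columns a filter
+        // references, so decline the pushdown entirely.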
+        let Some(file_schema) = self.file_schema.clone() else {
+            return Ok(FilterPushdownPropagation::unsupported(filters));
+        };
+        // Can we push down the filters themselves into the scan or only use stats pruning?
+        let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
+        let table_pushdown_enabled = self.pushdown_filters();
+        let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
+
+        let mut source = self.clone();
+        let mut allowed_filters = vec![];
+        let mut remaining_filters = vec![];
+        for filter in &filters {
+            if can_expr_be_pushed_down_with_schemas(filter, &file_schema) {
+                // This filter can be pushed down
+                allowed_filters.push(Arc::clone(filter));
+            } else {
+                // This filter cannot be pushed down
+                remaining_filters.push(Arc::clone(filter));
+            }
+        }
+        if allowed_filters.is_empty() {
+            // No filters can be pushed down, so we can just return the remaining filters
+            // and avoid replacing the source in the physical plan.
+            return Ok(FilterPushdownPropagation::unsupported(filters));
+        }
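+        // Fold the absorbed filters into any existing predicate so they all
+        // participate in row group / page pruning when each file is opened.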
+        let predicate = match source.predicate {
+            Some(predicate) => conjunction(
+                std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
+            ),
+            None => conjunction(allowed_filters.iter().cloned()),
+        };
+        source.predicate = Some(predicate);
+        let source = Arc::new(source);
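+        // Report a filter as Supported (i.e. fully handled here) only when
+        // row-level filter pushdown is enabled; otherwise mark it Unsupported so
+        // the parent operator keeps evaluating it, even though this source still
+        // uses it for stats-based pruning via the predicate set above.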
+        let filters = PredicateSupports::new(
+            allowed_filters
+                .into_iter()
+                .map(|f| {
+                    if pushdown_filters {
+                        PredicateSupport::Supported(f)
+                    } else {
+                        PredicateSupport::Unsupported(f)
+                    }
+                })
+                .chain(
+                    remaining_filters
+                        .into_iter()
+                        .map(PredicateSupport::Unsupported),
+                )
+                .collect(),
+        );
+        Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
+    }
 }
diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs
index 0e0b7b12e16a..b2caf5277a25 100644
--- a/datafusion/datasource/src/file_format.rs
+++ b/datafusion/datasource/src/file_format.rs
@@ -28,11 +28,10 @@ use crate::file_compression_type::FileCompressionType;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_sink_config::FileSinkConfig;
 
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::SchemaRef;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
-use datafusion_expr::Expr;
-use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
+use datafusion_physical_expr::LexRequirement;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 
@@ -94,7 +93,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         &self,
         state: &dyn Session,
         conf: FileScanConfig,
-        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
     /// Take a list of files and the configuration to convert it to the
@@ -109,37 +107,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         not_impl_err!("Writer not implemented for this format")
     }
 
-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(
-        &self,
-        _file_schema: &Schema,
-        _table_schema: &Schema,
-        _filters: &[&Expr],
-    ) -> Result<FilePushdownSupport> {
-        Ok(FilePushdownSupport::NoSupport)
-    }
-
     /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
     fn file_source(&self) -> Arc<dyn FileSource>;
 }
 
-/// An enum to distinguish between different states when determining if certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {
-    /// The file format/system being asked does not support any sort of pushdown. This should be
-    /// used even if the file format theoretically supports some sort of pushdown, but it's not
-    /// enabled or implemented yet.
-    NoSupport,
-    /// The file format/system being asked *does* support pushdown, but it can't make it work for
-    /// the provided filter/expression
-    NotSupportedForFilter,
-    /// The file format/system being asked *does* support pushdown and *can* make it work for the
-    /// provided filter/expression
-    Supported,
-}
-
 /// Factory for creating [`FileFormat`] instances based on session and command level options
 ///
 /// Users can provide their own `FileFormatFactory` to support arbitrary file formats
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 147fcc59d5ce..32fe368a4db4 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -90,6 +90,7 @@ use log::{debug, warn};
 /// #  Field::new("c4", DataType::Int32, false),
 /// # ]));
 /// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
+/// #[derive(Clone)]
 /// # struct ParquetSource {
 /// #    projected_statistics: Option<Statistics>
 /// # };
@@ -97,7 +98,7 @@ use log::{debug, warn};
 /// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
 /// #  fn as_any(&self) -> &dyn Any { self  }
 /// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
-/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
+/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
 /// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
 /// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
 /// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
@@ -405,7 +406,9 @@ impl FileScanConfigBuilder {
         let statistics =
             statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
 
-        let file_source = file_source.with_statistics(statistics.clone());
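+        // Attach the file schema at build time so that sources such as
+        // `ParquetSource` have it available during filter pushdown optimization
+        // (see `ParquetSource::try_pushdown_filters`), not just at execution time.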
+        let file_source = file_source
+            .with_statistics(statistics.clone())
+            .with_schema(Arc::clone(&file_schema));
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
@@ -461,7 +464,6 @@ impl DataSource for FileScanConfig {
         let source = self
             .file_source
             .with_batch_size(batch_size)
-            .with_schema(Arc::clone(&self.file_schema))
             .with_projection(self);
 
         let opener = source.create_file_opener(object_store, self, partition);
@@ -637,7 +639,9 @@ impl FileScanConfig {
         file_source: Arc<dyn FileSource>,
     ) -> Self {
         let statistics = Statistics::new_unknown(&file_schema);
-        let file_source = file_source.with_statistics(statistics.clone());
+        let file_source = file_source
+            .with_statistics(statistics.clone())
+            .with_schema(Arc::clone(&file_schema));
         Self {
             object_store_url,
             file_schema,
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 78d3e2ad8873..432ac35ebc23 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -95,6 +95,10 @@ impl PhysicalOptimizer {
             // as that rule may inject other operations in between the different AggregateExecs.
             // Applying the rule early means only directly-connected AggregateExecs must be examined.
             Arc::new(LimitedDistinctAggregation::new()),
+            // The FilterPushdown rule tries to push down filters as far as it can.
+            // For example, it will push down filtering from a `FilterExec` to
+            // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
+            Arc::new(FilterPushdown::new()),
             // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
             // requirements. Please make sure that the whole plan tree is determined before this rule.
             // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
@@ -122,10 +126,6 @@ impl PhysicalOptimizer {
             // into an `order by max(x) limit y`. In this case it will copy the limit value down
             // to the aggregation, allowing it to use only y number of accumulators.
             Arc::new(TopKAggregation::new()),
-            // The FilterPushdown rule tries to push down filters as far as it can.
-            // For example, it will push down filtering from a `FilterExec` to
-            // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
-            Arc::new(FilterPushdown::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 928bc1b389c7..b99bef38f324 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -28,7 +28,7 @@ use super::{
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPropagation, PredicateSupport,
+    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
 };
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
@@ -64,6 +64,7 @@ use datafusion_physical_expr::{
 
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::stream::{Stream, StreamExt};
+use itertools::Itertools;
 use log::trace;
 
 const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
@@ -459,7 +460,10 @@ impl ExecutionPlan for FilterExec {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        let self_filter = Arc::clone(&self.predicate);
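+        // Split the predicate into its conjuncts so each one can be pushed down
+        // (or retained) independently by the children.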
+        let self_filter = split_conjunction(&self.predicate)
+            .into_iter()
+            .cloned()
+            .collect_vec();
 
         let parent_filters = if let Some(projection_indices) = self.projection.as_ref() {
             // We need to invert the projection on any referenced columns in the filter
@@ -503,37 +507,25 @@ impl ExecutionPlan for FilterExec {
 
         Ok(FilterDescription::new_with_child_count(1)
             .all_parent_filters_supported(parent_filters)
-            .with_self_filter(self_filter))
+            .with_self_filters_for_children(vec![self_filter]))
     }
 
     fn handle_child_pushdown_result(
         &self,
-        mut child_pushdown_result: ChildPushdownResult,
+        child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // We absorb any parent filters that were not handled by our children
+        let mut unhandled_filters =
+            child_pushdown_result.parent_filters.collect_unsupported();
         assert_eq!(
             child_pushdown_result.self_filters.len(),
             1,
             "FilterExec should only have one child"
         );
-        assert_eq!(
-            child_pushdown_result.self_filters[0].len(),
-            1,
-            "FilterExec produces only one filter"
-        );
-
-        // We absorb any parent filters that were not handled by our children
-        let mut unhandled_filters =
-            child_pushdown_result.parent_filters.collect_unsupported();
-
-        let self_filters = child_pushdown_result
-            .self_filters
-            .swap_remove(0)
-            .into_inner()
-            .swap_remove(0);
-        if let PredicateSupport::Unsupported(expr) = self_filters {
-            unhandled_filters.push(expr);
-        }
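+        // The predicate was split into conjuncts before being offered to the
+        // child, so re-absorb every conjunct the child did not accept.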
+        let unsupported_self_filters =
+            child_pushdown_result.self_filters[0].collect_unsupported();
+        unhandled_filters.extend(unsupported_self_filters);
 
         // If we have unhandled filters, we need to create a new FilterExec
         let filter_input = Arc::clone(self.input());
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
index 0003fc9d7277..4e84fe36f98f 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -42,7 +42,7 @@ impl PredicateSupports {
         Self(pushdowns)
     }
 
-    /// Create a new [`PredicateSupports`] with all filters as supported.
+    /// Create a new [`PredicateSupports`] with all filters as [`PredicateSupport::Supported`].
     pub fn all_supported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
         let pushdowns = filters
             .into_iter()
@@ -51,7 +51,7 @@ impl PredicateSupports {
         Self::new(pushdowns)
     }
 
-    /// Create a new [`PredicateSupports`] with all filters as unsupported.
+    /// Create a new [`PredicateSupports`] with all filters as [`PredicateSupport::Unsupported`].
     pub fn all_unsupported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
         let pushdowns = filters
             .into_iter()
@@ -60,8 +60,9 @@ impl PredicateSupports {
         Self::new(pushdowns)
     }
 
-    /// Transform all filters to supported, returning a new FilterPushdowns.
-    /// This does not modify the original [`PredicateSupports`].
+    /// Transform all filters to supported, returning a new [`PredicateSupports`]
+    /// with all filters as [`PredicateSupport::Supported`].
+    /// This does not modify the original [`PredicateSupports`].
     pub fn make_supported(self) -> Self {
         let pushdowns = self
             .0
@@ -74,8 +75,23 @@ impl PredicateSupports {
         Self::new(pushdowns)
     }
 
+    /// Transform all filters to unsupported, returning a new [`PredicateSupports`]
+    /// with all filters as [`PredicateSupport::Unsupported`].
+    /// This does not modify the original [`PredicateSupports`].
+    pub fn make_unsupported(self) -> Self {
+        let pushdowns = self
+            .0
+            .into_iter()
+            .map(|f| match f {
+                PredicateSupport::Supported(expr) => PredicateSupport::Unsupported(expr),
+                u @ PredicateSupport::Unsupported(_) => u,
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
     /// Collect unsupported filters into a Vec, without removing them from the original
     /// [`PredicateSupports`].
     pub fn collect_unsupported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
         self.0
             .iter()
@@ -187,6 +203,20 @@ impl<T> FilterPushdownPropagation<T> {
             updated_node: None,
         }
     }
+
+    /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
+    pub fn with_filters(filters: PredicateSupports) -> Self {
+        Self {
+            filters,
+            updated_node: None,
+        }
+    }
+
+    /// Bind an updated node to the [`FilterPushdownPropagation`].
+    pub fn with_updated_node(mut self, updated_node: T) -> Self {
+        self.updated_node = Some(updated_node);
+        self
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -194,7 +224,7 @@ struct ChildFilterDescription {
     /// Description of which parent filters can be pushed down into this node.
     /// Since we need to transmit filter pushdown results back to this node's parent
     /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
     /// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters.
     parent_filters: PredicateSupports,
     /// Description of which filters this node is pushing down to its children.
     /// Since this is not transmitted back to the parents we can have variable sized inner arrays
@@ -297,4 +327,14 @@ impl FilterDescription {
         }
         self
     }
+
+    pub fn with_self_filters_for_children(
+        mut self,
+        filters: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    ) -> Self {
+        for (child, filters) in self.child_filter_descriptions.iter_mut().zip(filters) {
+            child.self_filters = filters;
+        }
+        self
+    }
 }
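Note on the helpers added above: together they form a small builder-style API for answering a pushdown request. A sketch (not part of this patch) of an operator that cannot absorb any parent filters, assuming the module is reachable as `datafusion_physical_plan::filter_pushdown`:

```rust
use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PredicateSupports,
};
use datafusion_physical_plan::ExecutionPlan;

/// Report every parent filter back as unsupported; if the operator had to
/// rebuild itself it can attach the new node with `with_updated_node`.
fn reject_all_filters(
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    rebuilt: Option<Arc<dyn ExecutionPlan>>,
) -> FilterPushdownPropagation<Arc<dyn ExecutionPlan>> {
    let propagation = FilterPushdownPropagation::with_filters(
        PredicateSupports::all_unsupported(parent_filters),
    );
    match rebuilt {
        Some(node) => propagation.with_updated_node(node),
        None => propagation,
    }
}
```

A node that receives a `PredicateSupports` from a child can likewise downgrade it wholesale with the new `make_unsupported` before reporting back to its own parent.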
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 4429c8fdd8df..9b636d15103d 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -728,7 +728,7 @@ impl protobuf::PhysicalPlanNode {
             let mut source = ParquetSource::new(options);
 
             if let Some(predicate) = predicate {
-                source = source.with_predicate(Arc::clone(&schema), predicate);
+                source = source.with_predicate(predicate);
             }
             let base_config = parse_protobuf_file_scan_config(
                 scan.base_conf.as_ref().unwrap(),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index c29814676dcd..ad4c695b9ef1 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -756,9 +756,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
     let mut options = TableParquetOptions::new();
     options.global.pushdown_filters = true;
 
-    let file_source = Arc::new(
-        ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate),
-    );
+    let file_source = Arc::new(ParquetSource::new(options).with_predicate(predicate));
 
     let scan_config = FileScanConfigBuilder::new(
         ObjectStoreUrl::local_filesystem(),
@@ -817,10 +815,8 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
         inner: Arc::new(Column::new("col", 1)),
     });
 
-    let file_source = Arc::new(
-        ParquetSource::default()
-            .with_predicate(Arc::clone(&file_schema), custom_predicate_expr),
-    );
+    let file_source =
+        Arc::new(ParquetSource::default().with_predicate(custom_predicate_expr));
 
     let scan_config = FileScanConfigBuilder::new(
         ObjectStoreUrl::local_filesystem(),
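Note on the `with_predicate` changes throughout this patch: the predicate is now the only argument, and the schema parameter is gone. A standalone sketch (not part of this patch; the `datafusion::datasource::physical_plan::ParquetSource` and `datafusion::config::TableParquetOptions` import paths are assumed):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> datafusion_common::Result<()> {
    let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    // id > 5, bound to the file schema
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        col("id", &file_schema)?,
        Operator::Gt,
        lit(5),
    ));

    let mut options = TableParquetOptions::new();
    options.global.pushdown_filters = true;

    // Previously this was .with_predicate(Arc::clone(&file_schema), predicate)
    let _file_source = Arc::new(ParquetSource::new(options).with_predicate(predicate));
    Ok(())
}
```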
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index ba2596551f1d..b78a16ca72f0 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -229,6 +229,7 @@ physical_plan after OutputRequirements
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -237,7 +238,6 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -304,6 +304,7 @@ physical_plan after OutputRequirements
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -314,7 +315,6 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -345,6 +345,7 @@ physical_plan after OutputRequirements
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -355,7 +356,6 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 252704f260b8..01e0ad2fee12 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -54,7 +54,6 @@ LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';
 statement ok
 set datafusion.execution.parquet.pushdown_filters = true;
 
-## Create table without pushdown
 statement ok
 CREATE EXTERNAL TABLE t_pushdown(a varchar, b int, c float) STORED AS PARQUET
 LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';
@@ -81,7 +80,9 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
@@ -127,7 +128,9 @@ EXPLAIN select a from t_pushdown where b > 2 AND a IS NOT NULL order by a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
@@ -144,7 +147,9 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b;
 ----
 logical_plan
 01)Sort: t_pushdown.b ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")]
+02)--Projection: t_pushdown.b
+03)----Filter: t_pushdown.a = Utf8("bar")
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8("bar")]
 physical_plan
 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
@@ -208,7 +213,7 @@ physical_plan
 01)CoalesceBatchesExec: target_batch_size=8192
 02)--FilterExec: val@0 != part@1
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
-04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1
+04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet
 
 # If we reference only a partition column it gets evaluted during the listing phase
 query TT
@@ -221,5 +226,23 @@ physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion
 query TT
 EXPLAIN select * from t_pushdown where val != 'c';
 ----
-logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")]
+logical_plan
+01)Filter: t_pushdown.val != Utf8("c")
+02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != Utf8("c")]
 physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]
+
+# If we have a mix of filters:
+# - The partition filters get evaluated during planning
+# - Filters that mix partition and file columns end up in a FilterExec
+# - The file filters get pushed down into the scan
+query TT
+EXPLAIN select * from t_pushdown where val != 'd' AND val != 'c' AND part = 'a' AND part != val;
+----
+logical_plan
+01)Filter: t_pushdown.val != Utf8("d") AND t_pushdown.val != Utf8("c") AND t_pushdown.val != t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val != Utf8("d"), t_pushdown.val != Utf8("c"), t_pushdown.val != t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 != part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]
diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt
index 67965146e76b..ed948dd11439 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter.slt
@@ -18,7 +18,7 @@
 # Test push down filter
 
 statement ok
-set datafusion.explain.logical_plan_only = true;
+set datafusion.explain.physical_plan_only = true;
 
 statement ok
 CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
@@ -35,12 +35,14 @@ select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 =
 query TT
 explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
 ----
-logical_plan
-01)Projection: __unnest_placeholder(v.column2,depth=1) AS uc2
-02)--Unnest: lists[__unnest_placeholder(v.column2)|depth=1] structs[]
-03)----Projection: v.column2 AS __unnest_placeholder(v.column2), v.column1
-04)------Filter: v.column1 = Int64(2)
-05)--------TableScan: v projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--UnnestExec
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------FilterExec: column1@0 = 2
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query I
 select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
@@ -52,13 +54,15 @@ select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
 query TT
 explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
 ----
-logical_plan
-01)Projection: __unnest_placeholder(v.column2,depth=1) AS uc2
-02)--Filter: __unnest_placeholder(v.column2,depth=1) > Int64(3)
-03)----Projection: __unnest_placeholder(v.column2,depth=1)
-04)------Unnest: lists[__unnest_placeholder(v.column2)|depth=1] structs[]
-05)--------Projection: v.column2 AS __unnest_placeholder(v.column2), v.column1
-06)----------TableScan: v projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as __unnest_placeholder(v.column2,depth=1)]
+06)----------UnnestExec
+07)------------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II
 select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
@@ -70,13 +74,16 @@ select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where
 query TT
 explain select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
 ----
-logical_plan
-01)Projection: __unnest_placeholder(v.column2,depth=1) AS uc2, v.column1
-02)--Filter: __unnest_placeholder(v.column2,depth=1) > Int64(3)
-03)----Unnest: lists[__unnest_placeholder(v.column2)|depth=1] structs[]
-04)------Projection: v.column2 AS __unnest_placeholder(v.column2), v.column1
-05)--------Filter: v.column1 = Int64(2)
-06)----------TableScan: v projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+04)------UnnestExec
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------FilterExec: column1@0 = 2
+09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II
 select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
@@ -89,12 +96,14 @@ select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where
 query TT
 explain select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
 ----
-logical_plan
-01)Projection: __unnest_placeholder(v.column2,depth=1) AS uc2, v.column1
-02)--Filter: __unnest_placeholder(v.column2,depth=1) > Int64(3) OR v.column1 = Int64(2)
-03)----Unnest: lists[__unnest_placeholder(v.column2)|depth=1] structs[]
-04)------Projection: v.column2 AS __unnest_placeholder(v.column2), v.column1
-05)--------TableScan: v projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------UnnestExec
+06)----------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 drop table v;
@@ -111,12 +120,14 @@ select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
 query TT
 explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
 ----
-logical_plan
-01)Projection: d.column1, __unnest_placeholder(d.column2,depth=1) AS o
-02)--Filter: get_field(__unnest_placeholder(d.column2,depth=1), Utf8("a")) = Int64(1)
-03)----Unnest: lists[__unnest_placeholder(d.column2)|depth=1] structs[]
-04)------Projection: d.column1, d.column2 AS __unnest_placeholder(d.column2)
-05)--------TableScan: d projection=[column1, column2]
+physical_plan
+01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----FilterExec: get_field(__unnest_placeholder(d.column2,depth=1)@1, a) = 1
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------UnnestExec
+06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 
 
@@ -179,9 +190,9 @@ LOCATION 'test_files/scratch/parquet/test_filter_with_limit/';
 query TT
 explain select * from test_filter_with_limit where value = 2 limit 1;
 ----
-logical_plan
-01)Limit: skip=0, fetch=1
-02)--TableScan: test_filter_with_limit projection=[part_key, value], full_filters=[test_filter_with_limit.value = Int32(2)], fetch=1
+physical_plan
+01)CoalescePartitionsExec: fetch=1
+02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)]
 
 query II
 select * from test_filter_with_limit where value = 2 limit 1;
@@ -218,43 +229,43 @@ LOCATION 'test_files/scratch/push_down_filter/t.parquet';
 query TT
 explain select a from t where a = '100';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[t.a = Int32(100)]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
 
 # The predicate should not have a column cast  when the value is a valid i32
 query TT
 explain select a from t where a != '100';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[t.a != Int32(100)]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)]
 
 # The predicate should still have the column cast when the value is a NOT valid i32
 query TT
 explain select a from t where a = '99999999999';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("99999999999")]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
 
 # The predicate should still have the column cast when the value is a NOT valid i32
 query TT
 explain select a from t where a = '99.99';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("99.99")]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
 
 # The predicate should still have the column cast when the value is a NOT valid i32
 query TT
 explain select a from t where a = '';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("")]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 
 
 # The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information.
 query TT
 explain select a from t where cast(a as string) = '100';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[t.a = Int32(100)]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
 
 # The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost).
 query TT
 explain select a from t where CAST(a AS string) = '0123';
 ----
-logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8("0123")]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 0123
 
 
 statement ok
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 1622ba41f93b..875caf15172c 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -46,6 +46,16 @@ schema. To upgrade structs which implement `PhysicalExpr` you need to implement
 the `return_field` function. There are numerous examples in the `physical-expr`
 crate.
 
+### `FileFormat::supports_filters_pushdown` replaced with `FileSource::try_pushdown_filters`
+
+To support more general filter pushdown, `FileFormat::supports_filters_pushdown` has been replaced with
+`FileSource::try_pushdown_filters`.
+If you implemented a custom `FileFormat` that uses a custom `FileSource`, you will need to implement
+`FileSource::try_pushdown_filters` instead.
+See `ParquetSource::try_pushdown_filters` for an example of how to implement this.
+
+`FileFormat::supports_filters_pushdown` has been removed.
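+
+A minimal implementation that keeps the old "no pushdown" behavior might look
+roughly like the sketch below (the signature shown is illustrative; copy the
+exact one from the `FileSource` trait):
+
+```rust
+impl FileSource for MyFileSource {
+    // Illustrative signature -- take the real one from the `FileSource` trait.
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        // Report every filter as unsupported, i.e. push nothing down.
+        Ok(FilterPushdownPropagation::with_filters(
+            PredicateSupports::all_unsupported(filters),
+        ))
+    }
+}
+```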
+
 ## DataFusion `47.0.0`
 
 This section calls out some of the major changes in the `47.0.0` release of DataFusion.
@@ -307,7 +317,7 @@ let mut file_source = ParquetSource::new(parquet_options)
 // Add filter
 if let Some(predicate) = logical_filter {
     if config.enable_parquet_pushdown {
-        file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
+        file_source = file_source.with_predicate(predicate);
     }
 };