217 changes: 217 additions & 0 deletions benchmarks/results.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/json_shredding.rs
@@ -144,7 +144,7 @@ pub async fn json_shredding() -> Result<()> {
let plan = format!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
println!("{plan}");
assert_contains!(&plan, "row_groups_pruned_statistics=2 total → 1 matched");
-    assert_contains!(&plan, "pushdown_rows_pruned=1");
+    assert_contains!(&plan, "pushdown_rows_pruned=0");

Ok(())
}
90 changes: 90 additions & 0 deletions datafusion/common/src/config.rs
@@ -751,6 +751,96 @@ config_namespace! {
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) Minimum bytes/sec throughput for adaptive filter pushdown.
/// Filters that achieve at least this throughput (bytes_saved / eval_time)
/// are promoted to row filters.
/// f64::INFINITY = no filters promoted (feature disabled).
/// 0.0 = all filters pushed as row filters (no adaptive logic).
/// Default: 104857600.0 (100 MB/s), empirically tuned across the
/// TPC-H, TPC-DS, and ClickBench benchmarks on an M4 MacBook Pro.
/// The optimal value likely depends on the relative cost of CPU vs. I/O
/// in your environment, and to some extent on the shape of your query.
///
/// **Interaction with `pushdown_filters` and `reorder_filters`:**
/// This option only takes effect when `pushdown_filters = true`.
/// When pushdown is disabled, all filters run post-scan and this
/// threshold is ignored. During the statistics collection phase
/// (see `filter_statistics_collection_min_rows`), all filters
/// temporarily run post-scan to gather baseline metrics; once
/// collection completes, filters exceeding this throughput threshold
/// are promoted to row filters while the rest remain post-scan.
/// When `reorder_filters = true`, promoted filters are further
/// sorted by measured effectiveness (most selective first), falling
/// back to the default I/O-cost heuristic for filters without data.
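///
/// A sketch of the promotion rule this threshold implies (illustrative
/// only; the helper name is an assumption, not this crate's API):
/// ```rust,ignore
/// /// True when a filter's measured throughput earns row-filter promotion.
/// fn promote(bytes_saved: u64, eval_secs: f64, min_bytes_per_sec: f64) -> bool {
///     if eval_secs == 0.0 {
///         // A "free" filter always qualifies unless the feature is disabled.
///         return min_bytes_per_sec.is_finite();
///     }
///     bytes_saved as f64 / eval_secs >= min_bytes_per_sec
/// }
/// ```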
pub filter_pushdown_min_bytes_per_sec: f64, default = 104_857_600.0

/// (reading) Correlation ratio threshold for grouping filters.
/// The ratio is P(A ∧ B) / (P(A) * P(B)):
/// 1.0 = independent (keep separate for late materialization benefit)
/// 1.5 = filters co-pass 50% more often than chance (default threshold)
/// 2.0 = filters co-pass twice as often as chance (conservative)
/// Higher values = less grouping = more late materialization, more overhead.
/// Lower values = more grouping = less overhead, less late materialization.
/// Set to f64::MAX to disable grouping entirely.
///
/// **Interaction with `pushdown_filters` and `reorder_filters`:**
/// Grouping only applies when `pushdown_filters = true` and the
/// statistics collection phase has completed. Correlated filters
/// are merged into a single compound `ArrowPredicate` so they
/// decode shared columns only once. When `reorder_filters = true`,
/// the compound predicates are ordered among the other row-filter
/// predicates by measured effectiveness.
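///
/// The ratio, computed from observed pass counts (a sketch; the counter
/// names are assumptions, not this crate's API):
/// ```rust,ignore
/// /// P(A ∧ B) / (P(A) * P(B)) over `total` rows; callers ensure counts > 0.
/// fn correlation_ratio(pass_a: u64, pass_b: u64, pass_both: u64, total: u64) -> f64 {
///     let p_a = pass_a as f64 / total as f64;
///     let p_b = pass_b as f64 / total as f64;
///     let p_ab = pass_both as f64 / total as f64;
///     p_ab / (p_a * p_b)
/// }
/// // Two filters are merged into one compound predicate when the ratio
/// // meets or exceeds `filter_correlation_threshold`.
/// ```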
pub filter_correlation_threshold: f64, default = 1.5

/// (reading) Minimum rows of post-scan evaluation before statistics-based
/// optimization activates. During collection, all filters are evaluated
/// as post-scan to gather accurate marginal and joint selectivity statistics.
/// Used for BOTH individual filter effectiveness decisions AND correlation-
/// based grouping. Larger values = more accurate estimates, longer collection.
/// Set to 0 to disable the collection phase entirely.
///
/// **Interaction with `pushdown_filters` and `reorder_filters`:**
/// During the collection phase, `pushdown_filters` is effectively
/// overridden: all filters run post-scan regardless of its value so
/// that unbiased selectivity statistics can be gathered. After
/// collection, `pushdown_filters` and `reorder_filters` resume
/// their normal roles — gating which filters are promoted and how
/// they are ordered, respectively.
/// If `pushdown_filters` is disabled, this option has no effect since all filters
/// run post-scan regardless of the collection phase.
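///
/// The collection phase, sketched as a tiny state machine (an assumption
/// about structure for illustration, not the actual implementation):
/// ```rust,ignore
/// enum FilterPhase {
///     /// All filters run post-scan while selectivity statistics accumulate.
///     Collecting { rows_seen: u64 },
///     /// Qualifying filters have been promoted to row filters.
///     Optimized,
/// }
///
/// fn advance(phase: &mut FilterPhase, batch_rows: u64, threshold: u64) {
///     if let FilterPhase::Collecting { rows_seen } = phase {
///         *rows_seen += batch_rows;
///         if *rows_seen >= threshold {
///             *phase = FilterPhase::Optimized; // promote and group filters here
///         }
///     }
/// }
/// ```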
pub filter_statistics_collection_min_rows: u64, default = 50_000

/// (reading) Fraction of total dataset rows to use for the statistics
/// collection phase. When > 0 and the dataset row count is known, the
/// effective collection threshold is max(min_rows, fraction * total_rows).
/// 0.0 = disabled, use filter_statistics_collection_min_rows only.
/// 0.05 (default) = collect stats on at least 5% of the dataset.
/// Must be in [0.0, 1.0].
///
/// This option ensures the collection window is proportional to the dataset size,
/// which matters for very large tables where a fixed row count
/// would be an insignificant sample.
///
/// **Interaction with `pushdown_filters`:**
/// If `pushdown_filters` is disabled, this option has no effect since all filters
/// run post-scan regardless of the collection phase.
pub filter_statistics_collection_fraction: f64, default = 0.05

/// (reading) Maximum rows for the statistics collection phase,
/// regardless of dataset size. The effective collection threshold
/// becomes min(max_rows, max(min_rows, fraction * total_rows)).
/// 0 = no cap. Default: 750_000.
///
/// This caps the collection window so that very large datasets
/// do not spend an excessive number of rows in post-scan mode.
/// It also bounds the re-evaluation interval for cached decisions.
///
/// **Interaction with `pushdown_filters`:**
/// If `pushdown_filters` is disabled, this option has no effect since all filters
/// run post-scan regardless of the collection phase.
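///
/// Effective collection threshold, combining the three knobs (a sketch;
/// the function name is an assumption):
/// ```rust,ignore
/// fn effective_collection_rows(
///     min_rows: u64,
///     fraction: f64,
///     max_rows: u64,
///     total_rows: Option<u64>,
/// ) -> u64 {
///     // max(min_rows, fraction * total_rows) when the row count is known
///     let base = match total_rows {
///         Some(total) if fraction > 0.0 => {
///             min_rows.max((fraction * total as f64) as u64)
///         }
///         _ => min_rows,
///     };
///     // max_rows == 0 means "no cap"
///     if max_rows == 0 { base } else { base.min(max_rows) }
/// }
/// ```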
pub filter_statistics_collection_max_rows: u64, default = 750_000

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

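For reviewers, a usage sketch of the new knobs set through a `SessionConfig` (field paths as introduced in this diff; the values are illustrative, not recommendations):

```rust
use datafusion::prelude::*;

fn adaptive_pushdown_ctx() -> SessionContext {
    let mut config = SessionConfig::new();
    let parquet = &mut config.options_mut().execution.parquet;
    // None of the new knobs take effect unless pushdown is enabled.
    parquet.pushdown_filters = true;
    // Promote filters that save at least 50 MB/s of decode work.
    parquet.filter_pushdown_min_bytes_per_sec = 50.0 * 1024.0 * 1024.0;
    // Group only filters that co-pass twice as often as chance.
    parquet.filter_correlation_threshold = 2.0;
    // Gather post-scan statistics over at least 100k rows before optimizing.
    parquet.filter_statistics_collection_min_rows = 100_000;
    SessionContext::new_with_config(config)
}
```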
23 changes: 23 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -209,6 +209,11 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
filter_correlation_threshold: _, // not used for writer props
filter_statistics_collection_min_rows: _, // not used for writer props
filter_statistics_collection_fraction: _, // not used for writer props
filter_statistics_collection_max_rows: _, // not used for writer props
} = self;

let mut builder = WriterProperties::builder()
@@ -460,6 +465,14 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
filter_correlation_threshold: defaults.filter_correlation_threshold,
filter_statistics_collection_fraction: defaults
.filter_statistics_collection_fraction,
filter_statistics_collection_max_rows: defaults
.filter_statistics_collection_max_rows,
filter_statistics_collection_min_rows: defaults
.filter_statistics_collection_min_rows,
}
}

@@ -574,6 +587,16 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
filter_pushdown_min_bytes_per_sec: global_options_defaults
.filter_pushdown_min_bytes_per_sec,
filter_correlation_threshold: global_options_defaults
.filter_correlation_threshold,
filter_statistics_collection_fraction: global_options_defaults
.filter_statistics_collection_fraction,
filter_statistics_collection_max_rows: global_options_defaults
.filter_statistics_collection_max_rows,
filter_statistics_collection_min_rows: global_options_defaults
.filter_statistics_collection_min_rows,
},
column_specific_options,
key_value_metadata,
9 changes: 8 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -152,7 +152,14 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
-    assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
+    let data_source_exec_row = formatted
+        .lines()
+        .find(|line| line.contains("DataSourceExec:"))
+        .unwrap();
+    assert!(
+        data_source_exec_row.contains("predicate=id@0 = 1"),
+        "{formatted}"
+    );

Ok(())
}
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -169,7 +169,8 @@ mod tests {
if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
-        .with_reorder_filters(true);
+        .with_reorder_filters(true)
+        .with_filter_pushdown_min_bytes_per_sec(0.0);
} else {
source = source.with_pushdown_filters(false);
}
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/parquet.rs
@@ -70,6 +70,8 @@ impl ParquetScanOptions {
config.execution.parquet.pushdown_filters = self.pushdown_filters;
config.execution.parquet.reorder_filters = self.reorder_filters;
config.execution.parquet.enable_page_index = self.enable_page_index;
// Disable adaptive filter selection for tests that expect deterministic pushdown
config.execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
config.into()
}
}
3 changes: 3 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -633,6 +633,7 @@ async fn predicate_cache_default() -> datafusion_common::Result<()> {
async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
config.options_mut().execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
let ctx = SessionContext::new_with_config(config);
// The cache is on by default, and used when filter pushdown is enabled
PredicateCacheTest {
@@ -647,6 +648,7 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
config.options_mut().execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
// force to get multiple batches to trigger repeated metric compound bug
config.options_mut().execution.batch_size = 1;
let ctx = SessionContext::new_with_config(config);
Expand All @@ -664,6 +666,7 @@ async fn predicate_cache_pushdown_default_selections_only()
-> datafusion_common::Result<()> {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
config.options_mut().execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
// forcing filter selections minimizes the number of rows read from the cache
config
.options_mut()