From 843bce34cc6b3d7fc3bdd2eb342dc1d1be232ff5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 17 May 2025 22:10:16 -0400 Subject: [PATCH 1/9] fix --- datafusion/core/src/datasource/file_format/options.rs | 1 + datafusion/core/src/datasource/listing/table.rs | 7 +++++-- datafusion/core/tests/sql/explain_analyze.rs | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 08e9a628dd61..fca3cf58da20 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -588,6 +588,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) + .with_collect_stat(config.collect_statistics()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 26daa88c9ec1..a744931e6906 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -195,7 +195,8 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(file_format) .with_file_extension(listing_file_extension) - .with_target_partitions(state.config().target_partitions()); + .with_target_partitions(state.config().target_partitions()) + .with_collect_stat(state.config().collect_statistics()); Ok(Self { table_paths: self.table_paths, @@ -1282,7 +1283,9 @@ mod tests { #[tokio::test] async fn read_single_file() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_collect_statistics(true), + ); let table = load_table(&ctx, "alltypes_plain.parquet").await?; let projection = None; diff --git 
a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index e8ef34c2afe7..3c7d85ef87d3 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -561,7 +561,9 @@ async fn csv_explain_verbose_plans() { async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str) { // repro for https://github.com/apache/datafusion/issues/917 // where EXPLAIN ANALYZE was not correctly running optimizer - let ctx = SessionContext::new(); + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_collect_statistics(true), + ); register_alltypes_parquet(&ctx).await; // This happens as an optimization pass where count(*)/count(1) can be From d0ad65053bb3cafaccee02b5739b52a4e6f1b483 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:16:53 -0700 Subject: [PATCH 2/9] add a test --- .../src/datasource/file_format/options.rs | 11 ++-- .../core/src/datasource/listing/table.rs | 14 ++++- .../src/datasource/listing_table_factory.rs | 3 +- .../core/src/execution/context/parquet.rs | 54 +++++++++++++++++++ 4 files changed, 73 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index fca3cf58da20..9aaf1cf59811 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) } @@ -585,10 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) 
.with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) - .with_collect_stat(config.collect_statistics()) + .with_session_config_options(config) } async fn get_resolved_schema( @@ -616,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) } @@ -644,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) } @@ -670,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a744931e6906..48336d591c08 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; +use datafusion_execution::config::SessionConfig; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, TableProviderFilterPushDown}; use 
datafusion_expr::{SortExpr, TableType}; @@ -320,12 +321,23 @@ impl ListingOptions { file_extension: format.get_ext(), format, table_partition_cols: vec![], - collect_stat: true, + collect_stat: false, target_partitions: 1, file_sort_order: vec![], } } + /// Set options from [`SessionConfig`] and returns self. + /// + /// Currently this sets `target_partitions` and `collect_stat` + /// but if more options are added in the future that need to be coordinated + /// they will be synchronized through this method. + pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self { + self = self.with_target_partitions(config.target_partitions()); + self = self.with_collect_stat(config.collect_statistics()); + self + } + /// Set file extension on [`ListingOptions`] and returns self. /// /// # Example diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e9..71686c61a8f7 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory { let table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) - .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) - .with_target_partitions(state.config().target_partitions()) + .with_session_config_options(session_state.config()) .with_table_partition_cols(table_partition_cols); options diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index a7c4887b24ce..8c3bebc24f97 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -84,6 +84,8 @@ mod tests { use crate::parquet::basic::Compression; use crate::test_util::parquet_test_data; + use arrow::util::pretty::pretty_format_batches; + use 
datafusion_common::assert_contains; use datafusion_common::config::TableParquetOptions; use datafusion_execution::config::SessionConfig; @@ -129,6 +131,58 @@ mod tests { Ok(()) } + async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> { + let ctx = SessionContext::new_with_config(config); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain*.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + let df = ctx.sql("EXPLAIN SELECT * FROM test").await?; + let results = df.collect().await?; + let content = pretty_format_batches(&results).unwrap().to_string(); + Ok(content) + } + + #[tokio::test] + async fn register_parquet_respects_collect_statistics_config() -> Result<()> { + // The default is false + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + let content = explain_query_all_with_config(config).await?; + assert_contains!( + content, + "statistics=[Rows=Absent," + ); + + // Explicitly set to false + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + config.options_mut().execution.collect_statistics = false; + let content = explain_query_all_with_config(config).await?; + assert_contains!( + content, + "statistics=[Rows=Absent," + ); + + // Explicitly set to true + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + config.options_mut().execution.collect_statistics = true; + let content = explain_query_all_with_config(config).await?; + assert_contains!( + content, + "statistics=[Rows=Exact(10)," + ); + + Ok(()) + } + #[tokio::test] async fn read_from_registered_table_with_glob_path() -> Result<()> { let ctx = SessionContext::new(); From 710e1e57e9f2bc5317c02ccc2e22c474daaccf00 Mon Sep 17 00:00:00 2001 
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:17:00 -0700 Subject: [PATCH 3/9] fmt --- datafusion/core/src/execution/context/parquet.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 8c3bebc24f97..23ac6b884871 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -153,10 +153,7 @@ mod tests { config.options_mut().explain.physical_plan_only = true; config.options_mut().explain.show_statistics = true; let content = explain_query_all_with_config(config).await?; - assert_contains!( - content, - "statistics=[Rows=Absent," - ); + assert_contains!(content, "statistics=[Rows=Absent,"); // Explicitly set to false let mut config = SessionConfig::new(); @@ -164,10 +161,7 @@ mod tests { config.options_mut().explain.show_statistics = true; config.options_mut().execution.collect_statistics = false; let content = explain_query_all_with_config(config).await?; - assert_contains!( - content, - "statistics=[Rows=Absent," - ); + assert_contains!(content, "statistics=[Rows=Absent,"); // Explicitly set to true let mut config = SessionConfig::new(); @@ -175,10 +169,7 @@ mod tests { config.options_mut().explain.show_statistics = true; config.options_mut().execution.collect_statistics = true; let content = explain_query_all_with_config(config).await?; - assert_contains!( - content, - "statistics=[Rows=Exact(10)," - ); + assert_contains!(content, "statistics=[Rows=Exact(10),"); Ok(()) } From 98588de9d5a86b2fb8e7251e51d7a01aaf1133f7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:24:45 -0700 Subject: [PATCH 4/9] add to upgrade guide --- datafusion/core/src/datasource/listing/table.rs | 2 +- docs/source/library-user-guide/upgrading.md | 13 +++++++++++++ 2 
files changed, 14 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 48336d591c08..028882aef6e1 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -315,7 +315,7 @@ impl ListingOptions { /// - use default file extension filter /// - no input partition to discover /// - one target partition - /// - stat collection + /// - do not collect statistics pub fn new(format: Arc<dyn FileFormat>) -> Self { Self { file_extension: format.get_ext(), diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 620be657b44b..cfdb6248bd59 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -21,6 +21,19 @@ ## DataFusion `48.0.0` +### `ListingOptions` default for `collect_stat` changed from `true` to `false` + +This makes it agree with the default for `SessionConfig`. +Most users won't be impacted by this change but if you were using `ListingOptions` directly +and relied on the default value of `collect_stat` being `true`, you will need to +explicitly set it to `true` in your code. 
+ +```rust +ListingOptions::new() + .with_collect_stat(true) + // other options +``` + ### Processing `Field` instead of `DataType` for user defined functions In order to support metadata handling and extension types, user defined functions are From 8b7b047b62115c112db06bfc2378577ca4e9f1e3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:36:40 -0700 Subject: [PATCH 5/9] fix tests --- .../core/src/datasource/listing/table.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 028882aef6e1..3c87d3ee2329 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1324,7 +1324,7 @@ mod tests { #[cfg(feature = "parquet")] #[tokio::test] - async fn load_table_stats_by_default() -> Result<()> { + async fn do_not_load_table_stats_by_default() -> Result<()> { use crate::datasource::file_format::parquet::ParquetFormat; let testdata = crate::test_util::parquet_test_data(); @@ -1336,6 +1336,22 @@ mod tests { let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = opt.infer_schema(&state, &table_path).await?; + let config = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let exec = table.scan(&state, None, &[], None).await?; + assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); + + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_collect_stat(true); + let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path) 
.with_listing_options(opt) .with_schema(schema); From d9a7eee8d68dba79a07faab92010f06713807881 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:55:19 -0700 Subject: [PATCH 6/9] fix test --- datafusion/core/tests/sql/path_partition.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 131a396ccb9a..da450caad3cf 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -431,7 +431,9 @@ async fn parquet_multiple_nonstring_partitions() -> Result<()> { #[tokio::test] async fn parquet_statistics() -> Result<()> { - let ctx = SessionContext::new(); + let mut config = SessionConfig::new(); + config.options_mut().execution.collect_statistics = true; + let ctx = SessionContext::new_with_config(config); register_partitioned_alltypes_parquet( &ctx, From 4beea49404340a0ed811d3b98b7eb8cb7db31385 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 08:56:53 -0700 Subject: [PATCH 7/9] fix test --- datafusion/core/tests/sql/path_partition.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index da450caad3cf..5e9748d23d8c 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -585,7 +585,8 @@ async fn create_partitioned_alltypes_parquet_table( .iter() .map(|x| (x.0.to_owned(), x.1.clone())) .collect::<Vec<_>>(), - ); + ) + .with_session_config_options(&ctx.copied_config()); let table_path = ListingTableUrl::parse(table_path).unwrap(); let store_path = From 5e12a73d5c69c4749b92e6c3836d7b09fc00f089 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 19 May 2025 
13:29:57 -0700 Subject: [PATCH 8/9] fix ci --- datafusion/core/tests/parquet/file_statistics.rs | 6 ++++-- docs/source/library-user-guide/upgrading.md | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index a038d414cb57..a60beaf665e5 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -50,7 +50,8 @@ async fn check_stats_precision_with_filter_pushdown() { let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); let table_path = ListingTableUrl::parse(filename).unwrap(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); let table = get_listing_table(&table_path, None, &opt).await; let (_, _, state) = get_cache_runtime_state(); @@ -109,7 +110,8 @@ async fn load_table_stats_with_session_level_cache() { // Create a separate DefaultFileStatisticsCache let (cache2, _, state2) = get_cache_runtime_state(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); let table1 = get_listing_table(&table_path, Some(cache1), &opt).await; let table2 = get_listing_table(&table_path, Some(cache2), &opt).await; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index cfdb6248bd59..cb661f7d1537 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -29,6 +29,8 @@ and relied on the default value of `collect_stat` being `true`, you will need to explicitly set it to `true` in your code. 
```rust +use datafusion::datasource::listing::ListingOptions; + ListingOptions::new() .with_collect_stat(true) // other options From c06c41b4cfb0164107ea71306212b8f35cb46a40 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 21 May 2025 09:17:15 -0400 Subject: [PATCH 9/9] Fix example in upgrade guide (#29) --- docs/source/library-user-guide/upgrading.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index cb661f7d1537..a6ff73e13dc9 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -29,11 +29,11 @@ and relied on the default value of `collect_stat` being `true`, you will need to explicitly set it to `true` in your code. ```rust -use datafusion::datasource::listing::ListingOptions; - -ListingOptions::new() +# /* comment to avoid running +ListingOptions::new(Arc::new(ParquetFormat::default())) .with_collect_stat(true) // other options +# */ ``` ### Processing `Field` instead of `DataType` for user defined functions