diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 08e9a628dd61..9aaf1cf59811 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) } @@ -585,9 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) + .with_session_config_options(config) } async fn get_resolved_schema( @@ -615,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) } @@ -643,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) .with_table_partition_cols(self.table_partition_cols.clone()) } @@ -669,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions()) + .with_session_config_options(config) 
.with_table_partition_cols(self.table_partition_cols.clone()) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 26daa88c9ec1..3c87d3ee2329 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; +use datafusion_execution::config::SessionConfig; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; @@ -195,7 +196,8 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(file_format) .with_file_extension(listing_file_extension) - .with_target_partitions(state.config().target_partitions()); + .with_target_partitions(state.config().target_partitions()) + .with_collect_stat(state.config().collect_statistics()); Ok(Self { table_paths: self.table_paths, @@ -313,18 +315,29 @@ impl ListingOptions { /// - use default file extension filter /// - no input partition to discover /// - one target partition - /// - stat collection + /// - do not collect statistics pub fn new(format: Arc<dyn FileFormat>) -> Self { Self { file_extension: format.get_ext(), format, table_partition_cols: vec![], - collect_stat: true, + collect_stat: false, target_partitions: 1, file_sort_order: vec![], } } + /// Set options from [`SessionConfig`] and returns self. + /// + /// Currently this sets `target_partitions` and `collect_stat` + /// but if more options are added in the future that need to be coordinated + /// they will be synchronized through this method.
+ pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self { + self = self.with_target_partitions(config.target_partitions()); + self = self.with_collect_stat(config.collect_statistics()); + self + } + /// Set file extension on [`ListingOptions`] and returns self. /// /// # Example @@ -1282,7 +1295,9 @@ mod tests { #[tokio::test] async fn read_single_file() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_collect_statistics(true), + ); let table = load_table(&ctx, "alltypes_plain.parquet").await?; let projection = None; @@ -1309,7 +1324,7 @@ mod tests { #[cfg(feature = "parquet")] #[tokio::test] - async fn load_table_stats_by_default() -> Result<()> { + async fn do_not_load_table_stats_by_default() -> Result<()> { use crate::datasource::file_format::parquet::ParquetFormat; let testdata = crate::test_util::parquet_test_data(); @@ -1321,6 +1336,22 @@ mod tests { let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = opt.infer_schema(&state, &table_path).await?; + let config = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt) + .with_schema(schema); + let table = ListingTable::try_new(config)?; + + let exec = table.scan(&state, None, &[], None).await?; + assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); + + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_collect_stat(true); + let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(schema); diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e9..71686c61a8f7 100644 --- 
a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory { let table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) - .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) - .with_target_partitions(state.config().target_partitions()) + .with_session_config_options(session_state.config()) .with_table_partition_cols(table_partition_cols); options diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index a7c4887b24ce..23ac6b884871 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -84,6 +84,8 @@ mod tests { use crate::parquet::basic::Compression; use crate::test_util::parquet_test_data; + use arrow::util::pretty::pretty_format_batches; + use datafusion_common::assert_contains; use datafusion_common::config::TableParquetOptions; use datafusion_execution::config::SessionConfig; @@ -129,6 +131,49 @@ mod tests { Ok(()) } + async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> { + let ctx = SessionContext::new_with_config(config); + + ctx.register_parquet( + "test", + &format!("{}/alltypes_plain*.parquet", parquet_test_data()), + ParquetReadOptions::default(), + ) + .await?; + let df = ctx.sql("EXPLAIN SELECT * FROM test").await?; + let results = df.collect().await?; + let content = pretty_format_batches(&results).unwrap().to_string(); + Ok(content) + } + + #[tokio::test] + async fn register_parquet_respects_collect_statistics_config() -> Result<()> { + // The default is false + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + let content = explain_query_all_with_config(config).await?; +
assert_contains!(content, "statistics=[Rows=Absent,"); + + // Explicitly set to false + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + config.options_mut().execution.collect_statistics = false; + let content = explain_query_all_with_config(config).await?; + assert_contains!(content, "statistics=[Rows=Absent,"); + + // Explicitly set to true + let mut config = SessionConfig::new(); + config.options_mut().explain.physical_plan_only = true; + config.options_mut().explain.show_statistics = true; + config.options_mut().execution.collect_statistics = true; + let content = explain_query_all_with_config(config).await?; + assert_contains!(content, "statistics=[Rows=Exact(10),"); + + Ok(()) + } + #[tokio::test] async fn read_from_registered_table_with_glob_path() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index a038d414cb57..a60beaf665e5 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -50,7 +50,8 @@ async fn check_stats_precision_with_filter_pushdown() { let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); let table_path = ListingTableUrl::parse(filename).unwrap(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); let table = get_listing_table(&table_path, None, &opt).await; let (_, _, state) = get_cache_runtime_state(); @@ -109,7 +110,8 @@ async fn load_table_stats_with_session_level_cache() { // Create a separate DefaultFileStatisticsCache let (cache2, _, state2) = get_cache_runtime_state(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); 
let table1 = get_listing_table(&table_path, Some(cache1), &opt).await; let table2 = get_listing_table(&table_path, Some(cache2), &opt).await; diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index e8ef34c2afe7..3c7d85ef87d3 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -561,7 +561,9 @@ async fn csv_explain_verbose_plans() { async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str) { // repro for https://github.com/apache/datafusion/issues/917 // where EXPLAIN ANALYZE was not correctly running optimizer - let ctx = SessionContext::new(); + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_collect_statistics(true), + ); register_alltypes_parquet(&ctx).await; // This happens as an optimization pass where count(*)/count(1) can be diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 131a396ccb9a..5e9748d23d8c 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -431,7 +431,9 @@ async fn parquet_multiple_nonstring_partitions() -> Result<()> { #[tokio::test] async fn parquet_statistics() -> Result<()> { - let ctx = SessionContext::new(); + let mut config = SessionConfig::new(); + config.options_mut().execution.collect_statistics = true; + let ctx = SessionContext::new_with_config(config); register_partitioned_alltypes_parquet( &ctx, @@ -583,7 +585,8 @@ async fn create_partitioned_alltypes_parquet_table( .iter() .map(|x| (x.0.to_owned(), x.1.clone())) .collect::<Vec<_>>(), - ); + ) + .with_session_config_options(&ctx.copied_config()); let table_path = ListingTableUrl::parse(table_path).unwrap(); let store_path = diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 620be657b44b..a6ff73e13dc9 100644 ---
b/docs/source/library-user-guide/upgrading.md @@ -21,6 +21,21 @@ ## DataFusion `48.0.0` +### `ListingOptions` default for `collect_stat` changed from `true` to `false` + +This makes it agree with the default for `SessionConfig`. +Most users won't be impacted by this change but if you were using `ListingOptions` directly +and relied on the default value of `collect_stat` being `true`, you will need to +explicitly set it to `true` in your code. + +```rust +# /* comment to avoid running +ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_collect_stat(true) + // other options +# */ +``` + ### Processing `Field` instead of `DataType` for user defined functions In order to support metadata handling and extension types, user defined functions are