Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
}
Expand Down Expand Up @@ -585,9 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_session_config_options(config)
}

async fn get_resolved_schema(
Expand Down Expand Up @@ -615,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
}
Expand Down Expand Up @@ -643,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
}

Expand All @@ -669,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
}

Expand Down
41 changes: 36 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -195,7 +196,8 @@ impl ListingTableConfig {

let listing_options = ListingOptions::new(file_format)
.with_file_extension(listing_file_extension)
.with_target_partitions(state.config().target_partitions());
.with_target_partitions(state.config().target_partitions())
.with_collect_stat(state.config().collect_statistics());

Ok(Self {
table_paths: self.table_paths,
Expand Down Expand Up @@ -313,18 +315,29 @@ impl ListingOptions {
/// - use default file extension filter
/// - no input partition to discover
/// - one target partition
/// - stat collection
/// - do not collect statistics
pub fn new(format: Arc<dyn FileFormat>) -> Self {
Self {
file_extension: format.get_ext(),
format,
table_partition_cols: vec![],
collect_stat: true,
collect_stat: false,
target_partitions: 1,
file_sort_order: vec![],
}
}

/// Applies the session-level settings from [`SessionConfig`] and returns self.
///
/// At present this copies `target_partitions` and `collect_stat`. Any
/// future options that must stay coordinated with the session
/// configuration will be synchronized through this method as well.
pub fn with_session_config_options(self, config: &SessionConfig) -> Self {
    self.with_target_partitions(config.target_partitions())
        .with_collect_stat(config.collect_statistics())
}

/// Set file extension on [`ListingOptions`] and returns self.
///
/// # Example
Expand Down Expand Up @@ -1282,7 +1295,9 @@ mod tests {

#[tokio::test]
async fn read_single_file() -> Result<()> {
let ctx = SessionContext::new();
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_collect_statistics(true),
);

let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
Expand All @@ -1309,7 +1324,7 @@ mod tests {

#[cfg(feature = "parquet")]
#[tokio::test]
async fn load_table_stats_by_default() -> Result<()> {
async fn do_not_load_table_stats_by_default() -> Result<()> {
use crate::datasource::file_format::parquet::ParquetFormat;

let testdata = crate::test_util::parquet_test_data();
Expand All @@ -1321,6 +1336,22 @@ mod tests {

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = opt.infer_schema(&state, &table_path).await?;
let config = ListingTableConfig::new(table_path.clone())
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Absent
);

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_collect_stat(true);
let schema = opt.infer_schema(&state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory {
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_session_config_options(session_state.config())
.with_table_partition_cols(table_partition_cols);

options
Expand Down
45 changes: 45 additions & 0 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ mod tests {
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;

use arrow::util::pretty::pretty_format_batches;
use datafusion_common::assert_contains;
use datafusion_common::config::TableParquetOptions;
use datafusion_execution::config::SessionConfig;

Expand Down Expand Up @@ -129,6 +131,49 @@ mod tests {
Ok(())
}

/// Registers the `alltypes_plain*.parquet` test files as table `test` in a
/// fresh context built from `config`, runs `EXPLAIN SELECT * FROM test`,
/// and returns the pretty-printed result batches as a string.
///
/// # Errors
///
/// Returns an error if registration, planning, execution or formatting fails.
async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> {
    let ctx = SessionContext::new_with_config(config);

    ctx.register_parquet(
        "test",
        &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx.sql("EXPLAIN SELECT * FROM test").await?;
    let results = df.collect().await?;
    // Propagate formatting failures with `?` rather than panicking, so every
    // fallible step in this helper reports errors the same way.
    let content = pretty_format_batches(&results)?.to_string();
    Ok(content)
}

#[tokio::test]
async fn register_parquet_respects_collect_statistics_config() -> Result<()> {
    // Builds a config that prints only the physical plan with statistics
    // shown; `collect_statistics` is left untouched when `None` is passed.
    let explain_config = |collect_statistics: Option<bool>| {
        let mut config = SessionConfig::new();
        let options = config.options_mut();
        options.explain.physical_plan_only = true;
        options.explain.show_statistics = true;
        if let Some(flag) = collect_statistics {
            options.execution.collect_statistics = flag;
        }
        config
    };

    // The default is false
    let plan = explain_query_all_with_config(explain_config(None)).await?;
    assert_contains!(plan, "statistics=[Rows=Absent,");

    // Explicitly set to false
    let plan = explain_query_all_with_config(explain_config(Some(false))).await?;
    assert_contains!(plan, "statistics=[Rows=Absent,");

    // Explicitly set to true
    let plan = explain_query_all_with_config(explain_config(Some(true))).await?;
    assert_contains!(plan, "statistics=[Rows=Exact(10),");

    Ok(())
}

#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ async fn check_stats_precision_with_filter_pushdown() {
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let opt =
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
let table = get_listing_table(&table_path, None, &opt).await;

let (_, _, state) = get_cache_runtime_state();
Expand Down Expand Up @@ -109,7 +110,8 @@ async fn load_table_stats_with_session_level_cache() {
// Create a separate DefaultFileStatisticsCache
let (cache2, _, state2) = get_cache_runtime_state();

let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let opt =
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);

let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ async fn csv_explain_verbose_plans() {
async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str) {
// repro for https://github.com/apache/datafusion/issues/917
// where EXPLAIN ANALYZE was not correctly running optimizer
let ctx = SessionContext::new();
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_collect_statistics(true),
);
register_alltypes_parquet(&ctx).await;

// This happens as an optimization pass where count(*)/count(1) can be
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,9 @@ async fn parquet_multiple_nonstring_partitions() -> Result<()> {

#[tokio::test]
async fn parquet_statistics() -> Result<()> {
let ctx = SessionContext::new();
let mut config = SessionConfig::new();
config.options_mut().execution.collect_statistics = true;
let ctx = SessionContext::new_with_config(config);

register_partitioned_alltypes_parquet(
&ctx,
Expand Down Expand Up @@ -583,7 +585,8 @@ async fn create_partitioned_alltypes_parquet_table(
.iter()
.map(|x| (x.0.to_owned(), x.1.clone()))
.collect::<Vec<_>>(),
);
)
.with_session_config_options(&ctx.copied_config());

let table_path = ListingTableUrl::parse(table_path).unwrap();
let store_path =
Expand Down
15 changes: 15 additions & 0 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@

## DataFusion `48.0.0`

### `ListingOptions` default for `collect_stat` changed from `true` to `false`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


This makes it agree with the default for `SessionConfig`.
Most users won't be impacted by this change. However, if you were using `ListingOptions`
directly and relied on the default value of `collect_stat` being `true`, you will need to
explicitly set it to `true` in your code.

```rust
# /* comment to avoid running
ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_collect_stat(true)
// other options
# */
```

### Processing `Field` instead of `DataType` for user defined functions

In order to support metadata handling and extension types, user defined functions are
Expand Down