Refactor Statistics, introduce precision estimates (Exact, `Inexact`, `Absent`) (#7793)

* analysis context refactored

* is_exact fix

* Minor changes

* minor changes

* Minor changes

* Minor changes

* datatype check added, statistics default removed

* MemExec uses the stats of projections, agg optimize excludes unbounded cases

* fix after merge

* proto fix

* Simplifications

* statistics() returns result

* fix after merge

* Simplifications

* Remove option from column stats

* exact info added

* error in agg optimization

* bugs are fixed

* negative expr support

* fix after merge

* fix after merge

* Minor changes, simplifications

* minor changes

* min max accs removed

* fix after merge

* minor changes

* fix initialization of stats in limit

* minor changes

* Simplifications

* more accurate row calculations

* Improve comments

* min-max values are init as absent, not inf

* fix after merge

* Review Part 1

* Cardinality calculation is fixed

* Review Part 2

* get_int_range replaced by cardinality function

* Fix imports

* Statistics display is shortened.

* fix after merge

* Harmonize imports

* Update datafusion/physical-expr/src/intervals/interval_aritmetic.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Addresses the reviews

* Update tests

* Remove panics

* 1 bug-fix, 2 code simplifications

1) get_statistics_with_limit() is simplified
2) AnalysisContext { boundaries } does not need to be an Option, it is removed.
3) Aggregation statistics() has a special handling for 0 and 1 row cases.

* conflict resolved

* conflict resolved

* Update datafusion/physical-plan/src/filter.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Simplify set_max/min helpers

* fix vector copy, remove clones

* resolving conflict

* remove clone

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
4 people authored Oct 17, 2023
1 parent 2cd1706 commit c9330bc
Showing 90 changed files with 2,791 additions and 2,216 deletions.
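
The heart of this change is the new `Precision` enum in `datafusion/common/src/stats.rs`, which replaces the old `Option<T>`-plus-`is_exact: bool` encoding with a three-way distinction. Below is a minimal sketch of its shape, distilled from the diffs that follow; the real type carries additional helpers, so treat this as a simplified reconstruction, not the actual definition:

```rust
// Sketch of the `Precision` wrapper introduced by this commit; the variant
// and method names follow the stats.rs diff below, but the bodies are a
// simplified reconstruction.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
enum Precision<T> {
    /// The value is known to be correct.
    Exact(T),
    /// The value is an estimate and may differ from the true value.
    Inexact(T),
    /// Nothing is known about the value.
    #[default]
    Absent,
}

impl<T> Precision<T> {
    /// Returns a reference to the wrapped value, if any, regardless of exactness.
    fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(v),
            Precision::Absent => None,
        }
    }

    /// Demotes an exact value to an inexact one, used when a transformation
    /// can only preserve an estimate.
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

fn main() {
    let rows = Precision::Exact(100usize);
    assert_eq!(rows.get_value(), Some(&100));
    assert_eq!(rows.to_inexact(), Precision::Inexact(100));

    let unknown: Precision<usize> = Precision::Absent;
    assert_eq!(unknown.get_value(), None);
}
```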
4 changes: 3 additions & 1 deletion datafusion-examples/examples/csv_opener.rs
@@ -17,6 +17,7 @@
 
 use std::{sync::Arc, vec};
 
+use datafusion::common::Statistics;
 use datafusion::{
     assert_batches_eq,
     datasource::{
@@ -29,6 +30,7 @@ use datafusion::{
     physical_plan::metrics::ExecutionPlanMetricsSet,
     test_util::aggr_test_schema,
 };
+
 use futures::StreamExt;
 use object_store::local::LocalFileSystem;
 
@@ -60,7 +62,7 @@ async fn main() -> Result<()> {
         object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema.clone(),
         file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
-        statistics: Default::default(),
+        statistics: Statistics::new_unknown(&schema),
         projection: Some(vec![12, 0]),
         limit: Some(5),
         table_partition_cols: vec![],
21 changes: 11 additions & 10 deletions datafusion-examples/examples/custom_datasource.rs
@@ -15,13 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use async_trait::async_trait;
+use std::any::Any;
+use std::collections::{BTreeMap, HashMap};
+use std::fmt::{self, Debug, Formatter};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::dataframe::DataFrame;
-use datafusion::datasource::provider_as_source;
-use datafusion::datasource::{TableProvider, TableType};
+use datafusion::datasource::{provider_as_source, TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -32,11 +36,8 @@ use datafusion::physical_plan::{
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
-use std::any::Any;
-use std::collections::{BTreeMap, HashMap};
-use std::fmt::{self, Debug, Formatter};
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
+
+use async_trait::async_trait;
 use tokio::time::timeout;
 
 /// This example demonstrates executing a simple query against a custom datasource
@@ -270,7 +271,7 @@ impl ExecutionPlan for CustomExec {
         )?))
     }
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
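
Under the new `Result<Statistics>` signature, a datasource that does track its size can report exact figures instead of falling back to `Statistics::new_unknown`. A hedged sketch under stated assumptions — the helper function and its parameters are hypothetical, while `Statistics`, `Precision`, and `Statistics::unknown_column` are the types and methods visible in the stats.rs diff below:

```rust
use arrow_schema::Schema;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};

/// Hypothetical helper: the statistics a source could return when it knows
/// its exact row count but nothing about column values.
fn exact_row_count_stats(row_count: usize, schema: &Schema) -> Result<Statistics> {
    Ok(Statistics {
        // The row count is known precisely, so advertise it as `Exact`.
        num_rows: Precision::Exact(row_count),
        // Nothing is known about the byte size.
        total_byte_size: Precision::Absent,
        // Per-column statistics stay unknown (all fields `Absent`).
        column_statistics: Statistics::unknown_column(schema),
    })
}
```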
4 changes: 3 additions & 1 deletion datafusion-examples/examples/json_opener.rs
@@ -29,6 +29,8 @@ use datafusion::{
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
 };
+use datafusion_common::Statistics;
+
 use futures::StreamExt;
 use object_store::ObjectStore;
 
@@ -63,7 +65,7 @@ async fn main() -> Result<()> {
         object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema.clone(),
         file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
-        statistics: Default::default(),
+        statistics: Statistics::new_unknown(&schema),
         projection: Some(vec![1, 0]),
         limit: Some(5),
         table_partition_cols: vec![],
100 changes: 63 additions & 37 deletions datafusion/common/src/stats.rs
@@ -17,10 +17,11 @@
 
 //! This module provides data structures to represent statistics
+use std::fmt::{self, Debug, Display};
 
 use crate::ScalarValue;
 use arrow::datatypes::DataType;
 
-use std::fmt::{self, Debug, Display};
+use arrow_schema::Schema;
 
 /// Represents a value with a degree of certainty. `Precision` is used to
 /// propagate information about the precision of statistical values.
@@ -203,70 +204,95 @@ impl<T: fmt::Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<
 /// Fields are optional and can be inexact because the sources
 /// sometimes provide approximate estimates for performance reasons
 /// and the transformations' output is not always predictable.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Statistics {
-    /// The number of table rows
-    pub num_rows: Option<usize>,
-    /// total bytes of the table rows
-    pub total_byte_size: Option<usize>,
-    /// Statistics on a column level
-    pub column_statistics: Option<Vec<ColumnStatistics>>,
-    /// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
-    /// an estimate). Any or all other fields might still be None, in which case no information is known.
-    /// If false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
-    pub is_exact: bool,
+    /// The number of table rows.
+    pub num_rows: Precision<usize>,
+    /// Total bytes of the table rows.
+    pub total_byte_size: Precision<usize>,
+    /// Statistics on a column level. It contains a [`ColumnStatistics`] for
+    /// each field in the schema of the table to which the [`Statistics`] refer.
+    pub column_statistics: Vec<ColumnStatistics>,
 }
 
-impl Display for Statistics {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        if self.num_rows.is_none() && self.total_byte_size.is_none() && !self.is_exact {
-            return Ok(());
+impl Statistics {
+    /// Returns a [`Statistics`] instance for the given schema by assigning
+    /// unknown statistics to each column in the schema.
+    pub fn new_unknown(schema: &Schema) -> Self {
+        Self {
+            num_rows: Precision::Absent,
+            total_byte_size: Precision::Absent,
+            column_statistics: Statistics::unknown_column(schema),
         }
+    }
 
-        let rows = self
-            .num_rows
-            .map_or_else(|| "None".to_string(), |v| v.to_string());
-        let bytes = self
-            .total_byte_size
-            .map_or_else(|| "None".to_string(), |v| v.to_string());
+    /// Returns an unbounded `ColumnStatistics` for each field in the schema.
+    pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
+        schema
+            .fields()
+            .iter()
+            .map(|_| ColumnStatistics::new_unknown())
+            .collect()
+    }
 
-        write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?;
+    /// If the exactness of a [`Statistics`] instance is lost, this function relaxes
+    /// the exactness of all information by converting it to [`Precision::Inexact`].
+    pub fn into_inexact(self) -> Self {
+        Statistics {
+            num_rows: self.num_rows.to_inexact(),
+            total_byte_size: self.total_byte_size.to_inexact(),
+            column_statistics: self
+                .column_statistics
+                .into_iter()
+                .map(|cs| ColumnStatistics {
+                    null_count: cs.null_count.to_inexact(),
+                    max_value: cs.max_value.to_inexact(),
+                    min_value: cs.min_value.to_inexact(),
+                    distinct_count: cs.distinct_count.to_inexact(),
+                })
+                .collect::<Vec<_>>(),
+        }
+    }
+}
+
+impl Display for Statistics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Rows={}, Bytes={}", self.num_rows, self.total_byte_size)?;
 
         Ok(())
    }
 }
 
 /// Statistics for a column within a relation
-#[derive(Clone, Debug, Default, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq, Default)]
 pub struct ColumnStatistics {
     /// Number of null values on column
-    pub null_count: Option<usize>,
+    pub null_count: Precision<usize>,
     /// Maximum value of column
-    pub max_value: Option<ScalarValue>,
+    pub max_value: Precision<ScalarValue>,
     /// Minimum value of column
-    pub min_value: Option<ScalarValue>,
+    pub min_value: Precision<ScalarValue>,
     /// Number of distinct values
-    pub distinct_count: Option<usize>,
+    pub distinct_count: Precision<usize>,
 }
 
 impl ColumnStatistics {
     /// Column contains a single non-null value (e.g. a constant).
     pub fn is_singleton(&self) -> bool {
-        match (&self.min_value, &self.max_value) {
+        match (self.min_value.get_value(), self.max_value.get_value()) {
             // Min and max values are the same and not infinity.
             (Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min == max),
             (_, _) => false,
         }
     }
 
-    /// Returns the [`ColumnStatistics`] corresponding to the given datatype by assigning infinite bounds.
-    pub fn new_with_unbounded_column(dt: &DataType) -> ColumnStatistics {
-        let null = ScalarValue::try_from(dt.clone()).ok();
+    /// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
+    pub fn new_unknown() -> ColumnStatistics {
        ColumnStatistics {
-            null_count: None,
-            max_value: null.clone(),
-            min_value: null,
-            distinct_count: None,
+            null_count: Precision::Absent,
+            max_value: Precision::Absent,
+            min_value: Precision::Absent,
+            distinct_count: Precision::Absent,
         }
     }
 }
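
Taken together, the refactored API reads like this in downstream code. A sketch assuming the `datafusion-common` and `arrow-schema` crates at this commit:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::stats::Precision;
use datafusion_common::Statistics;

fn main() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);

    // Every field now starts out as `Precision::Absent` instead of `None`.
    let stats = Statistics::new_unknown(&schema);
    assert_eq!(stats.num_rows, Precision::Absent);

    // Exact knowledge is recorded per field rather than through a single
    // struct-wide `is_exact` flag...
    let stats = Statistics {
        num_rows: Precision::Exact(42),
        ..stats
    };

    // ...and can be relaxed wholesale when a transformation loses exactness.
    let stats = stats.into_inexact();
    assert_eq!(stats.num_rows, Precision::Inexact(42));
}
```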
16 changes: 10 additions & 6 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -19,20 +19,24 @@
 //!
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
+use std::any::Any;
+use std::io::{Read, Seek};
+use std::sync::Arc;
 
 use crate::datasource::file_format::FileFormat;
 use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
+
 use arrow::ipc::reader::FileReader;
 use arrow_schema::{Schema, SchemaRef};
-use async_trait::async_trait;
+
 use datafusion_common::{FileType, Statistics};
 use datafusion_physical_expr::PhysicalExpr;
+
+use async_trait::async_trait;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-use std::any::Any;
-use std::io::{Read, Seek};
-use std::sync::Arc;
 
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
@@ -74,10 +78,10 @@ impl FileFormat for ArrowFormat {
         &self,
         _state: &SessionState,
         _store: &Arc<dyn ObjectStore>,
-        _table_schema: SchemaRef,
+        table_schema: SchemaRef,
         _object: &ObjectMeta,
     ) -> Result<Statistics> {
-        Ok(Statistics::default())
+        Ok(Statistics::new_unknown(&table_schema))
     }
 
     async fn create_physical_plan(
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -74,10 +74,10 @@ impl FileFormat for AvroFormat {
         &self,
         _state: &SessionState,
         _store: &Arc<dyn ObjectStore>,
-        _table_schema: SchemaRef,
+        table_schema: SchemaRef,
         _object: &ObjectMeta,
     ) -> Result<Statistics> {
-        Ok(Statistics::default())
+        Ok(Statistics::new_unknown(&table_schema))
     }
 
     async fn create_physical_plan(
39 changes: 20 additions & 19 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,21 +23,6 @@ use std::fmt;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use arrow::csv::WriterBuilder;
-use arrow::datatypes::{DataType, Field, Fields, Schema};
-use arrow::{self, datatypes::SchemaRef};
-use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
-use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-
-use async_trait::async_trait;
-use bytes::{Buf, Bytes};
-use datafusion_physical_plan::metrics::MetricsSet;
-use futures::stream::BoxStream;
-use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
-use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::{
@@ -51,6 +36,21 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+
+use arrow::csv::WriterBuilder;
+use arrow::datatypes::{DataType, Field, Fields, Schema};
+use arrow::{self, datatypes::SchemaRef};
+use arrow_array::RecordBatch;
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
+use datafusion_physical_plan::metrics::MetricsSet;
+
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use futures::stream::BoxStream;
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
+use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 use rand::distributions::{Alphanumeric, DistString};
 
 /// Character Separated Value `FileFormat` implementation.
@@ -236,10 +236,10 @@ impl FileFormat for CsvFormat {
         &self,
         _state: &SessionState,
         _store: &Arc<dyn ObjectStore>,
-        _table_schema: SchemaRef,
+        table_schema: SchemaRef,
         _object: &ObjectMeta,
     ) -> Result<Statistics> {
-        Ok(Statistics::default())
+        Ok(Statistics::new_unknown(&table_schema))
     }
 
     async fn create_physical_plan(
@@ -625,6 +625,7 @@ mod tests {
     use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
     use datafusion_common::internal_err;
+    use datafusion_common::stats::Precision;
     use datafusion_common::FileType;
     use datafusion_common::GetExt;
     use datafusion_expr::{col, lit};
@@ -657,8 +658,8 @@
         assert_eq!(tt_batches, 50 /* 100/2 */);
 
         // test metadata
-        assert_eq!(exec.statistics().num_rows, None);
-        assert_eq!(exec.statistics().total_byte_size, None);
+        assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
+        assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
 
         Ok(())
     }