Prune scanned files on column stats #724

Merged · 11 commits · Aug 8, 2022
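For context, this change makes DeltaTable implement DataFusion's PruningStatistics and turns scan filters into a PruningPredicate over the per-file min/max column stats in the Delta log, so files that cannot contain matching rows are skipped before the Parquet scan. Below is a minimal sketch of how that path might be exercised; the table path, table name, and filter column are illustrative assumptions, not taken from this PR.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open an existing Delta table (hypothetical path).
    let table = open_table("./tests/data/delta-0.8.0").await?;

    // DeltaTable implements TableProvider, so it can be registered directly.
    let ctx = SessionContext::new();
    ctx.register_table("demo", Arc::new(table))?;

    // With this PR, the `value > 100` filter is compiled into a PruningPredicate and
    // evaluated against each file's min/max stats; files whose stats rule out a
    // match are never handed to the Parquet scan.
    let df = ctx.sql("SELECT * FROM demo WHERE value > 100").await?;
    df.show().await?;
    Ok(())
}
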
7 changes: 4 additions & 3 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion rust/Cargo.toml
@@ -20,7 +20,7 @@ futures = "0.3"
bytes = "1"
log = "0"
regex = "1"
chrono = "0"
chrono = "0.4.20"
uuid = { version = "1", features = ["serde", "v4"] }
lazy_static = "1"
percent-encoding = "2"
7 changes: 2 additions & 5 deletions rust/src/checkpoints.rs
@@ -3,10 +3,7 @@
use arrow::datatypes::Schema as ArrowSchema;
use arrow::error::ArrowError;
use arrow::json::reader::{Decoder, DecoderOptions};
use chrono::Datelike;
use chrono::Duration;
use chrono::Utc;
use chrono::MIN_DATETIME;
use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
@@ -229,7 +226,7 @@ async fn cleanup_expired_logs_for(
0,
ObjectMeta {
location: Path::from(""),
last_modified: MIN_DATETIME,
last_modified: DateTime::<Utc>::MIN_UTC,
size: 0,
},
);
241 changes: 187 additions & 54 deletions rust/src/delta_datafusion.rs
@@ -21,27 +21,55 @@
//! ```

use std::any::Any;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
use async_trait::async_trait;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DataFusionResult;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::{combine_filters, Column, Expr};
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use datafusion::scalar::ScalarValue;
use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::action;
use crate::delta;
use crate::schema;
use crate::DeltaTableError;

impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
DeltaTableError::ArrowError { source } => DataFusionError::ArrowError(source),
DeltaTableError::Io { source } => DataFusionError::IoError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
DeltaTableError::ParquetError { source } => DataFusionError::ParquetError(source),
_ => DataFusionError::External(Box::new(err)),
}
}
}

impl From<DataFusionError> for crate::DeltaTableError {
fn from(err: DataFusionError) -> Self {
match err {
DataFusionError::ArrowError(source) => DeltaTableError::ArrowError { source },
DataFusionError::IoError(source) => DeltaTableError::Io { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
DataFusionError::ParquetError(source) => DeltaTableError::ParquetError { source },
_ => DeltaTableError::Generic(err.to_string()),
}
}
}

impl delta::DeltaTable {
/// Return statistics for Datafusion Table
@@ -193,39 +221,86 @@ impl delta::DeltaTable {
}
}

// TODO: uncomment this when datafusion supports per partitioned file stats
// fn add_action_df_stats(add: &action::Add, schema: &schema::Schema) -> Statistics {
// if let Ok(Some(statistics)) = add.get_stats() {
// Statistics {
// num_rows: Some(statistics.num_records as usize),
// total_byte_size: Some(add.size as usize),
// column_statistics: Some(
// schema
// .get_fields()
// .iter()
// .map(|field| ColumnStatistics {
// null_count: statistics
// .null_count
// .get(field.get_name())
// .and_then(|f| f.as_value().map(|v| v as usize)),
// max_value: statistics
// .max_values
// .get(field.get_name())
// .and_then(|f| to_scalar_value(f.as_value()?)),
// min_value: statistics
// .min_values
// .get(field.get_name())
// .and_then(|f| to_scalar_value(f.as_value()?)),
// distinct_count: None, // TODO: distinct
// })
// .collect(),
// ),
// is_exact: true,
// }
// } else {
// Statistics::default()
// }
// }
impl PruningStatistics for delta::DeltaTable {
@roeap (Collaborator, Author) commented on Aug 7, 2022:
Implementing this made me think of how to best store stats, which is an ongoing topic (#454)... Maybe PruningStatistics's view on the work helps?

Something along the lines of

pub struct Stats {
    files: HashMap<Path, (usize, PartitionedFile)>,
    max_values: RecordBatch,
    min_values: RecordBatch,
    ...
}

... and then implement some convenience accessors to get data per column / per file.

A project member replied:
yep, switching to columnar format will help in many places :)
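
To make the idea above concrete, here is a rough, hypothetical sketch of what such a columnar stats container and its convenience accessors could look like; the field names and method signatures are illustrative assumptions, not code from this PR or from #454.

use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::listing::PartitionedFile;
use object_store::path::Path;

/// Hypothetical columnar stats container: one row per file, one column per table column.
pub struct Stats {
    /// Maps a file path to its row index in the stats batches plus its scan unit.
    files: HashMap<Path, (usize, PartitionedFile)>,
    /// Per-column maximum values; row i describes file i.
    max_values: RecordBatch,
    /// Per-column minimum values; row i describes file i.
    min_values: RecordBatch,
}

impl Stats {
    /// Minimum values for the named column across all files, if the column is tracked.
    pub fn min_values(&self, column: &str) -> Option<ArrayRef> {
        let idx = self.min_values.schema().index_of(column).ok()?;
        Some(self.min_values.column(idx).clone())
    }

    /// Maximum values for the named column across all files, if the column is tracked.
    pub fn max_values(&self, column: &str) -> Option<ArrayRef> {
        let idx = self.max_values.schema().index_of(column).ok()?;
        Some(self.max_values.column(idx).clone())
    }

    /// Row index of a given file in the stats batches, if known.
    pub fn file_index(&self, path: &Path) -> Option<usize> {
        self.files.get(path).map(|(idx, _)| *idx)
    }
}

With such a layout, the PruningStatistics implementation in this PR could return a column slice directly instead of re-parsing each file's JSON stats on every call.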

/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.min_values
.get(&column.name)
.and_then(|f| {
correct_scalar_value_type(
to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
&data_type,
)
})
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
}

/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let field = self
.get_schema()
.ok()
.map(|s| s.get_field_with_name(&column.name).ok())??;
let data_type = field.get_type().try_into().ok()?;
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.max_values
.get(&column.name)
.and_then(|f| {
correct_scalar_value_type(
to_scalar_value(f.as_value()?).unwrap_or(ScalarValue::Null),
&data_type,
)
})
.unwrap_or(ScalarValue::Null)
} else {
ScalarValue::Null
}
});
ScalarValue::iter_to_array(values).ok()
}

/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize {
self.get_state().files().len()
}

/// return the number of null values for the named column as an
/// `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
statistics
.null_count
.get(&column.name)
.map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
.unwrap_or(ScalarValue::UInt64(None))
} else {
ScalarValue::UInt64(None)
}
});
ScalarValue::iter_to_array(values).ok()
}
}

#[async_trait]
impl TableProvider for delta::DeltaTable {
@@ -263,29 +338,55 @@ impl TableProvider for delta::DeltaTable {
self.object_store(),
);

// TODO prune files based on file statistics and filter expressions
let partitions = self
.get_state()
.files()
.iter()
.map(|action| {
Ok(vec![PartitionedFile::new(
action.path.clone(),
action.size as u64,
)])
})
.collect::<DataFusionResult<_>>()?;
// TODO we group files together by their partition values. If the table is partitioned
// and partitions are somewhat evenly distributed, probably not the worst choice ...
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
if let Some(Some(predicate)) = (!filters.is_empty()).then_some(combine_filters(filters)) {
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(self)?;
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.for_each(|(action, prune_file)| {
if !prune_file {
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
};
});
} else {
self.get_state().files().iter().for_each(|action| {
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
});
};

let table_partition_cols = self.get_metadata()?.partition_columns.clone();
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect(),
));
ParquetFormat::default()
.create_physical_plan(
FileScanConfig {
object_store_url,
file_schema: schema,
file_groups: partitions,
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.datafusion_table_statistics(),
projection: projection.clone(),
limit,
table_partition_cols: self.get_metadata().unwrap().partition_columns.clone(),
table_partition_cols,
},
filters,
)
@@ -297,6 +398,38 @@ impl TableProvider for delta::DeltaTable {
}
}

fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
let partition_values = schema
.fields()
.iter()
.filter_map(|f| {
action.partition_values.get(f.name()).map(|val| match val {
Some(value) => {
match to_scalar_value(&serde_json::Value::String(value.to_string())) {
Some(parsed) => correct_scalar_value_type(parsed, f.data_type())
.unwrap_or(ScalarValue::Null),
None => ScalarValue::Null,
}
}
None => ScalarValue::Null,
})
})
.collect::<Vec<_>>();
let ts_secs = action.modification_time / 1000;
let ts_ns = (action.modification_time % 1000) * 1_000_000;
let last_modified =
DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts_secs, ts_ns as u32), Utc);
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(action.path.clone()),
last_modified,
size: action.size as usize,
},
partition_values,
range: None,
}
}

fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
match stat_val {
serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
12 changes: 12 additions & 0 deletions rust/src/operations/mod.rs
@@ -231,6 +231,18 @@ fn get_table_from_uri_without_update(table_uri: String) -> DeltaCommandResult<De
Ok(table)
}

impl From<DeltaTable> for DeltaCommands {
fn from(table: DeltaTable) -> Self {
Self { table }
}
}

impl From<DeltaCommands> for DeltaTable {
fn from(comm: DeltaCommands) -> Self {
comm.table
}
}

#[cfg(test)]
mod tests {
use super::*;
19 changes: 19 additions & 0 deletions rust/src/schema.rs
@@ -43,6 +43,25 @@ impl SchemaTypeStruct {
pub fn get_fields(&self) -> &Vec<SchemaField> {
&self.fields
}

/// Returns an immutable reference of a specific `Field` instance selected by name.
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> {
Ok(&self.fields[self.index_of(name)?])
}

/// Find the index of the column with the given name.
pub fn index_of(&self, name: &str) -> Result<usize, crate::DeltaTableError> {
for i in 0..self.fields.len() {
if self.fields[i].get_name() == name {
return Ok(i);
}
}
let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name.clone()).collect();
Err(crate::DeltaTableError::Generic(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
name, valid_fields
)))
}
}

/// Describes a specific field of the Delta table schema.