
Prune scanned files on column stats #724

Merged · roeap merged 11 commits from commands into delta-io:main · Aug 8, 2022
Conversation

roeap (Collaborator) commented Aug 6, 2022:

Description

This PR deepens the integration with datafusion by leveraging the column statistics from the delta log to prune the files that need to be scanned when additional constraints are supplied. Luckily, datafusion provides some excellent utilities to implement this. Specifically, we implement `PruningStatistics` for `DeltaTable`, use that with `PruningPredicate`, and the rest just kind of happens 😆.

I still need to do some cleanup and more testing, and to look at how much effort and dependency growth it would take to adopt datafusion expressions for our partition / stats handling. However, if there is feedback on whether this is the right way to go, I'd be happy to hear it.
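For illustration, here is a minimal sketch of the wiring, not the actual delta-rs implementation: a stand-in stats holder implements datafusion's `PruningStatistics`, and `PruningPredicate` turns a filter expression into a per-file keep/skip mask. Module paths follow the datafusion releases of the time and may differ between versions.

```rust
use std::collections::HashMap;

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Column;
use datafusion::error::Result;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::prelude::Expr;

/// Stand-in stats holder: one "container" per data file, and each
/// min/max array holds one entry per file, keyed by column name.
struct FileStats {
    min_values: HashMap<String, ArrayRef>,
    max_values: HashMap<String, ArrayRef>,
    num_files: usize,
}

impl PruningStatistics for FileStats {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.min_values.get(&column.name).cloned()
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.max_values.get(&column.name).cloned()
    }
    fn num_containers(&self) -> usize {
        self.num_files
    }
    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // "unknown" keeps pruning conservative
    }
}

/// One bool per file; `true` means the file may contain matching rows.
fn files_to_scan(stats: &FileStats, predicate: Expr, schema: SchemaRef) -> Result<Vec<bool>> {
    let pruning_predicate = PruningPredicate::try_new(predicate, schema)?;
    pruning_predicate.prune(stats)
}
```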

cc @houqp @wjones127 - maybe even @tustvold has some feedback? :)

Related Issue(s)

In particular, the statistics recorded during scans should get us a lot closer to finishing #632.

Documentation

roeap marked this pull request as a draft on August 6, 2022.

roeap (Collaborator, Author) commented Aug 7, 2022:

@wjones127 - the Python 3.7 builds seem to have started failing here and in other PRs. It seems it tries to build pyarrow from source again and fails to find Arrow C++. While we could install it, my understanding is that this should not be necessary; also, everything worked until very recently. Any ideas?

Review thread on `fn to_scalar_value` (diff `@@ -310,6 +448,7 @@`):

```rust
        serde_json::Value::String(s) => Some(ScalarValue::from(s.as_str())),
        // TODO is it permissible to encode arrays / objects as partition values?
```
houqp (Member) commented Aug 7, 2022:

For the py37 build error, it's because pyarrow 9 stopped releasing manylinux2010 wheels. Compare https://pypi.org/project/pyarrow/9.0.0/#files with https://pypi.org/project/pyarrow/8.0.0/#files. We might need to bump our manylinux support to 2014 too :(

@wjones127 is the manylinux2010 support removal in arrow 9 release expected?

Review thread on lines 365 to 369:

```rust
    .zip(files_to_prune.into_iter())
    .filter_map(|(action, prune_file)| {
        if prune_file {
            return None;
        }
```
houqp (Member):

It might be worth specializing this so we are not paying the penalty of iterating over and checking the prune-file array when there is no file to prune. For example, the code below can be abstracted into a function, and then we have two types of iterator loops that call it: one loop zips with files_to_prune, the other simply iterates through get_state().files().
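A hedged sketch of that shape, with stand-in types rather than the actual delta-rs ones:

```rust
/// Stand-in for the log action describing a data file.
struct Action {
    path: String,
}

/// The shared per-file work, pulled out into one function.
fn to_scan_file(action: &Action) -> String {
    action.path.clone()
}

/// Only zip against the prune mask when a predicate actually produced one;
/// the unfiltered scan iterates the actions directly, paying no mask cost.
fn files_for_scan(actions: &[Action], files_to_prune: Option<Vec<bool>>) -> Vec<String> {
    match files_to_prune {
        Some(mask) => actions
            .iter()
            .zip(mask)
            .filter_map(|(action, prune)| (!prune).then(|| to_scan_file(action)))
            .collect(),
        None => actions.iter().map(to_scan_file).collect(),
    }
}
```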

roeap (Collaborator, Author):

Makes sense! Updated it, and took the opportunity to batch files by partition values. That's not going to be optimal in many cases, but by default it is hopefully better than putting each file in its own (datafusion) partition.
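For illustration, the grouping could be as simple as the following sketch (stand-in types again, not the delta-rs code):

```rust
use std::collections::HashMap;

/// Group file paths that share partition values, so each group becomes one
/// (datafusion) scan partition instead of one partition per file.
fn batch_by_partition_values(
    files: Vec<(String, Vec<String>)>, // (path, partition values)
) -> Vec<Vec<String>> {
    let mut groups: HashMap<Vec<String>, Vec<String>> = HashMap::new();
    for (path, values) in files {
        groups.entry(values).or_default().push(path);
    }
    groups.into_values().collect()
}
```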

roeap marked this pull request as ready for review on August 7, 2022.
Review thread on lines +51 to +64:

```rust
impl ExecutionPlanVisitor for ExecutionMetricsCollector {
    type Error = DataFusionError;

    fn pre_visit(
        &mut self,
        plan: &dyn ExecutionPlan,
    ) -> std::result::Result<bool, Self::Error> {
        if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
            let files = get_scanned_files(exec);
            self.scanned_files.extend(files);
        }
        Ok(true)
    }
}
```
roeap (Collaborator, Author):

Just did this for testing right now, but I would like to use something like this to collect the statistics we need for proper conflict resolution.
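For reference, such a visitor is driven with datafusion's plan-walking helper; a minimal usage sketch, assuming the collector above derives `Default`:

```rust
use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::physical_plan::{accept, ExecutionPlan};

/// Walk the executed plan tree depth-first; `pre_visit` fires on every
/// node, so every ParquetExec contributes its scanned files.
fn collect_metrics(
    plan: &Arc<dyn ExecutionPlan>,
) -> Result<ExecutionMetricsCollector, DataFusionError> {
    // Assumption: ExecutionMetricsCollector implements Default.
    let mut collector = ExecutionMetricsCollector::default();
    accept(plan.as_ref(), &mut collector)?;
    Ok(collector)
}
```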

Review thread on the new `PruningStatistics` implementation:

```rust
impl PruningStatistics for delta::DeltaTable {
```

roeap (Collaborator, Author) commented Aug 7, 2022:
Implementing this made me think about how best to store stats, which is an ongoing topic (#454)... Maybe `PruningStatistics`' view on this helps?

Something along the lines of:

```rust
pub struct Stats {
    files: HashMap<Path, (usize, PartitionedFile)>,
    max_values: RecordBatch,
    min_values: RecordBatch,
    ...
}
```

... and then implement some convenience accessors to get data per column / per file.
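A hedged sketch of one such accessor, on a trimmed version of the struct above (the `usize` is taken to be the file's row index into the stats batches, and `PartitionedFile` is omitted for brevity):

```rust
use std::collections::HashMap;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::scalar::ScalarValue;

/// Trimmed stand-in for the Stats sketch above.
struct Stats {
    files: HashMap<String, usize>, // path -> row index into the batches
    min_values: RecordBatch,
    max_values: RecordBatch,
}

impl Stats {
    /// Minimum value of `column` for the file stored under `path`.
    fn min_value(&self, path: &str, column: &str) -> Option<ScalarValue> {
        let row = *self.files.get(path)?;
        let array = self.min_values.column_by_name(column)?;
        ScalarValue::try_from_array(array, row).ok()
    }
}
```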

houqp (Member):
yep, switching to columnar format will help in many places :)

Review thread on lines 348 to 349:

```rust
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(self)?;
```
roeap (Collaborator, Author):
I was a bit concerned about interpreting the filters properly, but luckily the hard part was done 😆.

roeap requested a review from houqp on August 7, 2022.
"Failed to evaluate table pruning predicates.".to_string(),
)
})??
.for_each(|f| {
houqp (Member):

This iterates through the vector again, no? I think we should be able to perform the hashmap insertion within the filter_map callback above.

roeap (Collaborator, Author):

Absolutely! We avoid that extra iteration now.
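A hedged sketch of that single-pass shape, with stand-in types:

```rust
use std::collections::HashMap;

/// Stand-in for the add action describing a data file.
struct FileAction {
    path: String,
    partition_values: Vec<String>,
}

/// Filter on the prune mask and insert into the per-partition map in the
/// same pass, instead of collecting and iterating a second time.
fn group_kept_files(
    actions: Vec<FileAction>,
    keep_mask: Vec<bool>,
) -> HashMap<Vec<String>, Vec<FileAction>> {
    let mut partitions: HashMap<Vec<String>, Vec<FileAction>> = HashMap::new();
    actions
        .into_iter()
        .zip(keep_mask)
        .filter(|(_, keep)| *keep)
        .for_each(|(file, _)| {
            partitions
                .entry(file.partition_values.clone())
                .or_default()
                .push(file)
        });
    partitions
}
```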

houqp (Member) commented Aug 8, 2022:

Thanks @roeap, this is a great demonstration of datafusion's extensibility :) The rest looks good to me; I left a very minor comment.

wjones127 (Collaborator) commented:

> @wjones127 is the manylinux2010 support removal in arrow 9 release expected?

Yes. Sorry I saw that in the release notes and didn't connect the dots.

Odd, though; I'm unsure why it's being installed in the job that's supposed to only install PyArrow 4.x. For some reason `maturin develop` is doing a `pip --force-reinstall`, and that's causing an upgrade. I need to look into this more.

houqp (Member) left a review:

LGTM!

roeap merged commit f9816b0 into delta-io:main on Aug 8, 2022, and deleted the commands branch on August 8, 2022.