[Epic] A Collection of Sort Based Optimizations #10313

Open
5 of 12 tasks
Tracked by #10283
alamb opened this issue Apr 30, 2024 · 6 comments
Labels
enhancement New feature or request


@alamb
Contributor

alamb commented Apr 30, 2024

Use case

Many analytic systems store their data in a particular sort order, and the query engine can often take advantage of that order to both reduce memory usage and improve performance.

Specific examples in DataFusion include:

  1. Emitting from GroupBy early with a partially sorted stream
  2. SortMergeJoin
  3. Sort removal via EnforceSorting and replace_with_order_preserving_variants
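To make item 2 concrete: when both inputs already arrive sorted on the join key, a join can advance through them in lockstep instead of building a hash table. Below is a minimal single-threaded sketch over in-memory slices. The function `sort_merge_inner_join` and the `(key, payload)` row layout are hypothetical and are not the API of DataFusion's `SortMergeJoinExec`. Runs of duplicate keys are joined by emitting their cross product.

```rust
/// Inner-joins two inputs that are both sorted ascending by key.
/// Rows are `(key, payload)`; output rows are `(key, left_payload, right_payload)`.
/// Hypothetical helper for illustration only.
pub fn sort_merge_inner_join<L: Clone, R: Clone>(
    left: &[(i64, L)],
    right: &[(i64, R)],
) -> Vec<(i64, L, R)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < left.len() && j < right.len() {
        let (lk, rk) = (left[i].0, right[j].0);
        if lk < rk {
            i += 1; // left key is behind: advance the left side
        } else if lk > rk {
            j += 1; // right key is behind: advance the right side
        } else {
            // Find the end of the run of equal keys on each side.
            let i_end = i + left[i..].iter().take_while(|row| row.0 == lk).count();
            let j_end = j + right[j..].iter().take_while(|row| row.0 == rk).count();
            // Emit the cross product of the two matching runs.
            for l in &left[i..i_end] {
                for r in &right[j..j_end] {
                    out.push((lk, l.1.clone(), r.1.clone()));
                }
            }
            i = i_end;
            j = j_end;
        }
    }
    out
}
```

In a streaming version of this loop, only the current run of equal keys has to be buffered, so memory is bounded by the largest run rather than by the size of one whole input.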

This information is currently encoded in ExecutionPlan::maintains_input_order, ExecutionPlan::required_input_ordering, and PlanProperties.
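The basic question these APIs let the optimizer answer is whether an input's existing ordering already satisfies a requirement. If it does, an explicit sort can be dropped. The following is a deliberately simplified sketch that assumes orderings are plain lists of column names with a direction. The `SortKey` type and `ordering_satisfies` function are hypothetical. DataFusion's actual check, in its equivalence-properties code, is more general: it also accounts for equivalent columns and constants, which this ignores. Under these assumptions, a required ordering is satisfied exactly when it is a prefix of the provided ordering.

```rust
/// A simplified sort key: a column name plus a direction.
/// (Illustrative type; not DataFusion's `PhysicalSortExpr`.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

impl SortKey {
    pub fn asc(column: &str) -> Self {
        SortKey { column: column.to_string(), descending: false }
    }
    pub fn desc(column: &str) -> Self {
        SortKey { column: column.to_string(), descending: true }
    }
}

/// Returns true if data already sorted by `provided` is also sorted by
/// `required`, i.e. `required` is a prefix of `provided`. When this holds,
/// a sort enforcing `required` on top of this input is redundant.
pub fn ordering_satisfies(provided: &[SortKey], required: &[SortKey]) -> bool {
    required.len() <= provided.len()
        && provided.iter().zip(required).all(|(p, r)| p == r)
}
```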

The same underlying analysis is often required for streaming (where determining what to emit is modeled as a sorted stream, for example on date_trunc(ts) of a stream sorted by timestamp).
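To see why sortedness drives emission, consider an input sorted by `ts`. Any monotonic function of `ts`, such as `date_trunc`, is then also sorted. As soon as a row lands in a new bucket, every earlier bucket is complete and can be emitted without waiting for the end of the input. The minimal sketch below assumes integer timestamps in seconds and uses a fixed bucket width as a stand-in for `date_trunc`. The names `truncate` and `StreamingBucketCount` are hypothetical.

```rust
/// Stand-in for `date_trunc`: floor `ts` down to a multiple of `width`.
fn truncate(ts: i64, width: i64) -> i64 {
    ts - ts.rem_euclid(width)
}

/// Counts rows per bucket over input sorted by `ts`, emitting each bucket's
/// count as soon as it is known to be complete.
pub struct StreamingBucketCount {
    width: i64,
    current: Option<(i64, u64)>, // (bucket start, rows seen so far)
}

impl StreamingBucketCount {
    pub fn new(width: i64) -> Self {
        StreamingBucketCount { width, current: None }
    }

    /// Feeds one timestamp (assumed non-decreasing). Returns a finished
    /// `(bucket, count)` if this row opened a new bucket.
    pub fn push(&mut self, ts: i64) -> Option<(i64, u64)> {
        let bucket = truncate(ts, self.width);
        if let Some((b, n)) = self.current.as_mut() {
            if *b == bucket {
                *n += 1; // same bucket: keep accumulating
                return None;
            }
        }
        // New bucket: because the input is sorted, the previous bucket can
        // no longer change, so hand it back for emission.
        self.current.replace((bucket, 1))
    }

    /// Flushes the last, still-open bucket at end of input.
    pub fn finish(&mut self) -> Option<(i64, u64)> {
        self.current.take()
    }
}
```

Only one bucket's state is held at a time. Without the sort guarantee, every bucket would have to stay open until the input ended.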

Describe the solution you'd like

This epic has a list of optimizations / improvements that further take sortedness into account. Here are some related issues:

@phillipleblanc
Contributor

Would this ticket be an appropriate place to add tickets related to pushing down sorts to federated query engines? I know that this was discussed previously (i.e. #7871) and it seems that writing a custom optimizer is the current way to handle that.

I will need to do this soon (federated sort pushdown) and it initially wasn't clear to me how to make this work in DataFusion. I can volunteer to write some docs on how to do this once I have an implementation that works.

@alamb
Contributor Author

alamb commented May 1, 2024

Update here: we merged #9593 and now will work on increasing the test coverage to enable it by default (tracked in #10336)

@alamb
Contributor Author

alamb commented May 1, 2024

> Would this ticket be an appropriate place to add tickets related to pushing down sorts to federated query engines? I know that this was discussed previously (i.e. #7871) and it seems that writing a custom optimizer is the current way to handle that.

I added #7871 to the list above -- thank you.

Yes, I think this would be a good place to discuss it.

> I will need to do this soon (federated sort pushdown) and it initially wasn't clear to me how to make this work in DataFusion. I can volunteer to write some docs on how to do this once I have an implementation that works.

That would be great, thanks @phillipleblanc

Right now, once TableProvider::scan gets called, the returned ExecutionPlan can report how its output is already sorted.

What we don't have is any way for the optimizer to tell an ExecutionPlan that the rest of the DataFusion plan could do less work if the data were already sorted.

I wonder if we could add something to the ExecutionPlan trait, similar to ExecutionPlan::repartitioned, like:

```rust
trait ExecutionPlan {
    // ... existing methods ...

    /// Return other possible orders that this ExecutionPlan could produce.
    /// (The DataFusion optimizer will use this information to potentially
    /// push sorts into the node.)
    fn pushable_sorts(&self) -> Result<Option<PotentialSortOrders>> {
        Ok(None)
    }

    /// Return a node like this one, except that its output is sorted
    /// according to the given sort expressions.
    fn resorted(&self, _exprs: &[PhysicalSortExpr]) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}
```

And then add a new optimizer pass that tries to push sorts into the plan nodes that report they can provide sorted data 🤔
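A toy model of such a pass might look like the sketch below. It uses a tiny plan enum instead of DataFusion's `ExecutionPlan` trait, and every name in it (`Plan`, `can_sort_by`, `push_sorts`) is hypothetical. When a `Sort` sits directly on a scan that reports it can produce the requested order itself, the pass replaces the pair with a single pre-sorted scan. It mirrors the proposed `pushable_sorts` / `resorted` pair only in spirit. For simplicity it does not push sorts through order-preserving operators such as filters.

```rust
/// A toy physical plan (hypothetical; not DataFusion's types).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// A source that can natively return data in some orders
    /// (e.g. a remote engine that can evaluate ORDER BY itself).
    Scan { table: String, can_sort_by: Vec<Vec<String>>, output_order: Vec<String> },
    /// An explicit sort performed inside the engine.
    Sort { exprs: Vec<String>, input: Box<Plan> },
    /// Any other single-input operator.
    Other { name: String, input: Box<Plan> },
}

/// Bottom-up rewrite: fold each `Sort` into a `Scan` that can provide that order.
pub fn push_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { exprs, input } => match push_sorts(*input) {
            // Analogue of `resorted`: the scan can produce `exprs` itself,
            // so the explicit Sort is dropped.
            Plan::Scan { table, can_sort_by, .. } if can_sort_by.contains(&exprs) => {
                Plan::Scan { table, can_sort_by, output_order: exprs }
            }
            // Otherwise keep the Sort on top of the (rewritten) input.
            other => Plan::Sort { exprs, input: Box::new(other) },
        },
        Plan::Other { name, input } => Plan::Other { name, input: Box::new(push_sorts(*input)) },
        scan @ Plan::Scan { .. } => scan,
    }
}
```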

@phillipleblanc
Contributor

phillipleblanc commented May 8, 2024

After digging into and understanding how the datafusion-federation crate works, I don't think we need anything additional for sort pushdown. I basically came to the same realization that @backkem had in #7871 (comment).

My realization essentially comes down to (please correct me if this is incorrect):

DataFusion is a library that provides both query planning (LogicalPlan) and query execution (ExecutionPlan). When we are connecting a set of tables from a remote query engine into DataFusion, what we really want is the ability to get an optimized logical plan and send that plan to be executed by the remote query engine - in its entirety, bypassing the query execution of DataFusion as much as possible. (In reality we still want the query execution DataFusion provides for more complex queries that involve custom UDFs, joins between two different remote query engines, etc).

The TableProvider construct is part of DataFusion's query execution (ExecutionPlan-level) machinery, so trying to teach it to be smarter for the query federation case is an anti-pattern in my mind. But we still need a TableProvider to be registered so we can take advantage of logical planning (via the automatic conversion of a TableProvider into a TableSource during planning). The datafusion-federation repo solves this with a thin wrapper around a TableProvider called FederatedTableProviderAdaptor, whose entire job is to provide a TableSource during logical planning. A custom FederationQueryPlanner then recognizes TableScans of a FederatedTableProviderAdaptor and delegates execution of the largest LogicalPlan subtree that contains only TableScans from the same source to that source (by deparsing it back to SQL).
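The "largest subtree that only touches one source" rule can be sketched as a bottom-up walk over a toy logical plan. This is an illustration under simplifying assumptions, not the datafusion-federation implementation. The `LogicalNode` type, its `source` label on each scan, and both functions are hypothetical. Each subtree reports the single source that all of its scans share, or `None` if it mixes sources. The walk stops descending at the first node that reports a single source, so each reported node roots a maximal subtree that could be sent to its source whole.

```rust
/// Toy logical plan (hypothetical): scans are tagged with the remote source
/// they belong to; every other operator just has inputs.
#[derive(Debug)]
pub enum LogicalNode {
    TableScan { table: String, source: String },
    Op { name: String, inputs: Vec<LogicalNode> },
}

/// Returns the single source shared by every scan under `node`, or `None`
/// if the subtree mixes sources (or contains no scans at all).
pub fn single_source(node: &LogicalNode) -> Option<String> {
    match node {
        LogicalNode::TableScan { source, .. } => Some(source.clone()),
        LogicalNode::Op { inputs, .. } => {
            let mut shared: Option<String> = None;
            for input in inputs {
                let s = single_source(input)?; // a mixed child makes the parent mixed
                if shared.as_ref().map_or(false, |prev| *prev != s) {
                    return None; // two inputs read from different sources
                }
                shared = Some(s);
            }
            shared
        }
    }
}

/// Collects `(node label, source)` for the roots of maximal single-source
/// subtrees; each such subtree could be deparsed to SQL and sent to its source.
pub fn federatable_roots(node: &LogicalNode, out: &mut Vec<(String, String)>) {
    if let Some(source) = single_source(node) {
        let label = match node {
            LogicalNode::TableScan { table, .. } => table.clone(),
            LogicalNode::Op { name, .. } => name.clone(),
        };
        out.push((label, source));
        return;
    }
    if let LogicalNode::Op { inputs, .. } = node {
        for input in inputs {
            federatable_roots(input, out);
        }
    }
}
```

Nodes above the reported roots (here, anything that mixes sources) would remain for DataFusion to execute, matching the case of joins between two different remote engines described above.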

@alamb
Contributor Author

alamb commented May 13, 2024

FYI @NGA-TRAN is working on porting ProgressiveEval to DataFusion: #10488

@Dandandan
Contributor

Dandandan commented Dec 4, 2024

I added #13642 and #7053 to the list
