
RFC: More Granular File Operators #2079

Closed
tustvold opened this issue Mar 24, 2022 · 11 comments
Labels
enhancement New feature or request

Comments

@tustvold
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently the file scan operators such as ParquetExec, CsvExec, etc. are created with a FileScanConfig, which internally contains a list of PartitionedFile. These PartitionedFile are grouped together into "file groups", and for each group the operator exposes a DataFusion partition that scans the files in that group sequentially.
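
For concreteness, here is a simplified sketch of the shape of that configuration. The real DataFusion FileScanConfig / PartitionedFile definitions carry more fields (statistics, table partition values, etc.); this is only an illustration:

```rust
// Simplified illustration of the input to today's file scan operators.
struct PartitionedFile {
    path: String, // location of one file to scan
    size: u64,    // its size in bytes
}

struct FileScanConfig {
    // file_groups[i] is the list of files that DataFusion partition `i`
    // scans sequentially
    file_groups: Vec<Vec<PartitionedFile>>,
    projection: Option<Vec<usize>>, // optional column projection
    limit: Option<usize>,           // optional row-count limit
}
```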

Whilst this works, I'm getting a little bit concerned that we are ending up with quite a lot of complexity within each of the individual, file-format-specific operators.

This in turn comes with some downsides:

  • Code duplication between operators for different formats with potential for feature and functionality divergence
  • The operators are getting very large and quite hard to reason about
  • Execution details are hidden from the physical plan, potentially limiting parallelism, optimisation, introspection, etc...
  • Catalog details, such as the partitioning scheme, leak into the physical operators themselves

Describe the solution you'd like

This isn't a fully formed proposal, but I wonder if, instead of continuing to extend the individual file format operators, we might compose simpler physical operators together within the query plan. Specifically, I wonder if we might make the ParquetExec and CsvExec operators handle a single file, and have the plan stage within TableProvider::scan construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter, etc. operators.
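
A very rough sketch of that shape; SingleFileParquetExec and SchemaAdapterExec below are hypothetical placeholders (not existing DataFusion operators), and Exec stands in for ExecutionPlan:

```rust
use std::sync::Arc;

// Hypothetical stand-ins used only to illustrate the proposed plan shape.
trait Exec {}                                       // stand-in for ExecutionPlan
struct SingleFileParquetExec { path: String }       // scans exactly one file
struct SchemaAdapterExec { input: Arc<dyn Exec> }   // maps file schema -> table schema
struct UnionLikeExec { inputs: Vec<Arc<dyn Exec>> } // each input = one partition

impl Exec for SingleFileParquetExec {}
impl Exec for SchemaAdapterExec {}
impl Exec for UnionLikeExec {}

// What TableProvider::scan could build: one per-file operator, wrapped in the
// schema adaptation / projection it needs, all feeding a union.
fn scan_plan(files: &[&str]) -> Arc<dyn Exec> {
    let inputs = files
        .iter()
        .map(|f| {
            let scan = Arc::new(SingleFileParquetExec { path: f.to_string() });
            Arc::new(SchemaAdapterExec { input: scan }) as Arc<dyn Exec>
        })
        .collect();
    Arc::new(UnionLikeExec { inputs })
}
```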

For what it is worth, IOx uses a similar approach https://github.com/influxdata/influxdb_iox/blob/main/query/src/provider.rs#L282 and it works quite well.

Describe alternatives you've considered

The current approach could remain

Additional context

I'm trying to take a more holistic view on what the parquet interface upstream should look like, which is somewhat related to this apache/arrow-rs#1473

FYI @rdettai @yjshen @alamb

@tustvold tustvold added the enhancement New feature or request label Mar 24, 2022
@alamb
Contributor

alamb commented Mar 24, 2022

Specifically I wonder if we might make it so that the ParquetExec, CsvExec operators handle a single file, and the plan stage within TableProvider::scan instead constructs a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter operators as necessary.

I like this idea (though I am somewhat biased, as I like / implemented a bunch of the IOx approach) :)

@yjshen
Member

yjshen commented Mar 25, 2022

Thanks for bringing it up.

Recently, we've encountered several different circumstances in which we need to deal with query execution parallelism.

I think it might be the right time to rethink how to divide query working sets, how to partition/repartition data among operators, and how we should schedule tasks.

My opinion on the whole scheduler and execution framework (partly proposed in #1987 (comment) and in our roadmap) is:

  • Adopt stage-based execution in DataFusion core.

    • Divide the query execution plan into a DAG of stages at "exchange" operators or "pipeline breakers".
    • For each stage, group all operators into a Task. Inside each Task, data is processed through the operator logic serially, synchronously, and pipelined (see the sketch after this list).
    • For the root stages that read data from files directly, partition the input dataset based on a per-task size configuration (similar to input.split.maxSize for MapReduce and its equivalents in Spark/Presto).
    • For non-root stages, we could either adopt a fixed num_partition or determine the number of partitions from sizes observed at runtime.
  • A shared scheduler framework for both DataFusion and Ballista.

    • Schedule tasks based on stage dependencies and on the available cores.
  • Ultimately, finer-grained execution for the DataFusion core, as described in Morsel-driven parallelism and Push-pull.
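
A rough sketch of the data model implied by the stage/Task idea above; none of these are existing DataFusion APIs, they only make the terms concrete:

```rust
// Hypothetical types: the plan is cut at exchanges / pipeline breakers into a
// DAG of stages, and each stage runs as one or more Tasks.
struct StageId(usize);

struct Stage {
    id: StageId,
    // stages whose output this stage consumes across an exchange
    children: Vec<StageId>,
    // leaf stages: derived from a per-task input size (à la input.split.maxSize);
    // other stages: a fixed or runtime-derived partition count
    num_tasks: usize,
}

struct Task {
    stage: StageId,
    // which slice of the stage's input this Task processes, serially and pipelined
    partition: usize,
}
```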

By the method proposed above, we could also achieve:

limit concurrency (don't scan 20 parquet files in parallel if you have only two cores)

with the help of the TaskScheduler, and achieve:

can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter

by a single, sequentially executed Task.

cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang.

@tustvold
Contributor Author

tustvold commented Mar 25, 2022

I agree entirely that the current scheduling within DataFusion, which effectively punts onto tokio and hopes it does something vaguely appropriate, is likely sub-optimal. In fact one of the issues I'm having with #1617 appears to be that sub-optimal task scheduling is causing a 2x performance regression.

That being said, I think there are a number of problems here and it would be advantageous in my mind to keep them separate:

  • Translation of a LogicalPlan::TableScan into a corresponding ExecutionPlan
  • Optimisation of this ExecutionPlan to potentially introduce additional parallelism
  • Scheduling of this ExecutionPlan to actually perform its computation

This ticket is then concerned solely with the first of these, and with reducing the complexity of the file-format-specific operators necessary to achieve this translation. In particular, it proposes removing the ability to scan multiple files within a single operator, and instead composing multiple per-file operators in the generated ExecutionPlan. I think the remaining two problems can and should be kept separate, in part because they have broader scope than just scanning files.

Does this make sense, or am I missing something?

FWIW we sort of already do the stage-based execution you describe, but it is implicit, based on whether operators call tokio::spawn or just compose the SendableRecordBatchStream together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together; it would likely involve altering the contract of ExecutionPlan to be more explicit about which operators spawn additional parallel tasks. Perhaps this is what you're proposing?

@yjshen
Member

yjshen commented Mar 30, 2022

Translation of a LogicalPlan::TableScan into a corresponding ExecutionPlan

Sounds great. We could eliminate much of the common logic scattered across the different file formats, as you mentioned. And yes, this makes sense!

removing the ability to scan multiple files within a single operator, and instead composing multiple per-file operators in the generated ExecutionPlan.

I'm confused here: how would we parallelize the physical operators after this TableScanOp? And how do we control this single operator's parallelism? Would data batches be sent through a multi-producer (the single-file scan ops), multi-consumer (e.g. filter/project/sort) queue?

compose the SendableRecordBatchStream together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of ExecutionPlan to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?

What do you think about avoiding async and streams in normal operators' execute()? As you and @alamb mentioned when discussing merging project/filter logic into the scan operator, these pure computations are async-free in my opinion: they are data- or computation-intensive and do not talk to IO systems. What do you think if we had something like the following (similar to Morsel-driven):

(diagram from the original comment, not reproduced here)
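
To make the question concrete, here is one possible shape of such a synchronous, push-based operator contract. This is only a sketch of the idea, not a proposal for DataFusion's actual ExecutionPlan trait, and Batch is a placeholder for Arrow's RecordBatch:

```rust
// Placeholder for arrow::record_batch::RecordBatch.
struct Batch;

// A purely synchronous, push-based operator, suitable for compute-bound work
// such as filter/project/partial aggregation that never touches IO.
trait SyncOperator: Send {
    // Process one input morsel, pushing zero or more output batches downstream.
    fn push(&mut self, input: Batch, out: &mut dyn FnMut(Batch));

    // Emit any buffered output (e.g. a sort or final aggregate) once input ends.
    fn finish(&mut self, out: &mut dyn FnMut(Batch));
}
```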

@tustvold
Contributor Author

tustvold commented Mar 30, 2022

How would we parallelize the physical operators after this TableScanOp?

You would have multiple single-partition ParquetExec operators being fed into a single UnionExec, potentially with some ProjectionExec, etc. in between. As each DataFusion partition gets its own tokio task, this gives parallelism. If you wanted parallelism within a single file, you would have an optimizer pass that replaces the single ParquetExec with multiple ParquetExec operators over disjoint row groups, again fed into a UnionExec.
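
A sketch of what such an optimizer pass might compute, assuming the per-file operator can be restricted to a subset of row groups (the function below is purely illustrative, not an existing DataFusion API):

```rust
// Assign the row groups of one parquet file round-robin to `target` scan
// operators; each (path, row_groups) entry would become one ParquetExec, and
// the pass would feed all of them into a UnionExec.
fn split_by_row_group(path: &str, num_row_groups: usize, target: usize) -> Vec<(String, Vec<usize>)> {
    let target = target.max(1);
    let mut splits: Vec<(String, Vec<usize>)> =
        (0..target).map(|_| (path.to_string(), Vec::new())).collect();
    for rg in 0..num_row_groups {
        splits[rg % target].1.push(rg);
    }
    // drop operators that ended up with no row groups (when num_row_groups < target)
    splits.retain(|(_, rgs)| !rgs.is_empty());
    splits
}
```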

What do you think if we avoid async and stream from normal operators' execute()?

Let me get back to you on this, it is something I am currently mulling about and experimenting with. I agree that using async for CPU-bound work seems a little wonky, but as @alamb articulated here there are reasons that it may be the pragmatic choice. I'm trying to collect some data so we can make an informed decision 😅

FWIW, as you link to the morsel-driven paper: what you describe I think is closer to the more traditional plan-driven parallelism than to morsel-driven parallelism. Tokio is much closer to that paper than what you describe, as it incorporates notions of dynamic scheduling and work-stealing; rayon may be even closer.

@yjshen
Member

yjshen commented Mar 30, 2022

it is something I am currently mulling about and experimenting with. I agree that using async for CPU-bound work seems a little wonky, but as @alamb articulated here there are reasons that it may be the pragmatic choice. I'm trying to collect some data so we can make an informed decision 😅

Very much looking forward to it.

you describe I think is closer to the more traditional plan-driven parallelism than to morsel-driven parallelism. Tokio is much closer to that paper than what you describe, as it incorporates notions of dynamic scheduling and work-stealing; rayon may be even closer

I think work-stealing in Morsel-driven and work-stealing in Tokio are quite different things. Roughly partitioning the whole dataset at the beginning, and later stealing part of the data from a skewed partition to idle working slots or CPU cores, is quite different from the task/green-thread stealing Tokio does. Or am I missing something crucial, such that one SendableRecordBatchStream can be processed in parallel by multiple tokio tasks? 🤔

@tustvold
Contributor Author

Or am I missing something crucial, such that one SendableRecordBatchStream can be processed in parallel by multiple tokio tasks

It depends. The SendableRecordBatchStream itself can only be processed by a single tokio task, correct; however, there is nothing to prevent that stream from actually being an mpsc channel, with the actual work performed in other tasks in parallel. In fact this is exactly what CoalescePartitionsExec does, and the physical optimizer will add combinations of RepartitionExec and CoalescePartitionsExec to plans based on the target_partitions setting.
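
A minimal, self-contained sketch of that pattern using plain tokio, with strings standing in for RecordBatches (this is not DataFusion's actual CoalescePartitionsExec code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(8);

    // One producer task per "partition"; real code would poll a per-partition
    // RecordBatch stream here instead of producing strings.
    for partition in 0..4 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for batch in 0..3 {
                tx.send(format!("partition {partition} batch {batch}")).await.ok();
            }
        });
    }
    drop(tx); // close the channel once the spawned producers finish

    // The consumer sees a single sequential source, but the work behind it ran
    // in parallel; wrapping `rx` as a Stream gives the same shape as a
    // SendableRecordBatchStream.
    while let Some(item) = rx.recv().await {
        println!("{item}");
    }
}
```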

Whilst target_partitions is typically set to the CPU thread count, RepartitionExec will typically appear multiple times in a given plan, and so this will result in more tasks than CPU cores. If there are other queries running concurrently, or target_partitions is set higher, this will be even more pronounced. If you now squint, this is a first-order approximation of morsel-driven. It's far from perfect: the tokio scheduler is not in any way NUMA-aware, and in fact it optimises for load distribution at the expense of thread locality, but it is not hugely dissimilar.

At least that's my hand-wavy argument 😆 I happen to think rayon is closer in spirit, but I'm not sure how much of a difference that will make in practice.

@tustvold
Contributor Author

As promised, the scheduling proposal can be found in #2199

@tustvold
Contributor Author

OK, I've created some sub-tasks that I think should provide a path towards implementing this proposal.

I'm currently somewhat over-subscribed getting #2199 over the line, so if someone is free and willing to help out, that would be awesome. If not, I'll get to them when I have a chance.

@matthewmturner
Contributor

@tustvold thank you for driving this. I do hope to get around to helping on this, but I have been a bit short on time lately. I will keep you posted if I'm able to pick any of this up.

@tustvold
Contributor Author

tustvold commented May 8, 2022

I think this work is now encapsulated in smaller sub-tasks, so I am closing this issue.

@tustvold tustvold closed this as completed May 8, 2022