Improve RepartitionExec for better query performance #7001
Comments
👍 for this. I poked around with removing the gate in the distribution channel implementation. My challenges with the gate implementation were high lock contention, as well as high memory usage when inputs are highly skewed (e.g. one channel received the bulk of the batches, but the gate still let batches in since other channels were empty). Some experimentation here: https://github.com/GlareDB/arrow-datafusion/blob/repart-perf/datafusion/core/src/physical_plan/repartition/distributor_channels.rs. Note that this hangs on some of the repartition tests, so there's likely a bug in the futures logic. With a high enough channel buffer size, we start to see performance increases similar to those seen in the flume PR:
(On an M2 MacBook Air) Not trying to push for any one approach; this was mostly just an experiment.
I have been dreaming about this. I may try to bash out a prototype in the coming weeks. I find it a very interesting intellectual challenge: "can we build a tokio-based scheduler that takes into account NUMA and cache locality?" I think we can, and this would be an excellent opportunity to show it.
That would be amazing @alamb, very interested to see what you come up with :)
Also @wolffcm noted elsewhere that
I believe this is correct, and it is perhaps something to address in any redesign.
To add a little bit more context to the above, the
In my ideal world, the repartition operation would handle both keeping all cores busy and hashing. I will keep dreaming until I have time.
The current plan to do round robin before hashing seems optimal to me, as I think @wolffcm also suggests? Doing it the other way around would reduce the parallelism of hashing.
I was thinking of a unified round-robin / hashing repartitioner expressed in terms of inputs / outputs. I may have misled myself and will have to see how the code plays out.
Some other notes from a DM that I am bringing here for visibility: DataFusion's pattern can be described as a "classic pull scheduler with volcano style exchange operators":
My theory (which I would say not everyone agrees with yet) is that you can get all the claimed benefits of "morsel driven parallelism" (aka a push scheduler) from a "classic pull scheduler with exchange operators". The claimed benefits of morsel driven parallelism over the classic approach are:
Basically, I think it is totally reasonable for DataFusion to scale linearly to high core counts, like 128 / 256. I just need to prove that is the case, and I suspect it will take some finagling with the current repartitioning code.
With the current architecture, I think the easiest thing to do here is probably to not try to have a one-to-one mapping of task to core, and instead lean into tokio for scheduling to get good resource utilization. I did a quick experiment for this by increasing the number of partitions used in the repartition optimization rule. Currently it gets set to …

The diff:

```diff
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index aa48fd77a..b4ef8c3a2 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -288,7 +288,7 @@ impl PhysicalOptimizerRule for Repartition {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.execution.target_partitions;
+        let target_partitions = config.execution.target_partitions * 3;
         let enabled = config.optimizer.enable_round_robin_repartition;
         let repartition_file_scans = config.optimizer.repartition_file_scans;
         let repartition_file_min_size = config.optimizer.repartition_file_min_size;
```

And some benchmarks using TPC-H SF=10 on a MacBook Air M2:

GCP n1 VM (8 cores/16 threads, 60G memory):
@scsmithr this is great data, thank you, and it does imply there is substantial headroom for performance improvement by using more cores. It is interesting that some queries go 2x as fast; it is almost as if the default value for `target_partitions` were half the number of actually available cores. However, the config seems to be doing the right thing: 🤔
That's very interesting, do you have an idea what may be going on @alamb?
Much of the improvement here is from repartitioning in the `ParquetExec`. Since that's I/O-bound, having more tasks than cores improves resource utilization if that's what the rest of the query is waiting on. It might be reasonable to introduce a
Makes sense 👍
@scsmithr how did you get the numbers you report in #7001 (comment)? I believe the benchmarks default to using only 2 cores: https://github.com/apache/arrow-datafusion/blob/ed85abbb878ef3d60e43797376cb9a40955cd89a/benchmarks/src/options.rs#L29-L31 So if you ran the tests without specifying …

I wonder if we should increase the default number of partitions to the core count in the benchmarks 🤔
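A minimal sketch of what "default to the core count" could look like, assuming a hypothetical `default_partitions` helper (this is not the benchmarks' actual option parsing). It relies only on Rust's `std::thread::available_parallelism`, which can fail on some platforms, hence the fallback value.

```rust
use std::thread;

/// Hypothetical helper: pick a default partition count for benchmark runs.
/// Uses the parallelism reported by the OS and falls back to `fallback`
/// (e.g. the current default of 2) if it cannot be determined.
fn default_partitions(fallback: usize) -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(fallback)
}

fn main() {
    let partitions = default_partitions(2);
    println!("target_partitions = {partitions}");
}
```

An explicit command-line value would still override this default, so runs that want to isolate single-core algorithm behavior remain possible.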
Yep, completely missed that option. I was running …

I'm coming at this from trying to speed up reading unpartitioned parquet files from GCS, and there are improvements when setting target partitions beyond the number of cores. The above benchmark numbers were me trying to see if that applied generally, and not just in this specific case. Turns out, it doesn't apply generally... still investigating why I'm seeing speedups with GCS.
Would making
I think it would make sense, as it better reflects the default configuration DataFusion is run in. I wonder if the benchmarks were originally trying to isolate the specific algorithms and avoid the additional noise that multiple cores brought. However, given that they run with 2 cores now, it seems they get the worst of both worlds. @scsmithr do you have time to make a PR with that change?
I'm currently running some experiments with this (
Doing experiments in #13699 if anyone is interested, results seem to look very promising (need to double check). |
This is for TPC-H SF=1:
It seems the improvement comes not from the adaptivity but from improved balancing, so I'll probably open a simplified PR with a similar improvement.
Summary
The key to using multiple cores efficiently is how the query plan is parallelized. In DataFusion this often involves repartitioning data between partitions. This ticket describes ideas for improving the `RepartitionExec` operator so that DataFusion's performance scales more linearly with the number of cores.

Experiments by @YjyJeff in #6928 show that some part of `RepartitionExec` can be the bottleneck in certain queries. While we likely can't use the unbounded buffering approach of #6929 for the reasons @crepererum explains in #6929 (comment), there is clearly room to improve. Also, we have some earlier evidence from @andygrove in #5999 that scaling with core counts is a challenge.

Background
DataFusion (like most other commercial systems) uses the "Exchange Operator"-based approach to parallelism [1]. I believe the exchange approach is so commonly implemented because it is simple to reason about and works quite well in practice with sufficient engineering care.
DataFusion's planner picks the target number of partitions and then `RepartitionExec` redistributes `RecordBatches` to that number of output partitions.

For example, if DataFusion has `target_partitions=3` (trying to use 3 cores) but is scanning an input with 2 partitions, it makes a plan like this:

Note there

There are alternative approaches, such as the one described in the Morsel-Driven Parallelism Paper, but other than the notable exception of DuckDB I don't know of any widely used engine that takes this approach. The major benefit of the morsel driven approach, as I understand it, is that it can, in theory, respond better to dynamic resource changes (e.g. throttle down to use 2 cores when a high priority job comes in and then go back up to 8 cores when done) [2].
The engineering challenge of the `RepartitionExec` is twofold:

Synchronization and Stalls

Given that `RepartitionExec` is a synchronization point between multiple threads, without careful engineering the lock contention can become the bottleneck.

Also, without careful scheduling the consumers may "stall" waiting for data from a producer.
NUMA

Non-Uniform Memory Access (NUMA) basically means that data that is "closer" to a core is faster to access. In practice, this means optimal performance is achieved when the same thread/core that produces data (e.g. decodes a `RecordBatch` from parquet) also processes it until the next pipeline-breaking operation (e.g. updating the hash table). If one core produces the `RecordBatch` and another consumes it, additional memory latency is incurred.

Since the `RepartitionExec` is designed to shuffle data, it is very easy to destroy NUMA locality if care is not taken.

I believe the current RoundRobin approach is not NUMA friendly. It will very likely decode some parquet data, put that batch in a queue, and then go and decode more parquet data (rather than return control to the operator that was waiting for it).
Idea 1: Add buffering to the `RepartitionExec`

One idea, suggested by @Dandandan and @ozankabak in #6929 (comment), is to introduce some more (but not unbounded) buffering into the Repartition operator.

For example, perhaps we could extend the existing `DistributionSender` to have a queue of `RecordBatch`es (2 or 3, for example) rather than just a single `Option<>`, so that it is possible to start fetching the next input immediately: https://github.com/apache/arrow-datafusion/blob/d316702722e6c301fdb23a9698f7ec415ef548e9/datafusion/core/src/physical_plan/repartition/distributor_channels.rs#L180-L182
Idea 2: Make RoundRobin more adaptive

Another idea would be to restructure the "RoundRobin" repartition strategy to be more adaptive. Currently each input evenly sends `RecordBatches` to each output partition, and will give up control via `tx.send(Some(Ok(batch))).await.is_err()` if any output partition is full.

We could potentially improve this logic: rather than giving up control, attempt to find an output partition that is empty and fill it (aka `tx.try_send`, and if there is no space, try some other partitions).

Footnotes
[1] This model was first described in the 1989 paper "Encapsulation of parallelism in the Volcano query processing system", which uses the term "Exchange" for the concept of repartitioning data across threads.
[2] We actually had a version of this approach as an experimental feature in DataFusion for a while, but removed it because we found it didn't offer a compelling enough performance improvement to justify the engineering effort.
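The adaptive round-robin step from Idea 2 can be sketched with the standard library's bounded channels, whose `SyncSender::try_send` returns `TrySendError::Full` instead of blocking. This is a simplified, synchronous stand-in for DataFusion's async distribution channels: `send_to_first_available` is a hypothetical helper, and treating a disconnected output like a full one is an assumption made to keep the sketch short.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical adaptive round-robin step: starting at `start`, offer `item`
/// to each output with a non-blocking `try_send`, skipping outputs that are
/// full (or disconnected). Returns the index that accepted the item, or hands
/// the item back if no output could take it right now.
fn send_to_first_available<T>(outputs: &[SyncSender<T>], start: usize, mut item: T) -> Result<usize, T> {
    let n = outputs.len();
    for offset in 0..n {
        let idx = (start + offset) % n;
        match outputs[idx].try_send(item) {
            Ok(()) => return Ok(idx),
            // Take the item back and try the next output.
            Err(TrySendError::Full(back)) | Err(TrySendError::Disconnected(back)) => item = back,
        }
    }
    Err(item)
}

fn main() {
    // Three outputs, each able to buffer a single batch.
    let (senders, receivers): (Vec<SyncSender<u32>>, Vec<Receiver<u32>>) =
        (0..3).map(|_| sync_channel(1)).unzip();

    // Output 0 is already full (its consumer is slow).
    senders[0].try_send(100).unwrap();

    // Plain round robin would wait on output 0; the adaptive step moves on.
    assert_eq!(send_to_first_available(&senders, 0, 1), Ok(1));
    assert_eq!(send_to_first_available(&senders, 0, 2), Ok(2));
    // Every output is full now, so the batch is handed back to the caller.
    assert_eq!(send_to_first_available(&senders, 0, 3), Err(3));

    // Once the consumer of output 0 drains it, output 0 accepts data again.
    assert_eq!(receivers[0].recv().unwrap(), 100);
    assert_eq!(send_to_first_available(&senders, 0, 3), Ok(0));
}
```

When every output is full the batch is handed back, which is the point where the real operator would still have to yield; the gain is that a single slow consumer no longer stalls the producer.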