-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Distributed scheduler does not obey dask.order.order for num_workers=1, num_threads=1 #5555
Comments
Thank you for your report @davidhao3300. This is indeed a known problem that is being introduced by us assigning tasks to workers only once we know for sure that they can be computed. That introduces a problem we typically refer to as "root task overproduction" and causes a non-optimal We've had reports in various contexts already and they typically all require us to expand our scheduler task assignment logic. See also #3974 for a lower level description and links to other tickets. Another problem I can see in your example is that I would expect these computation graphs to be fused (i.e. linear chains should be merged such that there is only a single task). cc @gjoseph92 you've been looking into something like that recently, haven't you? |
Thanks for the explanation, the root cause makes sense to me, and I can look into manually fusing as a workaround for now |
Thanks @davidhao3300. I really appreciate having such a well-written issue open for this, because this is a known, widespread, and significant problem as Florian has said. "dask.order is not obeyed" is a good way to put it—ordering is actually mostly not considered by the scheduler, which is quite confusing and misleading to users. I suggest reading #5223 as well for some more background on the problem (#3974 is one possible solution, though not the only one). Florian is correct that you'd expect this graph to be fused. Linear fusion is really the only bulwark we have against this problem right now. Interestingly, your graph isn't getting fused because you're using |
It's good to know I accidentally uncovered a separate issue with partition_info haha; the actual code doesn't use partition_info. Our actual graph is more complicated, but it's good to know that linear fusion should happen automatically. For automatic graph optimization, is this calling fuse_linear() or fuse()? We have more complicated operations chains in the actual code, and those don't appear to fuse together from what I can tell, need to play around some more |
@davidhao3300 if you're using DataFrames, only Blockwise fusion happens by default now (dask/dask#7620), which is neither This change was made because most (but still not all) DataFrame optimizations use Blockwise/HighLevelGraphs. The upside of it is that HighLevelGraphs get sent to the scheduler, which are much smaller and more efficient. But, if there are any layers in the HighLevelGraph that aren't Blockwise, then those won't get fused anymore. That's exactly what you saw in your example: Short story is if you do You say your graph is more complicated—this is also especially important if you're using any non-DataFrame collections, such as Delayed. If you're going in and out of Delayed, but ending up with a DataFrame, you'll definitely want to set that |
Thank you for the detailed response, I think we have enough info to proceed forwards with optimizing our own graphs, I'll let you know if anything else weird pops up! |
I was looking into this briefly. I think that it's a bit simpler than what folks are describing. So, it's true that no parallel dask scheduler completely obeys dask.order. Parallelism gets in the way. However, it does come pretty close. In this particular case what happens is that ...
And so when you look at this problem with only two partitions it looks bad, but if you zoom out to a more realistic situation then it looks fine. Here is such a case. I've modified the example above to have fifty rather than two partitions: import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask
from dask.distributed import Client
# Write data to parquet as a setup, to demonstrate unnecessary memory usage
df = pd.DataFrame(np.random.randn(100,100), columns=[str(x) for x in range(100)])
# Can change npartitions to 10 to show that this problem becomes severe at higher partition count
ddf = dd.from_pandas(df, npartitions=50)
ddf.to_parquet("input")
# You may need to set host and dashboard_address to expose dashboard
# We set n_workers=1 and threads_per_worker=1 to get rid of concurrency for simplicity
client=Client(n_workers=1, threads_per_worker=1)
ddf = dd.read_parquet("input")
def f(df, partition_info=None):
# Pause execution on second partition
import time
time.sleep(1)
return df
ddf = ddf.map_partitions(f, meta=ddf)
ddf = ddf.to_parquet('test', compute=False)
# Run this to show that the dask.order.order output is reasonable
# ddf.visualize(color="order")
# This will hang (on purpose). Go to dashboard graph view to see the graph screenshotted in this issue.
dask.compute(ddf, scheduler=client) We can see that, at least for the instant that this image was taken, there were no lingering pieces of data from the first column. Probably there was one for a few milliseconds though and I just didn't catch it. Anyway, at least for this example Dask seems to be working as expected. You should make partitions small enough so that you can fit around Dask scheduling can certainly be improved. The issue where we have an extra partition in memory can be significant when people are running with partitions that are large relative to the size of their memory. Topics like more aggressive fusion, or speculative task assignment can resolve these problems in the medium term. Short term following the rule of "make partitions small enough so that you can fit |
Thank you, based on the conversation we had, it's possible that "longer" chains exacerbate the suboptimal scheduling issue described earlier, or that this problem has been mostly alleviated since we first had the issue 1-2 years ago. I will report back with findings later this week. |
That could be true, yes. Mostly I think I'm acknowledging the problem you've raised, but rejecting the specific example (while commending you for taking the time to make a minimal example). If you can help to construct a better example then that would be welcome. |
FYI, in the latest release of distributed (2022.11.0), this problem should be mostly alleviated. Thanks to the new "queuing" scheduling policy that's on by default, you should no longer see root task overproduction like this. @davidhao3300 I've updated your reproducer slightly, since dask/dask#8309 has been fixed (so import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask
from dask.distributed import Client
if __name__ == "__main__":
# Write data to parquet as a setup, to demonstrate unnecessary memory usage
df = pd.DataFrame(np.random.randn(100, 100), columns=[str(x) for x in range(100)])
# Can change npartitions to 10 to show that this problem becomes severe at higher partition count
ddf = dd.from_pandas(df, npartitions=10)
ddf.to_parquet("input")
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})
# You may need to set host and dashboard_address to expose dashboard
# We set n_workers=1 and threads_per_worker=1 to get rid of concurrency for simplicity
client = Client(n_workers=1, threads_per_worker=1)
ddf = dd.read_parquet("input")
def f(df, partition_info=None):
# Pause execution on second partition
if partition_info["number"] == 1:
import time
while True:
time.sleep(5)
return df
ddf = ddf.map_partitions(f, meta=ddf)
ddf = ddf.to_parquet("test", compute=False)
# Run this to show that the dask.order.order output is reasonable
# ddf.visualize(color="order")
# This will hang (on purpose). Go to dashboard graph view to see the graph screenshotted in this issue.
input("Press enter to start")
dask.compute(ddf, scheduler=client, optimize_graph=False) Before (
|
One interesting little thing is your exact reproducer in this issue isn't fixed because of odd edge cases in the scheduling heuristic right now:
With only 2 partitions at But if you add any more partitions, suddenly it behaves as you'd expect (execution breadth of 1): The entire lower chain is blue (complete). We completed that chain before moving onto the one above. Whereas in the first figure, we're trying to do both chains at once and swapping between them. This hysteresis happens because the 'new and improved' scheduling mode only kicks in when you have distributed/distributed/scheduler.py Lines 3034 to 3036 in 176a84c
Though this edge case hopefully wouldn't come up too much in normal use (usually you'd have a lot more tasks than threads?), it could still be confusing, especially if trying to diagnose things with small examples like this. |
What happened:
Distributed scheduler appears to be ignoring the priority set by dask.order.order and instead does a bread-first execution order. This problem is particularly prominent when doing simple per-partition operation chains, where we are able to read one input partition, do some work, then write the results to a partition.
The end result for this particular usecase is that we load ALL parquet partitions first, apply the function to each one, then write each one to file.
While the issue is about num_workers=1, num_threads=1, I do think the suboptimal scheduling occurs in situations involving concurrency as well, and hopefully addressing the serial case will help with the concurrent case.
dask.order.order execution order:
See how we process the left chain as much as possible before moving to the right chain, as expected
Actual execution:
I made the function pause in the middle of execution on the top green node. Note how both chains are in-memory. I would've expected to see the bottom chain either completely grayed out (not started), or that the bottom-right node be the only thing in memory.
What you expected to happen:
We instead should be doing depth-first execution order like dask.order.order suggests.
Minimal Complete Verifiable Example:
Anything else we need to know?:
Environment:
The text was updated successfully, but these errors were encountered: