Documentation on best practices configuration for large numbers of workers #5164
First of all, I don't think we have such a section. We are trying to aim for sensible default parameters, or to implement auto-scaling solutions where that is not possible. That said, I wouldn't be surprised to hear that this is not working perfectly. Briefly browsing the code, I would anticipate the scheduler-info update on the client side to be the source of your trouble. This is configured via the option
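For reference, a minimal sketch of how a client-side interval like this can be raised through dask's configuration, assuming the option in question is `distributed.client.scheduler-info-interval` (check the key name against your installed version):

```python
import dask
from distributed import Client

# Assumed key: how often the client refreshes cluster-wide scheduler info.
# Raising it trades metadata freshness for less client<->scheduler traffic,
# which matters when there are ~1.5k workers' worth of state to ship around.
dask.config.set({"distributed.client.scheduler-info-interval": "30s"})

client = Client("tcp://scheduler-address:8786")  # placeholder address
```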
I'm not opposed to something like this, but the question is what to put in. If it's just about the scheduler-info interval above, I'd prefer that we implement this to adapt to cluster size automatically. If there are many of these tricks, some of which we cannot do much about, that section would make a lot of sense. cc @jacobtomlinson and @jsignell, since they have been putting a lot of thought into the structure of the docs lately. Maybe there is a perfect place where you can add your experiences :) BTW: 1.5k workers is definitely one of the largest setups I am aware of. If there are any other areas where you'd expect dask to scale better or be more stable, I'd welcome your thoughts. Feel free to open another ticket for discussion if there is anything to share.
Thanks so much @fjetter! I will give that a try next time I run this workflow (probably in a couple of days) and report back. I agree it sounds nicer to just implement sensible defaults/auto-scaling parameters and avoid having to describe different settings to users. So if there's a way to auto-scale this particular parameter (and if this is indeed the cause of the notebook slowdown), then that seems fine and doesn't require a full documentation section. Maybe just count me as one user who has enough experience to think that a problem like this should be solvable, but not enough to figure out exactly which config parameters to tweak to get there. I'm not sure how many folks fall into that bucket, but for someone like me, docs could be helpful. That being said, I don't have much knowledge to contribute other than what comes from periodically running into something that doesn't work smoothly and then asking about it... Overall, I think this large worker setup runs surprisingly well. The slow and choppy communication is the main issue. The rest of the issues I run into probably stem more from our Kubernetes backend running on preemptible Google Cloud nodes.
Feedback like this is always appreciated, and I can only encourage you to open tickets when you find yourself spending a lot of time working around limitations. We often lack the necessary feedback in these situations since there are not many users running at that scale. You are not the only one, and the user base continues to grow on that front, but that makes it even more important for us to get feedback so we can preemptively fix these issues. Below are a few inline comments on your feedback.
If that parameter turns out to be a problem we could do the same thing as for the worker heartbeat. That's easy; we would like to have confirmation before we do anything like this, though.
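As an illustration only (not distributed's actual heartbeat code), the kind of size-dependent scaling meant here looks roughly like this:

```python
def adaptive_interval(n_workers: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Grow a polling/heartbeat interval with cluster size so the total
    message rate hitting the scheduler stays roughly constant.

    Illustrative sketch only -- not the function distributed actually uses.
    """
    if n_workers <= 10:
        return base
    # Beyond a small cluster, stretch the interval proportionally to size,
    # but never past an upper bound.
    return min(cap, base * n_workers / 10)

# e.g. 100 workers -> 5s between updates, 1500 workers -> capped at 30s
```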
That's likely one of the topics we would cover in the "large deployment" section, since it is hard to auto-configure. It also depends on the kind of workloads and the deployment you're running (are you susceptible to running out of memory? are you running on spot instances? etc.).
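To make that concrete, here is a hypothetical example of the kind of deployment-dependent tuning such a section might describe, for a large cluster on preemptible nodes; the key names and values are assumptions and should be double-checked against your distributed version:

```python
import dask

dask.config.set({
    # Tolerate more task failures before the scheduler gives up on a task,
    # since preemptible/spot workers can vanish mid-computation.
    "distributed.scheduler.allowed-failures": 10,
    # Be more patient when opening connections to a very busy scheduler.
    "distributed.comm.timeouts.connect": "60s",
})
```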
This configuration needs to be set before the scheduler is instantiated. I would probably open a ticket over in dask-gateway and ask if this is the "best approach" or if there are other options. I'm not too familiar with dask-gateway myself.
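One common way to get configuration into the scheduler process before it starts (for example, in the scheduler pod spec that dask-gateway launches) is dask's environment-variable mapping; the specific key below is only an example:

```python
import os

# Dask maps environment variables of the form DASK_<SECTION>__<KEY> onto its
# configuration tree, so DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES ends up
# as distributed.scheduler.allowed-failures. For a remote scheduler this must
# be set in *its* environment (container/pod spec) before launch; setting it
# here only documents the naming convention.
os.environ["DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES"] = "10"
```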
Another parameter that heavy users often increase is the
I would expect this to eventually run into a timeout if workers disconnect. For extremely large deployments, I'm not sure scatter is robust or efficient. By default, scatter will open a connection to every worker and copy the data; for 1.5k workers that's a lot of work for your client. Instead, what it should try doing is put the data on a few workers and start a tree replication from there. We're working on improving the replication mechanics with Active Memory Management (AMM; see #4982 and #5111). Until then, not scattering and letting the workers figure it out might actually be faster. The optimal case is probably to scatter to only a few workers (10? 100?) and let distributed figure out the rest. Even if every job requires the data, the ordinary machinery should replicate it everywhere eventually.
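A hedged sketch of the "scatter to a few workers" idea using the existing `Client.scatter` API; the subset size and the placeholder address and data are arbitrary:

```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address
big_object = {"example": "payload"}              # stand-in for your large data

# Pick a small subset of workers to receive the data directly from the client.
subset = list(client.scheduler_info()["workers"])[:50]

# Scatter only to that subset; tasks scheduled on other workers will fetch the
# data worker-to-worker from these holders rather than from the client.
[data_future] = client.scatter([big_object], workers=subset)
```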
We have been having issues with actual deadlocks recently due to some low-level race conditions in the task state machine. It's hard to tell whether your situation is one of those. There have been a few fixes in the latest release; if you are not yet on the latest version, I encourage upgrading. More fixes are planned to help with these things.
There is no such thing :) We've had some major improvements to the scheduling of such workloads in the recent past, see #4967 (there are a few follow-up PRs with additional fixes), which should make these things run much more smoothly.
The optimization step in particular is probably the reason why your jobs run out of memory on the client side and take an insane amount of time to be scheduled (along with graph ordering, which we cannot disable at the moment). You might want to disable optimization entirely. That will likely cause more tasks to appear, but overall you may be better off. What version of distributed are you running?
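Assuming the setting meant here is the graph-fusion option that comes up later in this thread (`optimization.fuse.active`), disabling it looks like this; it can also live in a dask config file:

```python
import dask

# Turn off low-level task fusion during graph optimization. More tasks reach
# the scheduler, but client-side optimization of very large graphs gets cheaper.
dask.config.set({"optimization.fuse.active": False})
```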
Regarding docs - I think this would belong in a short form "How do I..." or "FAQ" section. We don't necessarily have one of those yet, but the goal would be to put it somewhere where it can be easily indexed by search engines and people will find it.
@fjetter This is incredibly helpful and a reminder of how responsive and engaged the dask community is! Also a reminder for me to be a bit more proactive when I'm running into issues rather than simply settling for a hack and carrying on.
I have been trying to keep up with the (very fast) pace of development, so I'm currently running 2021.7.2, but I can't say all of these issues have cropped up while using this version. It's possible some of them have been solved in the most recent one or two releases, or it's possible I just haven't hit them recently. If/when I hit them again, I'll post specific issues with the version included.
I'll keep you posted on this thread next time I run this workflow.
Agreed. Since I am potentially susceptible to running out of memory, my standard approach has been to run with … Speaking of this issue, another thing I've noticed with the recent notes on trimming unmanaged memory is that while the "debugging approach" of invoking …
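For context, the "debugging approach" referenced here appears to be the manual glibc trim described in the distributed docs on unmanaged memory; a sketch under that assumption (Linux/glibc only, placeholder scheduler address):

```python
import ctypes

from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address


def trim_memory() -> int:
    # Ask glibc to hand freed-but-retained memory back to the OS.
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


# Run once on every worker; a return value of 1 means memory was released.
client.run(trim_memory)
```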
Will do.
Anecdotally, I have noticed fewer of those issues lately. I'll keep an eye out, and if I do see them I will try bumping this parameter up.
Makes sense. Honestly, I'm usually able to figure out a way to not use scatter at all and just load the data within every job. Most often, re-loading this data in each job is worth the stability of not dealing with a future of a scattered object and the potential issues in passing this data across the network.
I think I've experienced this on the latest release, but perhaps it was 2021.7.1. Will raise an issue again if I encounter this.
This is great! Will keep improvements like this in mind and continue to test how these sorts of workflows are running.
I read this blog post about new strategies for scheduler-side optimization and have experimented with its suggestion of setting the optimization.fuse.active parameter to false, but I haven't noticed much difference in this experience. Looking forward to these updates propagating to xarray! @jsignell, if it would be helpful, I'm happy to start compiling this list of experiences into something more cohesive that could go in something like a "How do I... utilize a very large cluster" section. I don't have the bandwidth this week, but could get started on that soon if you think having a section like that would be useful.
That sounds useful to me if/when you have the time. It would probably also make a good blog post if that appeals to you.
Yeah I could probably scrounge together something like that! Would you be the right person to stay in touch with about how to structure that?
We're still collecting experience ourselves with this mechanism. In the end, we're relying on the libc implementation you have running, so there may be differences depending on what your OS is currently using. We can update the documentation, but the question is what to put in there; we're more or less simply referencing the official malloc / mallopt documentation.
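As a concrete (and hedged) example, the glibc knob usually pointed at in this context is `MALLOC_TRIM_THRESHOLD_`, which has to be present in the worker's environment before the process starts; the value below is only a typical starting point:

```python
import os

# glibc reads MALLOC_TRIM_THRESHOLD_ (note the trailing underscore) at startup,
# so for remote workers it belongs in the container/pod spec or worker launch
# environment rather than in notebook code; shown here only to document the
# variable name and a commonly used value (in bytes).
os.environ["MALLOC_TRIM_THRESHOLD_"] = str(64 * 1024)
```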
My point is rather that you should not necessarily worry about this data replication yourself. If you just create one task which loads this data, dask will implicitly replicate it to the workers which need this data. The network load for this replication is spread evenly on all workers who have the data at a given point in time. If you scatter the data, you are pushing this data from your client to all workers yourself, i.e. your client maintains a connection to all 1.5k workers and pushes the data.
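A minimal sketch of that pattern, with a hypothetical `load_reference_data` function and a placeholder address: submit one task that produces the data and pass its future to downstream tasks, so replication happens worker-to-worker rather than from the client:

```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address


def load_reference_data():
    # Stand-in for however the shared object is actually built
    # (read from cloud storage, build a lookup table, ...).
    return {"example": "payload"}


def process(i, ref):
    # Each task depends on `ref`; workers copy it between themselves as
    # needed instead of every copy coming from the client.
    return i, len(ref)


ref = client.submit(load_reference_data)          # one task produces the data
futures = [client.submit(process, i, ref) for i in range(1_000)]
results = client.gather(futures)
```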
Sure you can ping me, but you can also open a PR anytime at https://github.com/dask/dask-blog
@fjetter It took me a while to get back to this, but this advice was great! Using 30s for this parameter, I've been running a notebook that contains a client connected to a ~2000 worker cluster with basically no slowdowns in executing cells. Still trying to carve out some time for a blog post on these general best practices...
I don't think this is an existing issue or covered anywhere in the docs, but my apologies if it is already mentioned somewhere. I occasionally run jobs on 1500+ worker clusters (using dask-gateway with the Kubernetes backend) from an interactive JupyterLab interface. When I do this, the notebook response time slows down considerably, and I'm assuming this has to do with some extra communication that's occurring between client and scheduler due to the extra workers(?). If that's true, I can see a number of configuration parameters that might improve this situation by reducing communication frequency, but I'm not exactly sure which ones to alter. I'm wondering if it could be useful to have a "best practices for creating clusters with many workers" section of the docs which details some tips and tricks for workflows on large clusters. It's also possible there is no easy answer to this question, in which case feel free to close this feature request.