Rust implementation of dask-scheduler #3139
I forgot to note that distributed version 2.3.2 was used.
This is very exciting. I'm glad that you and @Kobzol are working on this. We use GitHub issues for all sorts of communication, so I think that this is the ideal place to have this conversation. Some thoughts / initial reactions below:
Thank you for your response. We were originally thinking about separating the scheduler from the Dask runtime as a first step. But we did not find an easy way to integrate an asynchronous scheduler, i.e. one where task placement decisions may be made later, or before input tasks are finished.
I think that it would still be synchronous. The scheduler logic would be in Rust. It would receive a sequence of events from the Python side. Based on the events that it received, it would change its internal state and emit a sequence of messages for the Python side to send out.
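Roughly, I imagine an interface along these lines (a minimal sketch; the event and message names below are made up, not an existing Dask or rsds API):

```python
# Minimal sketch only: a synchronous scheduler driven as a state machine.
# Event/message names are illustrative, not an actual Dask or rsds API.

class SchedulerCore:
    """Pure scheduling logic with no I/O: events in, messages out."""

    def __init__(self):
        self.tasks = {}       # task key -> state ("waiting", "assigned", ...)
        self.workers = set()

    def handle_events(self, events):
        """Update internal state from a batch of events and return the
        messages that the caller (the Python networking side) should send."""
        for event in events:
            if event["op"] == "task-added":
                self.tasks[event["key"]] = "waiting"
            elif event["op"] == "task-finished":
                self.tasks[event["key"]] = "finished"
            elif event["op"] == "worker-added":
                self.workers.add(event["worker"])
        return self._assign_waiting_tasks()

    def _assign_waiting_tasks(self):
        # Real placement heuristics would go here; trivial choice for the sketch.
        msgs = []
        for key, state in self.tasks.items():
            if state == "waiting" and self.workers:
                worker = min(self.workers)
                msgs.append({"op": "compute-task", "key": key, "worker": worker})
                self.tasks[key] = "assigned"
        return msgs
```

The Python side would then only be responsible for turning wire messages into events and for sending out whatever handle_events returns.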
In the long run, we want to use a more general solution, i.e. an asynchronous scheduler. You're right that we could also create a synchronous scheduler in Rust and call it from the Python code, but we see two obstacles with this approach:
So there are two (separate) directions:
That's correct today. But that could also be changed.
Yes. I was surprised by this. My hope is that this is some performance regression, and can be resolved on the Python side. Most of our code is independent of the number of workers, at least in theory. We should do more profiling at scale here.
Agreed. Just to make sure that we're on the same page though, you shouldn't think of Dask as a work-stealing system. Work-stealing is a small part of Dask's scheduling. Most of our decision making has to do with determining earliest finish time quickly. Most of the scheduler is about maintaining data structures with information about expected task times, worker loads, bandwidths, data sizes, and so on, so that we can make the earliest finish time query in constant time. I agree with the two directions that you lay out.
Indeed. It's just that we also wanted to test whether writing the central part in Rust helps performance by itself, so we started with that, and now it seems to us that it will be less effort to test various scheduling algorithms in Rust combined directly with the Rust server.
Thanks for the clarification. Since all the statistics gathering (and also the work-stealing) is embedded very deeply into the current Python server, and it generates a lot of data traffic and also eats CPU time, it is unclear whether we can directly compare the two solutions and say that the difference between Rust and Python is X. For that we would need to reimplement the exact same heuristics, statistics and work-stealing of the Python solution in Rust, which is both a lot of work and not our goal right now. Our initial benchmark confirmed that there is seemingly a non-trivial difference between the two solutions (even though we do a lot less work). If using Rust with a very simple heuristic on a task graph with a trivial structure had resulted in only a 5% improvement, it would not make any sense to continue. We will try to benchmark both our server and the Python server to find out what causes the difference, and also experiment with different scheduling algorithms on more complex task graphs. It would be great if you could point us to some benchmarks that represent real user task graphs so we can test our solution (and add missing features step-by-step to execute these graphs). Originally we wanted to pass the Dask scheduler test suite, but we noticed that there is a lot of additional logic in the scheduler. Right now we mostly want to find a performance baseline and compare scheduling heuristics. If we find a good solution and the Rust version is significantly faster, it might make sense to add these additional features in the future, but it is not a priority for us at this moment.
OK, fair enough. However, I do encourage you to learn from the existing statistics and heuristics. They have been critical for user adoption. Many more people care about making sure that we can run graphs in a good order than about running many tasks quickly. Optimizing strictly for task throughput may not have much impact on real-world problems.
I would use dask.dataframe/array/xarray to generate these graphs. Groupby-aggregations, joins, matrix multiplication, and so on. These should generate plenty of moderately complex task graphs. You might also look through examples.dask.org. I'm curious, what additional features are you talking about? If you're just thinking about scheduling task graphs then it's kind of all-or-nothing. I agree that you probably shouldn't think about Actors, Queues, or Pub/Sub.
That is good to know. We have been using Dask mainly on HPC architectures for running large-scale experiments, training many ML models with cross-validation, and similar things. I imagine that low latency might be worthwhile if you construct the task graph in a very dynamic way, but is there also a use case where latency matters if you just build the graph and then wait for the final result? Or in general, could you point us to some use cases for Dask where latency matters, so that we have a better idea about this? We focus mainly on throughput right now, but we want to design the Rust server in such a way that changing the scheduling heuristics would be very simple (for example to use a heuristic that tries to minimize latency), by separating the scheduling and data movement layers.
We started with the simple task graph above, ran it repeatedly, and gradually implemented support for a minimal set of Dask messages needed to fully execute this task graph, plus some additional error handling. More complicated task graphs may fail right now if they use any additional messages. Things like compression, the mentioned actors/queues/pub-sub, statistics gathering, etc. are missing, and I'm sure that there is a lot of other metadata collected by the Python scheduler right now that we neither use nor gather. To be honest, we don't fully know the whole interface of the scheduler (all of the messages it can receive and send, and all the possible message frame variants), so we decided to start by designing a simple use case and making it work.
Hi @mrocklin! We have tried to extend the Rust implementation to support more advanced use cases like distributed pandas or numpy computations, but we have hit a problem with the Dask low-level serialization protocol (https://github.com/dask/distributed/blob/master/distributed/protocol/core.py#L22). For complicated task graphs the protocol creates messages which are quite complex. Individual keys/values in the main message might be missing, and they need to be "patched in" via a series of (dynamic accessor, header pointing to frame(s)) structures that are stored outside the message itself. Instead of storing accessors, we build the actual message structure with the final keys, but replace the serialized payloads with a header which then points into the additional frames. As the message is received, we recursively traverse the message and replace the placeholders with the actual data from the frames. So instead of
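(a simplified sketch of the current layout; the key and header names below are made up for clarity and are not the exact fields of the real protocol)

```python
# Sketch: accessor-based layout. The payload is stripped out of the message and
# a key path ("accessor") is recorded separately, pointing at the extra frames.
msg = {"op": "compute-task", "key": "x", "args": None}           # payload removed
header = {
    "keys": [("args",)],                                          # where to patch it back in
    "headers": [{"serializer": "pickle", "frame-count": 1}],
}
frames = [b"<pickled args>"]
```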
we do
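(again a simplified sketch with made-up names)

```python
# Sketch: placeholder-based layout. The message keeps its original shape, and
# each extracted value is replaced in place by a small header that points at
# the frame(s) holding the serialized payload.
msg = {
    "op": "compute-task",
    "key": "x",
    "args": {"$serialized": {"serializer": "pickle", "frames": [0]}},
}
frames = [b"<pickled args>"]
```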
and during deserialization we read the header and replace it with the content of the corresponding frame (and deserialize it if necessary). The main benefit for statically typed languages with this approach is that we can simply replace the inner content of an individual key, but we don't have to manipulate the overall structure (insert a value into the zeroth element of the first element of a key 'foo' etc.). With this approach you have to keep all the frames in memory, but that was also the case in the previous version. Headers are not de-duplicated in the current version, but that could be changed easily by adding another meta frame. I tried to make as few changes to Dask as possible to keep the implementation compatible with all the features it currently supports. I ran the test suite and it seems to pass with the change. Are there any memory/performance benchmarks that we could run out of the box to spot any potential performance regressions? Our goal now is to find out if we can simplify the serialization/protocol structure to make it easier to implement the scheduler in a statically typed language. We can put up a PR with our change or with additional changes if you think that this is a good approach. Otherwise we would probably create some private patch to distributed and work on top of that. What do you think about our change? We are also unsure about some additional things:
I apologise for the delay in response. It has been a busy month. Answers to specific questions:
I believe that the original motivation here was because some compression libraries wouldn't accept frames larger than 2GB. I don't remember perfectly though.
I don't recall
Python includes a variety of kinds of bytestrings, including bytes, bytearrays, and memoryviews. They have different uses in various libraries, but for our purposes they are all valid. I'm not sure what you're asking about with Serialize and Serialized. These are necessary, for example, to refer to messages that we've chosen not to deserialize but want to pass on. This is common on the scheduler, which intentionally never unpacks user data.
This appears to be a list of small messages. Dask batches small messages on high-volume connections. See the batched.py file for more information.
I'm sorry, I haven't yet had time to dive into this.
I'm not sure I fully understand what you mean by "patched in" and accessors here. However, I'm not surprised to learn that the way(s) that we represent task graphs is overly complex. It has grown organically over time. I recommend raising a new issue where we discuss the current system in Python on its own and see what we can do to clean it up. We can probably involve some other maintainers in that discussion who have more time than I do. My apologies again for my late response here.
Thanks for your response :) Just to make sure that we understood the current system - the "patching in" - correctly, I want to give an example (a rough sketch follows at the end of this comment); please feel free to correct me if I state something wrongly. While this is pretty elegant in Python, it's frustrating to implement something like this in a statically typed language (especially in Rust, which has a pretty strict type system), and even more so if we want to have proper statically typed messages and not just opaque blobs containing arbitrary dictionary/list data. Because of this we suggested a different approach, where the structure of the message would be fixed, but only the leaf attributes which were moved into a frame would be replaced by a placeholder.
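Here is the rough sketch mentioned above of how we understand the current patching mechanism; the key names and the helper code are made up for illustration and are not the real protocol implementation.

```python
import pickle

# Sender side: values are cut out of the nested message, and their key paths
# ("accessors") are recorded next to the frames holding the serialized payloads.
msg = {"tasks": {"x": {"func": None, "args": None}}}
keys = [("tasks", "x", "func"), ("tasks", "x", "args")]
frames = [pickle.dumps("<some function>"), pickle.dumps((1, 2, 3))]

# Receiver side: walk each accessor path and patch the deserialized value back in.
def patch_in(msg, keys, frames):
    for path, frame in zip(keys, frames):
        target = msg
        for part in path[:-1]:
            target = target[part]
        target[path[-1]] = pickle.loads(frame)
    return msg

patched = patch_in(msg, keys, frames)
assert patched["tasks"]["x"]["args"] == (1, 2, 3)
```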
Hi, we have some benchmark results to share. Each column is a single cluster (scheduler-number of worker nodes-number of workers per node). Each Dask worker was executed with The Dask version that we use is from this branch: https://github.com/Kobzol/distributed/tree/simple-frame. It uses a slightly different frame encoding, as discussed here before. We have also benchmarked this modified Dask against unmodified Dask and there don't seem to be any differences. We used 3 different PBS allocations to run all the benchmarks, so it's possible that the network connections between the nodes were slightly different in each allocation, but in practice we haven't seen large effects caused by this. All benchmarks were executed 5 times (the Dask cluster was started, the benchmark executed, and the cluster killed, 5 times in a row, to avoid any old state lingering in the cluster). We will probably relax this for future benchmarks and run all X iterations of the same benchmark on the same cluster to make the whole benchmark run finish faster. And the comparison of modified vs unmodified (daskorig) Dask: Our work-stealing algorithm is very simple and it seems to have rough edges on some graphs. We would be glad if you could point us to some more interesting benchmarks.
First I want to say this is a very exciting piece of work. Thanks for pushing on it! 😄
I think LZ4 is one of the compressors that runs into this issue. With some of your suggested serialization changes, it would be interesting to see a draft PR here to get a sense of what might change in distributed. That might help make sense of these other questions you are asking.
Thanks for your answer. In the meantime, we ran a lot more benchmarks and improved our implementation to handle more Dask programs. We are now finishing writing an article about our experiments, so we're a bit busy, but once we finish writing it, I'll try to write more about our modifications (and send some more benchmark results here). The modifications are a series of WIP commits right now, so I don't want to create a PR until they're cleaned up a bit. But the gist of the changes can be seen here: Kobzol@f6ec885. Instead of extracting values out of messages during serialization and then putting them back during deserialization (which is a bit painful to do in Rust), we keep the original message structure and replace the previously extracted values with placeholders (which can be modelled using enumerations/abstract data types in a statically typed language). During deserialization we then load the correct values into the placeholders.
Thanks for illustrating. Agree that makes sense. For my 2 cents, it seems like a reasonable change to add to Dask proper (following some profiling). Would be interested in reading your article whenever it is available 🙂
We have profiled the change and found no significant performance difference between these two frame encodings on the benchmark set that we use in our article. Nonetheless, it should be investigated more, because although it's just a few lines of code, they affect pretty much every Dask packet sent by the client/server/worker. We plan to continue our work and publish it in a more documented and usable way, but right now we wanted to write an article about our efforts to gather feedback. I have sent a draft of the article to your and @mrocklin's e-mail addresses (the ones used in your git commits).
Thank you for sending the article @Kobzol . I really enjoyed reading it. I had a few comments and questions that I wanted to bring up here. I'll focus here on broader questions and save nitpicky comments for e-mail :)

Reactor/Scheduler split

I liked the split between reactor (networking/events) and scheduler (task placement logic). I also think that this would be a useful split to think about in terms of performance (which side is the bottleneck?) and in terms of allowing other groups like yours to more easily experiment with different scheduling options. Decoupling these two within Dask would, I think, unlock a lot of creativity. (Also, my apologies that you had to do this in order to experiment.)

What is your current performance bottleneck?

It looks like in the best-case scenario rsds is able to get about 4x faster than Dask's scheduler. This is by a combination of rewriting in Rust and dropping most of the task placement heuristics. I was actually a little surprised that it wasn't higher. Ideally we would want to learn that there was some way to get a 50x speedup or something similar. This would give us hope for future improvement. What is your current bottleneck? What is slowing down performance? Is it that the scheduler is operating as fast as a CPU can go? This seems unlikely given the ~10000 task / second task placement that you have today with a random policy. Is it the reactor? Perhaps it's hard to manage lots of concurrent signals from different sockets? If so, is there a way around this? Is it in the network latency of having to go back and forth between the worker and scheduler every time? If so, maybe we try to estimate output sizes ahead of time so that we can speculatively send some tasks to a worker before its dependencies are finished. Is it in the client? Maybe that's taking up all of the time, and the scheduler is no longer the bottleneck.
Regarding the reactor/scheduler split: it's a win in terms of experimenting with different schedulers and probably also testability, since the two parts can be tested in isolation. But it can also be a loss in performance, because the isolation means that you probably have to duplicate some data structures, or synchronize access to them if using multiple threads, so memory usage is increased. In Rust you can run the two processes concurrently, but that's not really an option in Python. And actually, even in Rust it wasn't a big win, because we already use async I/O, so if you run the scheduler in the same thread, I/O will still be progressing in the background; the scheduler was not a bottleneck in most cases.

Regarding performance: our implementation is actually very simple, without any heavy optimizations. We didn't use TCP connection reuse (although we have it implemented), we run some scheduler work, like recomputing task priorities from scratch on every task graph change, without caching, and we allocate all over the place. Although we want to improve this in the future, it was also partly intentional, to see whether a relatively simple algorithm plus a really basic implementation is able to outperform Dask just by paying smaller runtime costs.

The idea about a 50x speedup is actually very interesting, and it ties in to our implementation being relatively simple. I think that the Rust server is actually faster than the Dask server by an order of magnitude at least. For example, I tried an optimization which allocates tasks linearly in memory, which is a huge performance win: in microbenchmarks, we got almost 100% better performance than with normal allocation. But when I tried it on the benchmarks from the paper, we saw no improvement at all, so we did not include this optimization in the rsds version used in the paper. Or a different case: we run the scheduler after every new task event, which is a bit wasteful. We implemented some batching, which helped in microbenchmarks, but again we saw almost no improvement for real task graphs. If we microbenchmarked rsds and Dask, I have no doubt that rsds would strictly dominate even though it is not heavily optimized, but sadly that alone doesn't translate to X-times-faster workflows end-to-end.

This observation led us to the implementation of the zero worker: if there are other overheads in the distributed runtime besides the server, then even if you make the server 100x faster, it will not help as much. There are fundamental limitations associated with the centralized server architecture, but we cannot really change that in Dask. What we can do is lower the overheads of the individual parts. Rsds tries to lower the overhead of the server. The client is usually not a bottleneck, at least not in our experience. The worker is, I think, the next thing that could be interesting to accelerate. As we saw with the zero worker, if the overhead of the worker could be reduced, the performance gap between rsds and Dask could be further improved. Rewriting part of the worker in Rust would surely help, but there is also another approach that we think could help. In our usage of Dask (both for our purposes and in the paper), we never saw much benefit from using one worker with C threads (where C is the number of cores on the machine), and we always used C single-threaded workers instead. This gives us more computational power overall for single-threaded Python tasks, but it has severe downsides.
The data on each node is located in multiple copies, there are more network transfers, and there is also more scheduler overhead. We think that some kind of "super worker" could potentially provide significant performance gains. Here is how it could look:
Of course this is all just an ambitious idea; we don't know whether it's possible to integrate something like this into Dask or how much (if at all) it would help. But our impression from our experiments was that the large number of workers required to run single-threaded workloads, caused by the GIL, is a significant source of overhead. This is of course not a problem for workloads which are fine with multi-threaded Python workers. I'm sorry that I do not have more specific answers to your other questions, as we didn't have time for a really deep dive. We are not CPU bound on the server, and we examined our handling of sockets and async I/O and that also didn't seem to slow anything down. Network latency definitely hurts performance on some task graphs, but we don't have specific numbers. We could probably derive some based on the zero worker experiments, but they would be very specific to our topology, I guess. I wanted to include UCX in the benchmarks, but I had some trouble setting it up. I think I managed to use it in the end (it was a few months ago), but we didn't see any large differences. It's possible that I set it up wrong, though.
I would only expect to pay double for the amount of information that is in-flight. And even then this might not be necessary. These don't necessarily have to be in separate processes. I may not fully understand your separation between handling network events and managing the state machine of the scheduler though.
This would surprise me. I think that if our goal is rapid round-trips then yes, maybe it makes sense to accelerate the workers. But if our goal is task throughput then I don't think that the workers matter that much. This will depend a lot on workload. On something like a merge with an all-to-all shuffle then yes, this might matter because we'll need to send lots of small messages back and forth to coordinate who does what. But even then a centralized approach only costs you 2x (which may be large in your mind) over a distributed system if the scheduler is fast enough. For high throughput situations, like @rabernat's climate workloads, I don't think this will matter at all. We can always add more workers if the overhead here is high. If we are limited by lots of coordination (as in the merge workload) then I think that there are two things to do:
Building on that a little I'm actually quite curious in how much time we spend in each of the following stages:
For workloads like merge, where there is a lot of co-decision-making on the workers and schedulers, my guess is that the network communication is going to be the biggest bottleneck for small workloads, and scheduling logic the biggest bottleneck for larger workloads. Because people mostly feel a pain around larger workloads I was hoping that reducing scheduling costs would be the most helpful here.
I would love to know more about this. I'm also curious about your thoughts on how much faster the Rust scheduler would be if it implemented the same heuristics that the Python scheduler implemented. Still 10x? (I hope so)
In your experiments with your zero-worker, do you track communication time of data? I would expect this to be the main reason why a random placement scheduler would be slow.
In practice the benefit is typically around reducing communication and memory use.
In practice I've seen multi-threaded workers perform pretty well. Before diving into a more complicated structure I recommend that you investigate why things weren't performing well for you with multiple local threads. My opinion so far is that one multi-threaded process per node is the simplest solution. If there is some reason why it doesn't perform well (like some library doesn't release the GIL), then let's resolve that underlying issue.
The NVIDIA RAPIDS folks (who I imagine are now tracking this thread) may be able to help here.
We are probably a bit biased in our experiences, because for our purposes we have usually been running single-threaded pure-Python tasks which don't benefit from multi-threaded workers. I'm sure that there are a lot of users who are fine with multi-threaded workers and whose tasks are long enough that the scheduler is never a bottleneck. You are definitely right that multi-threaded workers save memory and bandwidth; it's just that for our past use cases they were not very effective computationally. Your question about how much time the individual parts take in the server also interests us, and this is the next thing that we want to focus on. We have some very basic tracing infrastructure in rsds and we have also added it to Dask to compare some metrics (like the number of performed task steals), but some things are difficult to compare directly, because e.g. the scheduler is "spread" over more places in Dask, whereas in rsds it's basically a single function call. This warrants additional investigation, as I don't feel like saying "Rust is faster" is enough. In the paper we were usually faster, but maybe we just chose task graphs that were good for us (although we tried not to be biased here). Once I get to it, I'll try to run the experiments with the tracing to provide you with some more numbers (including the communication time with the zero worker).
Right, multi-threaded workers are especially valuable when using other PyData libraries like Numpy, Pandas, and Scikit-Learn, where Python code takes up 1% of the runtime, and tasks are usually hundreds of milliseconds long.
+1 on measurement
My guess is that a random scheduler will fail hard for some situations that care a lot about communication or memory use. Dask's scheduler has probably evolved to be fairly robust about minimizing memory use, which tends to be the most common complaint we've heard about over the years from data science workloads.
Thanks for the update, Jakub! 😄 It might be worth trying the Cythonized scheduler ( #4442 (comment) ) as well. Edit: also, what does the
Ah ok. Was wondering if it was something like that. Thanks for clarifying 🙂 Thanks for running that comparison, Jakub! 😄 That is nice to see. Yeah, the flag wasn't there yet, but one could check this. Might be worth trying a more recent version (if that is an option). There were some improvements to worker selection, serialization, memory handling, etc. (though those are not really related to Cythonization of the scheduler in particular). I am curious whether you would be willing to share the code you used in this experiment? No worries if not. Just thinking it might be fun to play with 🙂
I wanted to try a newer version, but it required Python 3.7. It's not a problem to upgrade, but the other benchmarks were executed with Python 3.6, so to keep it fair I would probably need to upgrade all of them. I will do it eventually, I just haven't gotten to it yet :) I need to modify the benchmark suite a bit to make it easier to execute multiple benchmarks with different Python/Python library versions. The code is open source here: https://github.com/It4innovations/rsds/blob/master/scripts/benchmark.py The benchmarked task workflows can be found here.
Ah ok. That's fair 🙂 It's possible things work with Python 3.6. There weren't too many changes when dropping it ( #4390 ). The main differences were dropping time measurement utilities for Windows, which seems irrelevant here, and some Great, thank you Jakub! 😄 It seems like a lot of these are similar to things we've been benchmarking ourselves ( Not sure if you have read any of the blogposts recently. No worries if not. Think this one on speeding up the scheduler and this one on Maybe it would be worthwhile to compare notes at some point? |
Thanks, I have read the articles but haven't seen the video; I will go through it. HLGs are something that we have also been experimenting with (#3872); it's cool to see that Dask has progressed so much on this front.
Sure :) We can chat on the |
Ah ok. Yeah that should provide a good overview. After moving a bunch of things to HLGs, we've wound up spending time on other things related to serialization ( #4923 ), comms ( #4443 ), IO/creation ( dask/dask#6791 ), etc. Also there is more recent work on operation reordering ( dask/dask#7933 ). Probably forgetting other things here 😅 Sure. Oh gotcha 😄 Well it would be great to have another set of eyes. Would be curious to see what other issues you see when hammering on things 🙂 |
Any updates, @Kobzol?
Hi. Some of the ideas and insights from RSDS (the Rust scheduler) were used to slightly optimize Dask, but we are no longer working on the Rust scheduler. It is my understanding that since RSDS was created, Dask has moved in the direction of moving much more stuff from the client to the server (and this is a Good Thing™), however it also means that any (re)implementation of the server would now be much more complicated, because it would need to cover a much larger API surface. |
Hello,
We (@Kobzol and I) are working on a Rust implementation of dask-scheduler as an experimental drop-in replacement for dask-scheduler, without any modifications on the worker/client side. It is an experiment to (a) evaluate the performance gain of a non-Python scheduler and (b) allow experimentation with different schedulers. Here I would like to report preliminary results for (a).
I am sorry for abusing GitHub issues; if there is a better place to contact the community, please redirect us.
Repository: https://github.com/spirali/rsds/tree/master/src
Project status:
Benchmark
We ran the following simple code as a benchmark of server runtime overhead.
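A minimal sketch of this kind of overhead benchmark is below; the task body, task count, and scheduler address are illustrative assumptions, not the exact script behind the numbers.

```python
# test.py - sketch: submit many tiny independent tasks so that the measured
# wall time is dominated by scheduler/runtime overhead, not by computation.
from dask import delayed
from dask.distributed import Client

client = Client("tcp://localhost:8786")   # dask-scheduler or rsds endpoint

@delayed
def inc(x):
    return x + 1

tasks = [inc(i) for i in range(20000)]
futures = client.compute(tasks)
results = client.gather(futures)
print(len(results))
```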
Results
Times are obtained through "time -p python test.py".
We are aware that the benchmark is far from ideal in many respects; we would be happy if you could point us to code that does a better job.