Parameter Server #171
One general mechanism that might fit this need well is Pub/Sub. Under this construct any worker can join a topic as either a publisher or subscriber (or both). It learns from the scheduler who all of its peers are on that topic. When it publishes data on that topic it sends this data directly to all subscribers (or, in the future, to some subscribers that then forward it along to others). Under this model the parameter server subscribes to updates and publishes parameters. An implementation of a single parameter server might look like the following:

def parameter_server(...):
    """ This runs on one special worker """
    parameters = np.random.random(...)
    updates_sub = Sub('updates')        # subscribe to updates from workers
    parameters_pub = Pub('parameters')  # publish new parameters to workers

    async def publish_parameters():
        while not should_stop():
            parameters_pub.put(parameters)  # maybe send only to a few round-robin instead?
            await gen.sleep(0.01)           # maybe control this based on network traffic?

    get_worker().loop.add_callback(publish_parameters)

    for new_data in updates_sub:
        update(parameters, new_data)
        if should_stop():
            return parameters


def worker(data: List[np.ndarray],    # these correspond to chunks of data from a dask array
           labels: List[np.ndarray]):
    """ This runs on each worker and gets the arrays present on that worker """
    data = np.concatenate(data)
    labels = np.concatenate(labels)
    parameters = None                 # filled in by the subscription below
    updates_pub = Pub('updates')
    parameters_sub = Sub('parameters')

    async def update_parameters():
        nonlocal parameters
        async for params in parameters_sub:
            parameters = params

    get_worker().loop.add_callback(update_parameters)

    while not should_stop():
        batch = data[np.random.randint(0, len(data))]
        result = compute_update(parameters, batch)
        updates_pub.put(result)
|
To be clear Pub and Sub don't exist today, but they could exist tomorrow if this is an appropriate model. If it isn't then we can build something else. Other ideas are welcome. |
That is common practice, and is what distributed Tensorflow uses.

In the vanilla case, we only want to update … In the most simple use case for distributed Tensorflow, each worker passes around some parameter …

This pub/sub framework LGTM. Is there any way to have a worker subscribe to a topic published by another worker in particular? Something like …

Two extensions:

I'm inclined to believe these extensions are best suited for a class interface, at least privately. |
I encourage people to start thinking aspirationally here, a bit beyond the simplest case. I was looking at Downpour and also this paper, which had different algorithms that I felt were both probably decently well approximated with PubSub. It may be, though, that my approximations are significant.
This would be outside of the paradigm of pub/sub but is certainly something that is doable. I expect pubsub to be something like 200-400 lines of code. We can do more things if they are important. I am not personally familiar with algorithms that are based on ring-like topologies. Is this common?
The pseudocode above does async reads/writes. The pub.put commands return immediately. The subscribe commands would block, but are happening in an async block.
I'm hoping that this is independent of communication patterns and is something that folks like you would feel comfortable writing after pubsub, or whatever network machinery we need, is put in place.
This work is likely to happen at two levels:
|
Got it. This makes more sense now.
At least one algorithm depends on a ring-structure, Hogwild++. This is suited for HPC applications, and shows much better speedups than Hogwild as the number of workers grows. I don't think I'd say this is relevant or that distributed ring-like topologies are super common, but they could be useful.
Pub/sub looks sufficient for parameter servers. Almost everything I said was ML specific. |
A draft pubsub implementation is available here: dask/distributed#1999 I suspect that it is still buggy. I'll work on setting up a mocked out example of the system above and then hopefully others can start playing with things. My initial experiments show that we have about a 1ms latency when sending messages between workers. I am curious to hear from people who have an understanding of what kinds of latencies we need to get decent performance in these applications. |
Moderate sized deep learning models have latency of at least 10ms for feeding 1 image through the network for prediction (source: https://dawn.cs.stanford.edu/benchmark/). The time required for optimization of these networks is (likely) no more than 5-6 times as slow, so probably at least 50-100ms for one optimization step. |
Well then I guess we're in the clear?
|
1ms latency seems reasonable – latency between cluster nodes for me is (on avg) 0.3ms. I think I'd say it's a good first draft, and enough to start experimenting. I can implement a parameter server and look for any bottlenecks. |
PubSub is merged. I have a tiny toy example here: https://gist.github.com/mrocklin/0d906828544ddeb8e6e1d3d193172ae9 |
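For reference, a round-trip measurement in the same spirit as that toy example might look like the sketch below. This is an illustration, not the gist's code: the topic names and message counts are made up, the Sub.get timeouts are arbitrary, and the initial sleep is there because early messages may be dropped while the subscription handshake with the scheduler completes.

import time
from distributed import Client, Pub, Sub


def echoer(n):
    sub = Sub('ping')
    pub = Pub('pong')
    for _ in range(n):
        pub.put(sub.get(timeout=10))      # echo each message straight back


def timer(n):
    pub = Pub('ping')
    sub = Sub('pong')
    time.sleep(1)                         # let subscriptions register first
    start = time.time()
    for i in range(n):
        pub.put(i)                        # put returns immediately
        sub.get(timeout=10)               # block until the echo arrives
    return (time.time() - start) / n      # mean round-trip time in seconds


if __name__ == '__main__':
    client = Client()                     # local cluster
    echo = client.submit(echoer, 100)
    rtt = client.submit(timer, 100)
    print("mean round trip: %.0f us" % (rtt.result() * 1e6))
    echo.result()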
@mrocklin could I have your timing script to measure the latency? I'd like to compare on the same machine. I'm measuring the latency of PyTorch's |
|
Side by side on my machine I measure PyTorch 0.4.0 to have a latency of 0.031ms and dask.distributed master to have a latency of 1.398ms, both between two processes with Python 3.6.5. I measure with the same gist, https://gist.github.com/stsievert/1a93cf732d66f22a3080fcd0729364d6 |
I'm not surprised to see PyTorch be faster, however I am surprised to see that it can do things in 30us. Some things to check if you're interested:
|
I measure a latency of 16us with Python 3.6.5 and 19us with Python 2.7.5 for the code in the updated gist. I've slightly modified the example from https://pymotw.com/3/socket/tcp.html. For PyTorch with a roundtrip, I'm measuring a latency of 37us. |
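For concreteness, a self-contained version of that kind of raw-socket measurement might look like the sketch below (adapted from the standard echo-server pattern rather than the exact gist; the port, message count, and message size are arbitrary).

import socket
import threading
import time

HOST, PORT, N = '127.0.0.1', 9095, 10_000


def echo_server():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, PORT))
        srv.listen(1)
        conn, _ = srv.accept()
        with conn:
            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            while True:
                data = conn.recv(64)
                if not data:
                    break
                conn.sendall(data)        # echo the bytes straight back


threading.Thread(target=echo_server, daemon=True).start()
time.sleep(0.2)                           # give the server time to bind

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    start = time.perf_counter()
    for _ in range(N):
        sock.sendall(b'x')
        sock.recv(64)                     # wait for the echo (full round trip)
    elapsed = time.perf_counter() - start

print("round trip: %.1f us, one way: %.1f us"
      % (elapsed / N * 1e6, elapsed / N / 2 * 1e6))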
OK cool. I'll readjust my internal expectations. I'm now a bit interested in setting up a single-threaded async example with Dask to measure where our overhead is coming from. This is probably not something that I would recommend you focus on though. |
Raw Tornado has a roundtrip limit of around 200us on my machine: https://gist.github.com/609824993811575dd7d774f1eb5becc9. Under PyPy this goes down to about 60us.

Again, this isn't a priority (we don't have any applications that currently care about this) but it's fun to fool around with.
|
Clarification: all the times I measured were estimates of the latency between two machines, not the roundtrip time. This is a factor of 2, so I would have estimated 100us for the raw Tornado example, and 30us under PyPy.
Don't worry, I'm not planning to focus on this. I wanted to provide hard numbers for "1ms seems reasonable" #171 (comment). |
Another possibly cleaner way of implementing this would be by first implementing actors (see dask/distributed#2109). However that task is probably blocked on myself for the near future. Regardless though, I believe that before we spend engineering time on this problem we should have a case study problem against which we could apply it in order to guide our choices. |
I tried raising a few case study examples from a conversation with Olivier, but I wasn't able to come up with a clean case study for a problem that requires a parameter server. @stsievert perhaps this is something you could think about and develop? I believe that developing a case study should block making technical progress on this issue. |
I see a Dask parameter server as being useful in the same cases where the Tensorflow parameter server is used, because it also only works between machines. I see both of these as being useful when either
I see a Dask parameter server as playing a data management role and handing the computation to the optimization or deep learning library. It would only handle giving the data to each worker, and communicating the different updates produced by the optimization process. I think a Dask parameter server would make it easy to scale to larger datasets, and would be similar to, but different from, various Spark learning libraries (e.g., BigDL, MLlib). I do not see it as being as useful in the same cases as Horovod. I think Horovod is more performant than Dask in inter-GPU communication, but is harder to use and less fault-tolerant across multiple machines.
I'll look for a use case where the Tensorflow parameter server is required and mirror that.
I think actors should be implemented. This would only be useful when the workers store state, which would enable fancier communication methods for synchronous optimization. For an example of the speedups of different communication methods, see https://talwalkarlab.github.io/paleo/ and select strong scaling instead of the default weak scaling. We don't need this if we want only asynchronous algorithms because the model has to be sent over the wire. We could do this with synchronous algorithms too, but it'd unnecessarily waste half the bandwidth.
👍 |
@mrocklin is implementing stateful "actors" in dask/distributed#2133, which are required for parameter servers. |
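For context, the actors API lets a class be instantiated on one worker and keep its state there, with other tasks calling its methods worker-to-worker. A minimal sketch (ParameterHolder is a made-up toy class, not the code from that PR):

from distributed import Client


class ParameterHolder:
    """Toy stand-in for a parameter server: just counts updates."""
    def __init__(self):
        self.n_updates = 0

    def push(self, update):
        self.n_updates += 1      # a real PS would apply the gradient here

    def count(self):
        return self.n_updates


client = Client()
ps = client.submit(ParameterHolder, actor=True).result()  # lives on one worker
ps.push("fake-gradient").result()                          # remote method call
print(ps.count().result())                                 # -> 1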
I've sketched a very rough parameter server: https://gist.github.com/stsievert/ff8a1df9300a82f15a2704e913469522.

- This is a decentralized example, so every worker has state. This allows fancy communication schemes (e.g., all-reduce).
- This integrates pretty nicely with PyTorch's MNIST example <https://github.com/pytorch/examples/tree/master/mnist>; only small modifications were required to their functions.

And this naturally depends on dask/distributed#2133. |
Cool. I'm looking forward to playing with this.

What are your thoughts on a centralized parameter server approach? Not worth doing? I'd be curious to know what one gains from a fully decentralized system. This is probably just my general tendency to always start with simple things and work up though.
|
Good question. I've considered this. Decentralized parameter server really means that every worker can store state because it has an actor. With this, the communication between these workers can be customized while letting the actor manage the computation. Customization of the communication scheme can include:
There are real speed benefits for the all-reduce communication scheme: https://talwalkarlab.github.io/paleo/ (select "strong scaling" instead of "weak scaling"). One good example of centralized vs decentralized parameter servers is Hogwild and Hogwild++ respectively, where Hogwild is an asynchronous optimization algorithm and Hogwild++ is a slight modification. Hogwild has moderate speedups in time to a particular accuracy, and Hogwild++ has near-linear speedups (see Figure 5 in their papers). |
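For illustration, the per-step update under an all-reduce scheme might look roughly like this sketch written against torch.distributed; the model, loss function, learning rate, and process-group setup are assumed rather than taken from the gist above.

import torch
import torch.distributed as dist


def allreduce_step(model, loss_fn, batch, target, lr=0.1):
    # Assumes dist.init_process_group(...) has already been called on each worker.
    loss = loss_fn(model(batch), target)
    model.zero_grad()
    loss.backward()
    world_size = dist.get_world_size()
    with torch.no_grad():
        for p in model.parameters():
            # Sum this parameter's gradient across all workers, then average it.
            dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
            p.grad /= world_size
            p -= lr * p.grad              # plain SGD on the averaged gradient
    return loss.item()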
I had a good time running through the notebook last night and things ran smoothly. One thing I really missed was getting feedback both about how the cluster was performing (the diagnostics are silent with actors) and on how well the model was being trained. FT |
Whoops, misfire. The diagnostics issue is something that I'll take care of for actors, probably by creating a diagnostics page on each of the workers's bokeh servers. The "how well is the model training" is a more open question though. What's the right way to do this? Periodically ask some worker to score the testing or validation set and plot the results over time? |
Good point. I should have numbers to back my decisions. I'll generate a comparison. The centralized PS does spend half its bandwidth on communicating the model, something that's not necessary with the decentralized approach. The centralized approach has approximately the same latency cost (about the same number of connections, depending on the reduce algorithm used). However, it is more resilient because only one worker is holding state. Most existing work uses all-reduce, including Horovod and some distributed gradient coding papers for their experiments (QSGD, TernGrad). I've seen a centralized parameter server used with distributed Tensorflow. |
I'd want to see {test, train} accuracy over {time, epochs}.
Good to hear. It's also difficult to debug actors. What can be done to help with that? |
I've updated the gist with a centralized parameter server: https://gist.github.com/stsievert/ff8a1df9300a82f15a2704e913469522 I'll profile these two implementations while varying the model size. This small network for MNIST is 84KB with 21k 32-bit floats. A small ResNet is 44MB, and a deep ResNet is 169MB. I can look at dataset size too, but it only happens once at the beginning. A bigger issue is probably with scaling the number of workers. |
Do you have a cluster to try out scaling? If not, we could work through http://dask.pydata.org/en/latest/setup/kubernetes-helm.html together tomorrow if that would be helpful.
|
@lesteve you might find @stsievert 's notebooks above interesting |
I've started a repo for those notebooks: https://github.com/stsievert/dask-ps, and have modified the implementation a bit. I see these timings with dask.distributed master:

Currently, the parameter server is spending about 50% of its time waiting (I think; this time is spent in …).

This result can be reproduced by running … |
@stsievert it looks like we're still spending 60% or so of our time waiting on synchronization, which seems to be the main bottleneck for scaling. Do you have any thoughts on asynchronous methods that don't wait for all updates to arrive before modifying the model? |
I've pushed what I think an async parameter server would look like here: https://gist.github.com/e57551e451bd633e13009ccce9c6ff67 Feedback on whether or not this is valid would be appreciated |
It looks like we're currently bound by serializing and communicating models around. We can balance this out by using larger batches, but that probably affects scalability of training. It would be useful to know how much increasing the batch sizes affects training performance over time. What is the right way to score a model like this? Perhaps we can have the client periodically pull down a model and score it against some testing dataset and then keep a time series of these scores over time? |
As in the following:

ps = client.submit(PS, ..., actor=True)
futures = [client.submit(worker, ps, ...) for _ in range(n_workers)]

times = []
scores = []
while not all(future.done() for future in futures):
    model = ps.model
    times.append(time())
    scores.append(score(model, test_data))
    sleep(0.5)

plot(times, scores)
|
I've reworked your notebook: https://github.com/stsievert/dask-ps/blob/master/Centralized-PS.ipynb. This notebook is functionally the same as my previous implementation, but it spends less time waiting. I'm not sure why. The communication scheme this notebook implements is "synchronous with backup workers", which is detailed below.
Depends on what you mean by "asynchronous" – either the workers don't wait on each other, or the optimization algorithm can return partially updated models. Figure 5 of the Tensorflow paper has a good depiction of the difference. Using the "synchronous with backup workers" method will use the user's optimization method and allow the workers to operate independently, plus the timing won't be affected by stragglers. More of this difference and the performance benefits are detailed in "Revisiting distributed synchronous SGD".
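A rough sketch of the server-side rule for "synchronous with backup workers" is below. This is not the notebook's code: the model is assumed to be a dict of NumPy arrays, workers are assumed to call push remotely (e.g. via an actor), and the networking is omitted.

import queue


class BackupSyncPS:
    """With n workers and b backups, apply the first n - b gradients per step."""

    def __init__(self, model, n_workers, n_backup, lr=0.1):
        self.model = model                 # dict of parameter name -> numpy array
        self.n_required = n_workers - n_backup
        self.lr = lr
        self.step = 0
        self.updates = queue.Queue()       # holds (step, gradient-dict) pairs

    def push(self, step, gradient):
        """Called by workers after each gradient computation."""
        self.updates.put((step, gradient))

    def run_step(self):
        grads = []
        while len(grads) < self.n_required:
            step, g = self.updates.get()
            if step == self.step:          # drop stragglers from earlier steps
                grads.append(g)
        for key in self.model:
            avg = sum(g[key] for g in grads) / len(grads)
            self.model[key] -= self.lr * avg
        self.step += 1                     # workers then pull the new model/step
        return self.model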
I think this implements Hogwild, where workers blindly run SGD (hence the name). This does have convergence guarantees, but they're fairly weak. They require a model sparsity constraint and a limit on how long the workers can take to update the model.
Your implementation looks good. I think that'd be the easiest method. |
I've timed the parameter server in the notebook Centralized-PS.ipynb@a971d2. The parameter server behind this graph implements the "synchronous communication with backup workers", but without any backup workers (i.e., workers never wait and can repeat gradient computations). Here's a quick summary from that notebook:
The solid lines are the median, the shaded color borders specify the 95% confidence interval. I get this on my local machine (16GB RAM, no GPU) when timing the MNIST PyTorch example (which is unrealistic: gradients take a long time to compute but the model is small). |
So, looking at the profile page I see something similar-ish. Of the 11.50 CPU-seconds it took to run a 4-worker training process
There are a couple other tiny things, but not worth mentioning. There were 215 push calls recorded on the parameter server, which places each of them at around 30ms, which is longer than I'd like. During this time bandwidth on the worker with the actor reached around 50MB/s, which is nowhere near saturating local network speed. The Worker's CPU was being used during this time, mostly on networking overhead and data compression, and a little bit on deserialization. In isolation a single call to train takes around 10ms and pulling a model takes 9ms (though this is proxied through the scheduler so this is conservative) |
There are some things we can do to reduce worker overhead:
I'm curious though how expensive train calls are going to be normally. Is 10ms typical, or do these become more expensive? It may be that training a batch on MNIST is too fast for dask to be of much use. |
Turning compression off results in 380MB/s bandwidth and slightly faster performance overall. Offloading compression to a separate thread seems to get to lower bandwidth and decent performance. This might be the solution longer term. The actor worker is now not quite as hot (event loop taking up around 60% of a CPU) and actor wait time is down to around 4s total. |
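For anyone reproducing this, compression can be turned off through Dask's configuration. A sketch, using the key name from recent dask/distributed releases (this may differ from the version discussed here), set before the client and workers start:

import dask
from distributed import Client

# Assumed key name from current documentation; older releases used a bare
# `compression:` entry in the config file instead.
dask.config.set({"distributed.comm.compression": None})
client = Client()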
A good benchmark for this is DAWNBench. Top performing deep learning models have an inference latency of 10ms, and a couple spots down is about 20ms. That means the
Sounds like we should scale to a larger model, which isn't too hard. I was only using MNIST as an easy and basic example. |
Ah interesting, I hadn't drawn that connection before. So because training is just applying the model + back propagation (which is roughly the same order of magnitude) we would expect something like 2x of the inference cost. But for a minibatch would we then multiply by the batch size? Or is it generally much cheaper to compute a batch than to compute many samples individually? |
We can multiply by the batch size. The only …

Calculating the gradient for many examples requires knowing how wrong the prediction is on each example, and then adding up their gradients.
Correct. Details: the gradient calculation is done via reverse mode autodiff, which typically requires no more than 5-6x more floating point operations for a gradient than for a function evaluation. On my machine, it takes 2x longer on average to calculate the gradient vs getting the output with PyTorch's (simple) MNIST network. (Also, it turns out autodiff is faster than hard-coding the gradient with NumPy even though it does more work, because the graph that autodiff requires can be parallelized: https://stsievert.com/blog/2017/09/07/pytorch/#speed) |
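A small sketch of that forward-vs-gradient timing comparison, using an arbitrary fully connected network rather than the MNIST model from the thread; the ~2x ratio will vary with the model and hardware.

import time
import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
loss_fn = nn.CrossEntropyLoss()
x = torch.randn(64, 784)
y = torch.randint(0, 10, (64,))


def timeit(fn, repeat=200):
    start = time.perf_counter()
    for _ in range(repeat):
        fn()
    return (time.perf_counter() - start) / repeat


def forward_only():
    with torch.no_grad():
        model(x)


def forward_and_backward():
    model.zero_grad()
    loss_fn(model(x), y).backward()


print("forward only:     %.3f ms" % (1e3 * timeit(forward_only)))
print("forward+backward: %.3f ms" % (1e3 * timeit(forward_and_backward)))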
Heh, not just typically. My understanding is that the 5x number is part of a theorem in the early work on the subject :) 1-2x matches my experience as well, though obviously this depends on the system you use. There is a lot of fun work in that field to suit all sorts :) |
Almost – you're off by one! It's guaranteed to be strictly less than 6:
|
Interesting, this differs from Griewank. Going back in time a bit... In 1982 Phil Wolfe [31] made the following observation regarding the ratio between the cost
The main purpose of this article is to demonstrate that Phil Wolfe's observation is in fact |
Hm... the review paper I referenced cited Griewank and Walther's 2008 textbook Evaluating Derivatives. In this, I couldn't find a claim that c < 6. I only found that c <= 4 on page 85, but I only skimmed. Without a bound on memory access time, this bound grows to c <= 5. This does rely on some (fair) assumptions about the relative cost (e.g., "that a memory access is at least as slow as a multiplication which is in turn at least as slow as an addition"). Thanks for the reference. |
I'm restarting the discussion on parameter servers from dask/dask-glm#57
When doing distributed training it is often useful to rapidly update and share parameters between distributed workers. The frequency with which these updates can be shared seems to strongly affect the rate of convergence. The common practice today seems to be to have dedicated nodes within the network that serve as parameter servers, accepting and aggregating updates from workers, and periodically publishing those back out to workers.
Dask's existing task scheduling model is not ideal for this. Every computation or communication checks in with the scheduler, which adds unnecessary network roundtrips and provides a central bottleneck for metadata tracking. To do parameter servers well we probably need to break out of this model and engage more peer-to-peer coordination, without frequent checking in with the scheduler.
Building this infrastructure is not hard, but if possible it would be good to build general machinery that can be used to solve both this problem, and potentially others. I would appreciate having a conversation with a few people to identify what programming and networking constructs might suffice for this.
cc @stsievert @fabianp @MLnick