
Parameter server #57

Open · mrocklin opened this issue May 25, 2017 · 27 comments
@mrocklin (Member)

This is a summary of an e-mail exchange between @MLnick and myself.

From Nick

Taking the simplest version of, say, logistic regression, the basic idea is to split up the parameter vector (beta) itself into chunks (so it could potentially be a dask array, or some other distributed dask data structure). Training then iterates over "mini-batches" using SGD (let's say each mini-batch is a chunk of the X array). In each mini-batch, the worker "pulls" the latest version of beta from the parameter server and computes the (local) gradient for the batch. The worker then sends this gradient to the PS, which performs the update (i.e. updates its part of beta using, say, the gradient from the worker and the step size). The next iteration then proceeds in the same way. This can be sync or async (but typically it is either fully async or "bounded stale" async).

The key is to do this effectively as direct communication from the worker doing the mini batch gradient computation, to the worker holding the parameters (the "parameter server"), without involving the master ("client" app) at all, and to only "pull" and "push" the part of beta required for local computation (due to sparsity this doesn't need to be the full beta in many cases). In situations where the data is very sparse (e.g. like the Criteo data) the communication is substantially reduced in this approach. And the model size can be scaled up significantly (e.g. for FMs the model size can be very large).
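As a rough illustration of this pull/compute/push cycle for logistic regression, here is a minimal single-process sketch; the ParameterStore class is a hypothetical stand-in for the PS, not dask code:

import numpy as np

class ParameterStore:
    # Hypothetical stand-in for a parameter server holding beta
    def __init__(self, n_features):
        self.beta = np.zeros(n_features)

    def pull(self, idx):
        # a worker pulls only the coordinates it needs
        return self.beta[idx]

    def push(self, idx, grad, step_size=0.1):
        # apply a worker's local gradient update in place
        self.beta[idx] -= step_size * grad

def train_minibatch(ps, X_chunk, y_chunk):
    idx = np.unique(X_chunk.nonzero()[1])       # active feature indices in this chunk
    beta_local = ps.pull(idx)                   # "pull"
    margins = X_chunk[:, idx].dot(beta_local)
    grad = X_chunk[:, idx].T.dot(1.0 / (1.0 + np.exp(-margins)) - y_chunk)
    ps.push(idx, grad)                          # "push"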

This is slightly different from the way, say, L-BFGS works currently (and the way I understand ADMM works in dask-glm), i.e. more or less, a set of local computations is performed on the distributed data on the workers, and the results are collected back to the "master", where an update step is performed (using L-BFGS or the averaging of ADMM, respectively). This is also the way Spark does things.

What I'm struggling with is quite how to achieve the PS approach in dask. It seems possible to do it in a few different ways, e.g. perhaps it's possible just using simple distributed dask arrays, or perhaps using "worker_client" and/or Channels. The issue I have is how to let each worker "pull" the latest view of "beta" in each iteration, and how to have each worker "push" its local gradient out to update the "beta" view, without the "master" being involved.

I'm also looking into the async work in http://matthewrocklin.com/blog/work/2017/04/19/dask-glm-2 to see if I can do something similar here.

From me

First, there are two nodes that you might consider the "master": the scheduler and the client. This is a slight deviation from Spark, where both live in the same place.

Second, what are your communication and computation requirements? A round trip from the client to scheduler to worker to scheduler to client takes around 10ms on a decent network. Worker-to-worker communication would certainly be faster, but may also involve more machinery. We can do worker-to-worker direct communication, but I wanted to make sure that this was necessary.

Channels currently coordinate metadata through the scheduler. They work a bit like this:

  1. Worker A subscribes to channel, tells scheduler
  2. Worker B subscribes to channel, tells scheduler
  3. Worker A creates some data and registers it on the channel, tells the scheduler
  4. Scheduler tells all workers that are on this channel (A and B) that a new piece of data is on the channel
  5. Worker B says great, I want this data, and asks the scheduler where it can get it
  6. Scheduler tells Worker B that the data is on Worker A
  7. Worker B gets data from Worker A

So there are a few network hops here, although each should be in the millisecond range (I think?).
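As a rough sketch of that pattern, using the Channel API as it existed in distributed at the time (addresses and names here are illustrative, and the two sides are shown as client processes for brevity; on workers one would use worker_client as in the code later in this thread):

import numpy as np
from distributed import Client

client = Client('scheduler-address:8786')   # illustrative address
some_array = np.zeros(10)

# producer side (Worker A in the steps above)
chan = client.channel('betas')
[future] = client.scatter([some_array])     # scatter takes a list, returns futures
chan.append(future)                         # scheduler notifies all subscribers

# consumer side (Worker B in the steps above)
chan = client.channel('betas')
for future in chan:                         # blocks until data appears on the channel
    data = future.result()                  # data is fetched directly from Worker A
    break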

We could also set up a proper parameter server structure with single-hop communications. Building these things isn't hard. As usual my goal is to extract from this experiment something slightly more general to see if we can hit a broader use case.

So I guess my questions become:

  1. What are your communication requirements?
  2. How much data are you likely to shove through this?
  3. Are you likely to have multiple parameter servers? If so, how would you anticipate sharding communication?

From Nick

The PS idea is very simple at the high level. The "parameter server" can be thought of as a "distributed key-value store". There could be 1 or more PS nodes (the idea is precisely to allow scaling the size of model parameters across multiple nodes, such as in the case of factorization machines, neural networks etc).

A good reference paper is https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf

So in theory, at the start of an iteration, a worker node asks the PS for only the parameters it needs to compute its update (in sparse data situations, this might be only a few percent of the overall number of features, per "partition" or "batch"). This can be thought of as a set of (key, value) pairs, where the keys are vector indices and the values are the parameter vector's values at the corresponding indices. In practice, each PS node will hold a "slice" of the parameter vector (the paper uses a chord key layout, for example), and will work with vectors rather than raw key-value pairs, for greater efficiency.
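To make that concrete, here is a minimal numpy sketch of one PS shard owning a contiguous slice of the parameter vector and exposing a pull/push interface over global indices (the ParamShard class and the contiguous layout are illustrative assumptions, not the paper's chord layout):

import numpy as np

class ParamShard:
    # owns the contiguous slice [lo, hi) of the global parameter vector
    def __init__(self, lo, hi):
        self.lo, self.hi = lo, hi
        self.values = np.zeros(hi - lo)

    def pull(self, global_idx):
        # return the parameter values for the requested (owned) global indices
        return self.values[global_idx - self.lo]

    def push(self, global_idx, grads, step_size=0.1):
        # scatter-subtract so repeated indices accumulate correctly
        np.subtract.at(self.values, global_idx - self.lo, step_size * grads)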

It seems like Channels might be a decent way to go about this. Yes, there is some network comm overhead but in practice for a large scale problem, the time to actually send the data (parameters and gradients say) would dominate the few ms of network hops. This cost could also be partly hidden through async operations.

The way I thought about it with Channels, which you touch on, is:

  1. Let's say we have 1x PS worker for simplicity, and some other "compute" workers. The "compute" workers will hold the chunks of data (X, y blocks). The PS will hold "beta".
  2. PS creates an initial beta vector (random data, zeros, whatever). It could "publish" this vector (future?) on the Channel "params", saying "here is the latest version of beta".
  3. Workers start iteration 1, and pull the new "beta" (future?) off the channel. Let's say Worker A perhaps needs 10% of the total vector - so it "pulls" beta[idx] from the PS - where idx is the set of active feature indices it needs to compute its gradient.
  4. Worker A computes its partial gradient for the chunk. It needs to "push" this grad[idx] (or alternatively, a "sparse vector" version of grad) to the PS. It could push this as another future into the channel? Or perhaps another channel? But would the idea be that PS gets this future off the channel, knows that Worker A holds the data it needs, and does something like beta[idx] -= grad[idx] * step_size (simplified update), where it will know to pull grad[idx] from Worker A? And then "publishes" the new "beta future" on the "params" channel?
  5. This all happens async - so effectively a "slow" worker may "miss" a few beta updates. Workers could always poll the head of the channel for the latest.
  6. The PS could in this way implement some form of "bounded synchronous" updates.

To answer your specific questions:

  1. As I mention above, of course we'd prefer the lowest-cost communication for the above scenario - but I would expect a few ms of overhead from network hops to be marginal in terms of overall cost. I would tend to start with what is "built in" and see if it works well, before trying to build more custom stuff.
  2. Quite a lot - that is the idea, to scale to large models. By large I would say typically hundreds of millions to billions of parameters in total. Each mini-batch would not typically communicate that entire parameter space, but it could still be a few million parameters per mini-batch.
  3. Yes - though even 1 PS can be useful in scaling. Sharding can range from simply splitting the param vector into contiguous chunks, to "key chord" layouts and other more involved architectures (mostly this is done for fault-tolerance purposes).

From me

So here is some code just to get things started off:

import operator

import numpy as np
from distributed import worker_client


def parameter_server():
    beta = np.zeros(1000000)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        future_beta = c.scatter(beta)
        betas.append(future_beta)

        updates = c.channel('updates')
        for update in updates:              # block on incoming gradient updates
            beta = modify(beta, update)     # placeholder: apply one update to beta
            future_beta = c.scatter(beta)
            betas.append(future_beta)


def worker(idx, x):
    with worker_client(separate_thread=False) as c:
        betas = c.channel('betas', maxlen=1)
        last_beta = betas.data[-1]                              # latest beta future
        subset_beta = c.submit(operator.getitem, last_beta, idx)
        params = subset_beta.result()                           # pull only beta[idx]

        update = create_update(x, params)   # placeholder: local gradient for this chunk
        updates = c.channel('updates')
        updates.append(update)
        updates.flush()

For what it's worth I expect this code to fail in some way. I think that channels will probably have to be slightly modified somehow. For example currently we're going to record all of the updates that have been sent to the updates channel. We need to have some way of stating that a reference is no longer needed. Channels need some mechanism to consume and destroy references to futures safely.

@MLnick (Contributor)

MLnick commented May 25, 2017

Thanks for summarizing things. I ran into a first issue: TypeError: Don't know how to scatter <class 'numpy.ndarray'>.

I assume dask array already has serializers for numpy ndarray (and sparse too?). Is there an easy way to re-use them?

@mrocklin (Member, Author)

Sorry, I was on master, which supports scattering singletons. Try scattering a list:

[future]  = client.scatter([x])

@MLnick (Contributor)

MLnick commented May 25, 2017

Thanks - that solved the scatter. But with a simplified version of the ps method:

def parameter_server():
    beta = np.zeros(D)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        future_beta = c.scatter([beta])
        betas.append(future_beta)

I get:

...
Exception: TypeError("can't serialize <Future: status: finished, key: c85bbd0d1718128e8eb4b46d0b5940d8>",)

Full stacktrace here

@mrocklin (Member, Author)

mrocklin commented May 25, 2017 via email

@MLnick (Contributor)

MLnick commented May 25, 2017

Ah thanks!

Another question - is there a way to control which workers dask arrays will be located on? i.e. Can I ensure that the dask array X is split among workers w1, w2 only?

@mrocklin (Member, Author)

mrocklin commented May 25, 2017 via email

@MLnick (Contributor)

MLnick commented May 25, 2017

Thanks - it seems that, to construct a dask array on a specified set of workers, it would have to be done via delayed with the workers kwarg? Since array.persist(workers=[...]) seemed to throw an error: TypeError: unhashable type: 'Array'

@MLnick (Contributor)

MLnick commented May 25, 2017

I'm having a weird issue with submitted tasks on workers not being able to see updates in the channel. Here's a simple repro:

In [3]: client = Client()

In [4]: def simple_worker():
   ...:     with worker_client() as c:
   ...:         betas = c.channel('betas')
   ...:         print(betas)
   ...:

In [5]: b = client.channel('betas')

In [6]: distributed.channels - INFO - Add new client to channel, ec0e59cc-4135-11e7-8f77-a45e60e5f579, betas
distributed.channels - INFO - Add new channel betas
In [6]:

In [6]: b.append("foo")

In [7]: b.flush()

In [8]: res = client.submit(simple_worker)

In [9]: <Channel: betas - 0 elements>
distributed.channels - INFO - Add new client to channel, f7644180-4135-11e7-95ee-a45e60e5f579, betas
distributed.batched - INFO - Batched Comm Closed:

@mrocklin (Member, Author)

Hrm, try x = client.persist(x, workers={x: [...]})

The worker client may not see updates immediately after creating the channel; it can take a little while. Typically we've resolved this in the past by iterating over the channel, like for future in channel: ..., however in this case, where we want to get the last element, that seems less than ideal.
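A tiny sketch of that iteration pattern, blocking until the first element arrives (using the same worker_client/channel API as the earlier snippets):

from distributed import worker_client

with worker_client() as c:
    betas = c.channel('betas', maxlen=1)
    for future in betas:       # blocks until the PS publishes something
        last_beta = future     # with maxlen=1 this is the most recent beta future
        break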

@MLnick (Contributor)

MLnick commented May 25, 2017

Ok yeah - it seems to be a very small delay in being able to view the data. The "iter" works well.

This brings up a couple of potentially useful things for a Channel:

  1. Be able to poll the head of the queue (or perhaps even a "slice"?) in a "blocking" manner (much like __iter__ seems to work)
  2. Be able to "clear" a Channel - during my experiments I kept having to restart the whole lot because the channels got full of old updates.

@MLnick (Contributor)

MLnick commented May 25, 2017

So, I have the very basics of a working setup for the PS and a worker, using an ADAGRAD variant of SGD. I haven't properly "distributed" either the gradient computation (which should be done async, in parallel, per chunk of X, y data) or the params on the PS. I also haven't done the "sparse" update version; I've just been working with full dense arrays so far.

But I did verify the Adagrad version gives results close to L-BFGS (so the logic is working).

Will post further updates as I go.

@mrocklin (Member, Author)

Yes, so I think that rather than extending channels we might want to build a few new constructs including:

  1. Multi-producer multi-consumer queues
  2. Global singleton values

I think that these would solve your problems, would probably be useful in other contexts as well, and would probably not be that much work. This is the sort of task that might interest @jcrist if he's interested in getting into the distributed scheduler. All logic in both cases would be fully sequentialized through the scheduler, so this shouldn't require much in the way of concurrency logic (other than ensuring that state is always valid between tornado yield points). Copying (or improving upon) the channels implementation would probably be a good start. If @jcrist is busy or not very interested in this topic I can probably get to it starting Tuesday.

Hopefully this doesn't block @MLnick from making progress on ML work. Hopefully we'll be able to progress in parallel.

Thoughts?

@mrocklin (Member, Author)

Checking in here. @MLnick, are you still able to make progress (with what time you have)?

@MLnick (Contributor)

MLnick commented May 31, 2017

Hey, meant to post an update but been a bit tied up.

I got the basics working. Here is the gist.

Current limitations / TODOs:

  1. This seems to be computing the worker chunks in serial - so this should be done async in parallel of course.
  2. Doesn't yet handle the "sparse" version, i.e. pulling only active feature indices. So this is just simple dense arrays to illustrate the PoC
  3. Only 1 PS

Having said that, it seems to work pretty well in principle, and doing the "sparse pull" version should be straightforward to add.

Note the solution found by the SGD version is different from the L-BFGS one. For this small test case it is very close in terms of accuracy metrics, but for larger problems I think some things will need tweaking (e.g. item (1) above, step sizes, iterations, and maybe early-stopping criteria).

Expanding beyond 1 PS will be more involved since the beta data needs to be sharded across the PS nodes.

@mrocklin (Member, Author)

Some feedback on the code:

  1. In worker() you might want to create the channels outside of the for loop
  2. This line blocks on each call to compute: res2 = [d.compute(workers=['w1', 'w2']) for d in res]. Instead you probably want futures = client.compute(res); client.gather(futures) (see the sketch after this list)
  3. Rather than use a fixed number of iterations, you might consider sending a stop signal from the client or another long-lived process using a channel, e.g. channel.stop(). This would allow you to have an arbitrary stopping condition
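For item 2, a minimal sketch of the non-blocking pattern; the worker names 'w1' and 'w2' and the list res of delayed results are taken from the snippet above, not defined here:

futures = client.compute(res, workers=['w1', 'w2'])   # submit every chunk task at once
results = client.gather(futures)                      # block a single time, gather in parallel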

@mrocklin (Member, Author)

Expanding beyond 1 PS will be more involved since the beta data needs to be sharded across the PS nodes.

How is this typically done? Do the workers always send updates to both parameter servers?

@MLnick (Contributor)

MLnick commented May 31, 2017

How is this typically done? Do the workers always send updates to both parameter servers?

It really depends on the architecture.

Glint, for example, is built with Akka actors and has a "masterless" architecture. There is a single "actor reference" representing the (client connection to the) array on the PS. It exposes a push(indices, values) and a pull(indices) method, and the PS array actor takes care of sending the correct indices and values to each shard.

One could also have a "master" PS node (or coordinator node) that handles the sharding, splitting up and re-routing requests to the relevant PS.

It could get a bit involved depending on implementation details.
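Building on the ParamShard sketch from earlier in the thread, the routing layer for a contiguous layout could look roughly like this (purely illustrative; global_idx is assumed sorted and unique):

import numpy as np

def route(global_idx, shards):
    # group requested indices by the shard that owns them (contiguous layout)
    owned = {}
    for shard in shards:
        mask = (global_idx >= shard.lo) & (global_idx < shard.hi)
        if mask.any():
            owned[shard] = global_idx[mask]
    return owned

def pull(global_idx, shards):
    # ask each owning shard only for its part, then stitch the pieces back together
    out = np.empty(len(global_idx))
    for shard, idx in route(global_idx, shards).items():
        out[np.searchsorted(global_idx, idx)] = shard.pull(idx)
    return out

def push(global_idx, grads, shards):
    # send each shard only the gradient entries for the indices it owns
    for shard, idx in route(global_idx, shards).items():
        shard.push(idx, grads[np.searchsorted(global_idx, idx)])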

@MLnick (Contributor)

MLnick commented May 31, 2017 via email

@mrocklin (Member, Author)

Yeah, my brain went that way as well.

First, the simple way to do this is just to have a channel for each parameter server and to manage the slicing ourselves manually. This isn't automatic but also isn't that hard.

However, the broader question you bring up is a dask collection (bag, array, dataframe, delayed) that is backed by a changing set of futures. In principle this is the same as a channel, except that rather than pushing data into a deque we would push data into a random access data structure. I might play with this a bit and see how far I get in a short time.

I hope that this doesn't block you on near-term progress though. I suspect that we can go decently far on single-parameter server systems. Do you have a sense for what your current performance bottlenecks are?

@mrocklin (Member, Author)

I played with and modified your script a bit here: https://gist.github.com/MLnick/27d71e2a809a54d82381428527e4f494

This starts multiple worker-tasks concurrently. It also evaluates the change in beta over time.

@MLnick (Contributor)

MLnick commented May 31, 2017

Thanks, will take a look.

When I get some more time I will try to do the sparse param updates and then test things out on some larger (sparse) data (e.g. Criteo).

@mrocklin (Member, Author)

mrocklin commented Jun 5, 2017

Whoops, it looks like my last comment copied over MLnick's implementation rather than my altered one. Regardless, here is a new one using Queues and Variables.

https://gist.github.com/mrocklin/a92785743744b5c698984e16b7065037

Things look decent, although at the moment the parameter server isn't able to keep up with the workers. We may want to either batch many updates at once or switch to asynchronous work to overlap communication latencies.
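For reference, a stripped-down sketch of the shape such a Queue/Variable parameter server can take (this mirrors only the structure of the gist; compute_gradient is a placeholder, and batching and stopping conditions are omitted):

import numpy as np
from dask.distributed import Queue, Variable, worker_client

def parameter_server(n_features, n_updates, step_size=0.01):
    with worker_client() as c:
        beta = np.zeros(n_features)
        beta_var = Variable('beta', client=c)      # global singleton holding the latest beta
        updates = Queue('updates', client=c)       # multi-producer queue of gradient futures
        [future] = c.scatter([beta])
        beta_var.set(future)
        for _ in range(n_updates):
            grad = updates.get().result()          # next gradient pushed by a worker
            beta = beta - step_size * grad
            [future] = c.scatter([beta])
            beta_var.set(future)

def worker(x_chunk, y_chunk):
    with worker_client() as c:
        beta = Variable('beta', client=c).get().result()    # pull the latest beta
        grad = compute_gradient(x_chunk, y_chunk, beta)     # placeholder gradient function
        [future] = c.scatter([grad])
        Queue('updates', client=c).put(future)              # push the gradient to the PS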

@mrocklin (Member, Author)

mrocklin commented Jun 5, 2017

This currently depends on dask/distributed#1133

FWIW to me this approach feels much nicer than relying on channels as before. Some flaws:

As mentioned, the parameter server can't keep up. I suspect that we want a get_many method on the queue to implement batching. As stated, we may also want to move to the full async API to help hide latencies. This is probably something we should do after exhausting the synchronous API, though. I'd also like a good performance benchmark before going into this (which maybe someone else can help to provide?).
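A sketch of what that batching might look like with the current Queue, draining whatever has accumulated before applying one vectorized update (this assumes the Queue exposes qsize(); a real get_many would replace the drain loop):

import numpy as np

def drain(queue):
    # block for at least one update, then take everything already waiting
    updates = [queue.get()]
    while queue.qsize() > 0:
        updates.append(queue.get())
    return updates

def apply_batch(beta, grad_futures, step_size):
    # sum the batched gradients and apply them in a single vectorized update
    grads = [f.result() for f in grad_futures]
    return beta - step_size * np.sum(grads, axis=0)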

@MLnick if you're looking for a narrative for a blogpost, talk, etc., then we might consider the progression from sequential computation on the parameter server, to batched, to asynchronous/batched. I'm personally curious to see the performance implications of these general choices for this problem. We have (or can easily construct) APIs for all of these fairly easily.

@mrocklin (Member, Author)

@MLnick any objection to my including this as an example in a small blogpost?

@mrocklin (Member, Author)

I'm happy to give you attribution for the work.

@MLnick (Contributor)

MLnick commented Jun 12, 2017 via email

@stsievert (Member)

progression of sequential computation on the parameter server, to batched, to asynchronous/batched

I'd be interested in seeing this comparison, especially as number of workers/communication channels increase.

I've looked through the PS in #57 (comment). As far as I can tell, the main benefit of this PS is its async communication: the approach above distributes communication across many worker-PS channels (which means it's not limited by the bandwidth of a single worker-PS channel). Correct?

asynchronous/batched

Can you expand on what you mean by this? I am interested in async updates that do not take locks on beta (e.g., the distributed optimization algorithms Hogwild! and Cyclades). Can the async updates you're talking about be applied to these algorithms?
