Fix scale edge cases #171

Merged 12 commits into dask:master on Oct 29, 2018

Conversation

guillaumeeb
Member

Fixes #112.

Implementation of dask/distributed#2257.

First step towards #170.

Must wait for #97 to be merged before activating the test test_basic_scale_edge_cases.

@guillaumeeb
Member Author

A question for @mrocklin here: as you mentioned in #97 (comment), retire_workers() is a coroutine. Should the whole _scale() function be made a coroutine and retire_workers() be called with a yield?

@mrocklin
Member

mrocklin commented Oct 7, 2018

Should the whole _scale() function be made a coroutine and retire_workers() be called with a yield?

Perhaps. There are a few ways to do this. For example it looks like in dask-kubernetes we make a small coroutine within the normal scale method and add it to the event loop asynchronously.

https://github.com/dask/dask-kubernetes/blob/a88ace69cab03aa9ce4bdb005bbb1f0be2d831d4/dask_kubernetes/core.py#L376-L385

But generally speaking it is a common pattern to make a _foo coroutine that gets called by a synchronous function foo.
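
For reference, a minimal sketch of that foo/_foo pattern (illustrative only, using Tornado as dask did at the time; the class and attribute names here are placeholders, not dask-jobqueue's actual API):

    from tornado import gen
    from tornado.ioloop import IOLoop

    class ClusterSketch:
        def __init__(self, loop):
            self.loop = loop

        def scale(self, n):
            # Synchronous entry point: hand the real work to the event loop
            # and return immediately.
            self.loop.add_callback(self._scale, n)

        @gen.coroutine
        def _scale(self, n):
            # Coroutine implementation: free to yield on other coroutines,
            # e.g. yield self.scheduler.retire_workers(...).
            yield gen.sleep(0.1)
            print("scaled to", n)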

@mrocklin
Member

mrocklin commented Oct 7, 2018

Also if you have any questions about writing async code I'm happy to help. I really enjoy it now, but it does require some learning if you are not already familiar with it.

@guillaumeeb
Member Author

Interesting to see that dask-kubernetes already overrides the scale method. I'll have a closer look at what it does.

@guillaumeeb
Member Author

Okay, so the race condition identified in #112 seems fixed.

Work on building a ClusterManager has begun, but only within the scope of this PR.

Some questions left here:

  • Should _scale() be made a coroutine?
  • I use a lock in _scale(), which probably means it should not be a coroutine, but is the lock used correctly?

Reviews welcome.


This allows doing every operation within a coherent context
"""
with log_errors(), self._lock:
Member

I use a lock in _scale(), which probably means it should not be a coroutine, but is the lock used correctly?

As written the lock is not necessary, as long as people call scale rather than _scale. The scale method adds _scale onto the event loop:

    self.scheduler.loop.add_callback(self._scale, n)

So any thread can safely call scale, which will ask the event loop thread to call _scale the next time it is free. The _scale method will therefore only ever run on the single event loop thread. There is no need to protect it with a lock.

Generally speaking when using async frameworks like asyncio or tornado it is very rare to use locks. Instead we protect ourselves by putting all concurrent code on the event loop.
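
A small illustration of that point, reusing the hypothetical ClusterSketch from the sketch above (it relies on Tornado's IOLoop.add_callback being safe to call from any thread):

    import threading
    from tornado.ioloop import IOLoop

    loop = IOLoop.current()
    cluster = ClusterSketch(loop)

    # Any thread may call scale(); the event loop serializes the resulting
    # _scale callbacks, so no lock is needed.
    threading.Thread(target=cluster.scale, args=(5,)).start()
    threading.Thread(target=cluster.scale, args=(10,)).start()

    loop.call_later(0.5, loop.stop)
    loop.start()  # the event loop thread runs both _scale coroutines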

Should _scale() be made a coroutine?

Maybe. It calls the method self.scheduler.retire_workers. This method is a coroutine that waits for a response from the workers-to-be-closed. Should _scale also wait until it gets this response? If so then we'll want to yield/await that method call:

# self.scheduler.retire_workers(workers=to_close)
yield self.scheduler.retire_workers(workers=to_close)

If you do that then you will need to make _scale a coroutine as well. You may not want this though, I'm not sure. That becomes a design decision. So I ask a question back to you:

Should _scale wait until the workers have closed themselves before calling scale_down?
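
To make the trade-off concrete, a hedged sketch of what a waiting _scale could look like (workers_to_close, retire_workers and scale_down are the names used in this thread; the surrounding class wiring is assumed):

    from tornado import gen

    @gen.coroutine
    def _scale(self, n):
        if n < len(self.scheduler.workers):
            to_close = self.scheduler.workers_to_close(
                n=len(self.scheduler.workers) - n)
            # Wait until the workers have handed off their data and shut down...
            yield self.scheduler.retire_workers(workers=to_close)
            # ...before asking the resource manager to kill the corresponding jobs.
            self.scale_down(to_close)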

Member Author

Should _scale wait until the workers have closed themselves before calling scale_down?

Yes, I believe so, for a clean worker shutdown. Otherwise there is a risk of losing in-memory data. From what I understand, currently we ask the scheduler to shut workers down cleanly, but then just kill them without waiting for it.

Member

That's what the code does currently as I read it, yes.

So you would wait on the tornado future returned by self.scheduler.retire_workers by calling yield self.scheduler.retire_workers(...). Once you do that you need to make the _scale method a coroutine.

If you want the scale function to block then I recommend using the distributed.utils.sync function.

def scale(...):
    sync(loop, self._scale, ...)

This adds the _scale coroutine to the event loop, then waits on a threading.Event until it has finished. This must be called from a thread other than the event loop.
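
Filled out slightly, a sketch of that blocking variant (assuming self.scheduler.loop is the event loop and _scale is a coroutine; the PR ultimately does not take this route):

    from distributed.utils import sync

    def scale(self, n):
        # Schedule the _scale coroutine on the event loop and block this
        # (non-event-loop) thread until it finishes.
        return sync(self.scheduler.loop, self._scale, n)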

Member Author

Thanks. Does the code in Cluster.scale()

self.scheduler.loop.add_callback(self.scheduler.retire_workers, workers=to_close)
self.scheduler.loop.add_callback(self.scale_down, to_close)

ensure that scale_down will be called after retire_workers has finished?

Member

No. It will start retire_workers and run it until it hits a yield point. It will start scale_down after that (although other things may run in between).
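
A small, self-contained demonstration of that ordering, with hypothetical stand-ins for the two scheduler calls:

    from tornado import gen
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def retire_workers_like():
        print("retire: start")
        yield gen.sleep(0.5)  # yield point, e.g. waiting on the workers
        print("retire: done")

    def scale_down_like():
        print("scale_down called")

    loop = IOLoop.current()
    loop.add_callback(retire_workers_like)
    loop.add_callback(scale_down_like)
    loop.call_later(1, loop.stop)
    loop.start()
    # Typical output:
    #   retire: start
    #   scale_down called   <- runs before retire_workers_like finishes
    #   retire: done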

Member Author

If you want the scale function to block then I recommend using the distributed.utils.sync function

I don't think I want that.

Otherwise, the updates are done and all questions answered. Ready for further review if the Travis build passes.

@@ -35,7 +34,8 @@ def _scale(self, n):
         to_close = self.scheduler.workers_to_close(
             n=len(self.scheduler.workers) - n)
         logger.debug("Closing workers: %s", to_close)
-        self.scheduler.retire_workers(workers=to_close)
+        # Should be an RPC call here
+        yield self.scheduler.retire_workers(workers=to_close)
Member

As a warning, now that you've added a yield in this coroutine, another coroutine can start running while this one waits for a response. It is entirely possible that two _scale coroutines will be active at the same time.

You still can't use a threading.Lock to fix this (threading locks will lock the entire event loop). You can use a Tornado lock, or a few other methods. Short term I wouldn't worry about it though.
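
If it ever does become a problem, a hedged sketch of the Tornado lock approach (illustrative only; as noted above, this is not needed short term):

    from tornado import gen, locks

    _scale_lock = locks.Lock()

    @gen.coroutine
    def _scale(self, n):
        with (yield _scale_lock.acquire()):
            # Only one _scale body runs at a time, even across yield points,
            # while other coroutines on the loop keep running.
            yield gen.sleep(0)  # stands in for yield self.scheduler.retire_workers(...)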

Member Author

Thanks for the clarification. I don't think this is an issue yet.

@guillaumeeb
Member Author

So @mrocklin @jhamman, is it OK to merge this and go on with #170?

@mrocklin
Member

One question: it seems like this might be the start of many changes. Do we want to issue a release before that happens?

@guillaumeeb
Member Author

Version 0.4.0 is not that old, but some minor changes may have happened since then. I need to look in detail, but maybe a 0.4.1?

@mrocklin
Member

No thoughts on the version number. 0.4.1 seems fine to me. Mostly I want to avoid the situation where someone wants some of the recent changes but we feel uncomfortable releasing because of some of the new changes. It is a small thought though and not very important.

@guillaumeeb guillaumeeb mentioned this pull request Oct 15, 2018
@guillaumeeb
Member Author

I discovered when merging master and performing some tests that adaptive was directly calling scale_up and scale_down.

I'm not sure this is correct; I currently feel that adaptive should rely only on scale(). The ClusterManager should then know whether it needs to gracefully retire workers or do anything else. This may be something to keep aside for later. cc @mrocklin.

Otherwise I think this is ready to go in if the Travis build succeeds. This already fixes some bugs I observed when scaling with or without adaptive (an adaptive endless loop, or issues with multiple scale calls).

@guillaumeeb guillaumeeb merged commit 9fe5240 into dask:master Oct 29, 2018