-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add worker_key parameter to Adaptive #1992
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from __future__ import print_function, division, absolute_import | ||
|
||
import atexit | ||
from datetime import timedelta | ||
import logging | ||
import math | ||
from time import sleep | ||
|
@@ -12,7 +13,7 @@ | |
from .cluster import Cluster | ||
from ..core import CommClosedError | ||
from ..utils import (sync, ignoring, All, silence_logging, LoopRunner, | ||
log_errors) | ||
log_errors, thread_state) | ||
from ..nanny import Nanny | ||
from ..scheduler import Scheduler | ||
from ..worker import Worker, _ncores | ||
|
@@ -145,12 +146,24 @@ def __repr__(self): | |
def __await__(self): | ||
return self._started.__await__() | ||
|
||
def sync(self, func, *args, **kwargs): | ||
asynchronous = kwargs.pop('asynchronous', None) | ||
if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False): | ||
callback_timeout = kwargs.pop('callback_timeout', None) | ||
future = func(*args, **kwargs) | ||
if callback_timeout is not None: | ||
future = gen.with_timeout(timedelta(seconds=callback_timeout), | ||
future) | ||
return future | ||
else: | ||
return sync(self.loop, func, *args, **kwargs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this related to the grouped worker issue? (I'm not following) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something was slightly nicer if we switched to using self.sync here (this is a common pattern in the client). I decided to clean this up as part of this PR. |
||
|
||
def start(self, **kwargs): | ||
self._loop_runner.start() | ||
if self._asynchronous: | ||
self._started = self._start(**kwargs) | ||
else: | ||
sync(self.loop, self._start, **kwargs) | ||
self.sync(self._start, **kwargs) | ||
|
||
@gen.coroutine | ||
def _start(self, ip=None, n_workers=0): | ||
|
@@ -219,7 +232,7 @@ def start_worker(self, **kwargs): | |
------- | ||
The created Worker or Nanny object. Can be discarded. | ||
""" | ||
return sync(self.loop, self._start_worker, **kwargs) | ||
return self.sync(self._start_worker, **kwargs) | ||
|
||
@gen.coroutine | ||
def _stop_worker(self, w): | ||
|
@@ -236,7 +249,7 @@ def stop_worker(self, w): | |
>>> w = c.start_worker(ncores=2) # doctest: +SKIP | ||
>>> c.stop_worker(w) # doctest: +SKIP | ||
""" | ||
sync(self.loop, self._stop_worker, w) | ||
self.sync(self._stop_worker, w) | ||
|
||
@gen.coroutine | ||
def _close(self): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the logic for passing in minimum here? Should you also pass in maximum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minimum here corresponds to the minimum number of workers that we want to keep. The
workers_to_close
function is a slightly nicer place to handle this than outside. For example ifworkers_to_close
tells us that we need to close all of our workers, we then look at minimum and see that we need one, which worker should we keep? Also, which other workers should we keep that are in the same group. Suddenly we have to replicate the grouping logic.Maximum doesn't make sense because we're only reducing the number of workers, not increasing them.