Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locally rerun successfully completed futures #4813

Merged
merged 9 commits into from
Jun 14, 2021
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,9 @@ def __init__(
self.start(timeout=timeout)
Client._instances.add(self)

from distributed.recreate_exceptions import ReplayExceptionClient
from distributed.recreate_tasks import ReplayTaskClient

ReplayExceptionClient(self)
ReplayTaskClient(self)

@contextmanager
def as_current(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
logger = logging.getLogger(__name__)


class ReplayExceptionScheduler:
"""A plugin for the scheduler to recreate exceptions locally
class ReplayTaskScheduler:
"""A plugin for the scheduler to recreate tasks locally

This adds the following routes to the scheduler

* cause_of_failure
* get_runspec
"""

def __init__(self, scheduler):
# Keep a reference to the scheduler this plugin is attached to; the
# handlers below read task state through it.
self.scheduler = scheduler
# Expose the two lookup methods as scheduler routes under these names.
self.scheduler.handlers["cause_of_failure"] = self.cause_of_failure
self.scheduler.handlers["get_runspec"] = self.get_runspec
# NOTE(review): the plugin is registered under the key "exceptions" even
# though it now serves any task, not only failed ones. Renaming the key
# would affect whatever looks it up elsewhere -- confirm before changing.
self.scheduler.extensions["exceptions"] = self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to rename this extension here, e.g. to "replay-tasks"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed. Does this need to be changed elsewhere? It's hard to tell which references to "exceptions" in the code base are relevant to this and which are not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not. If in doubt, keep it as is. Once CI is green, you can still rename it at the end.


def cause_of_failure(self, *args, keys=(), **kwargs):
Expand All @@ -38,11 +40,7 @@ def cause_of_failure(self, *args, keys=(), **kwargs):
task: the definition of that key
deps: keys that the task depends on
"""
for key in keys:
if isinstance(key, list):
key = tuple(key) # ensure not a list from msgpack
key = stringify(key)
ts = self.scheduler.tasks.get(key)
def error_details_extractor(ts):
if ts is not None and ts.exception_blame is not None:
cause = ts.exception_blame
# NOTE: cannot serialize sets
Expand All @@ -51,17 +49,41 @@ def cause_of_failure(self, *args, keys=(), **kwargs):
"cause": cause.key,
"task": cause.run_spec,
}
return self.get_runspec(keys=keys, details_extractor=error_details_extractor, *args, **kwargs)


def get_runspec(self, *args, keys=(), details_extractor=None, **kwargs):
    """Return details for the first of ``keys`` that yields any.

    Parameters
    ----------
    keys : iterable
        Task keys to look up in ``self.scheduler.tasks``. A key that
        arrives as a list is converted to a tuple before being stringified.
    details_extractor : callable, optional
        Called with the task state found for a key, or with ``None`` when
        the scheduler has no entry for that key. It returns the details to
        send back, or ``None`` to move on to the next key. Defaults to an
        extractor that returns the task's run spec and the keys of its
        dependencies.
    *args, **kwargs
        Accepted and ignored.

    Returns
    -------
    The first non-``None`` value produced by ``details_extractor``, or
    ``None`` when no key produces one.
    """

    def default_details_extractor(ts):
        # ``tasks.get`` hands back None for a key the scheduler does not
        # hold; skip it instead of failing on attribute access.
        if ts is None:
            return None
        return {
            "task": ts.run_spec,
            "deps": [dts.key for dts in ts.dependencies],
        }

    if details_extractor is None:
        details_extractor = default_details_extractor

    for key in keys:
        if isinstance(key, list):
            key = tuple(key)  # ensure not a list from msgpack
        key = stringify(key)
        ts = self.scheduler.tasks.get(key)
        details = details_extractor(ts)
        if details is not None:
            return details
    return None


class ReplayExceptionClient:
class ReplayTaskClient:
"""
A plugin for the client allowing replay of remote exceptions locally
A plugin for the client allowing replay of remote tasks locally

Adds the following methods (and their async variants) to the given client:

- ``recreate_error_locally``: main user method
- ``recreate_error_locally``: main user method for replaying failed tasks
- ``recreate_task_locally``: main user method for replaying any task
- ``get_futures_error``: gets the task, its details and dependencies,
responsible for failure of the given future.
- ``get_futures_components``: gets the task, its details and dependencies.
"""

def __init__(self, client):
Expand All @@ -73,16 +95,23 @@ def __init__(self, client):
self.client._get_futures_error = self._get_futures_error
self.client.get_futures_error = self.get_futures_error

self.client.recreate_task_locally = self.recreate_task_locally
self.client._recreate_task_locally = self._recreate_task_locally
self.client._get_futures_components = self._get_futures_components
self.client.get_futures_components = self.get_futures_components

@property
def scheduler(self):
    """Scheduler handle taken from the wrapped client."""
    wrapped_client = self.client
    return wrapped_client.scheduler

async def _get_futures_error(self, future):
# only get errors for futures that errored.
futures = [f for f in futures_of(future) if f.status == "error"]
if not futures:
raise ValueError("No errored futures passed")
out = await self.scheduler.cause_of_failure(keys=[f.key for f in futures])
async def _get_futures_components(self, futures, runspec_getter=None):
if runspec_getter is None:
runspec_getter = self.scheduler.get_runspec

if not isinstance(futures, list): # TODO: other iterables types?
futures = [futures]

out = await runspec_getter(keys=[f.key for f in futures])
deps, task = out["deps"], out["task"]
if isinstance(task, dict):
function, args, kwargs = _deserialize(**task)
Expand All @@ -91,6 +120,17 @@ async def _get_futures_error(self, future):
function, args, kwargs = _deserialize(task=task)
return (function, args, kwargs, deps)

async def _get_futures_error(self, future):
    """Fetch the components of the task blamed for a failure.

    Collects the futures found in ``future`` whose status is ``"error"``
    and passes them to ``_get_futures_components`` together with the
    scheduler's ``cause_of_failure`` route.

    Raises
    ------
    ValueError
        If none of the futures found has status ``"error"``.
    """
    errored = []
    for candidate in futures_of(future):
        # Futures that did not fail have no error to report.
        if candidate.status == "error":
            errored.append(candidate)

    if len(errored) == 0:
        raise ValueError("No errored futures passed")

    components = await self._get_futures_components(
        errored, self.scheduler.cause_of_failure
    )
    return components

def get_futures_components(self, future):
    """Run ``_get_futures_components`` for ``future`` via the client's ``sync``."""
    run_sync = self.client.sync
    return run_sync(self._get_futures_components, future)

def get_futures_error(self, future):
"""
Ask the scheduler details of the sub-task of the given failed future
Expand All @@ -116,20 +156,36 @@ def get_futures_error(self, future):

See Also
--------
ReplayExceptionClient.recreate_error_locally
ReplayTaskClient.recreate_error_locally
"""
return self.client.sync(self._get_futures_error, future)

async def _recreate_error_locally(self, future):

async def _recreate_task_locally(self, future, component_getter=None):
if component_getter is None:
component_getter = self._get_futures_components

await wait(future)
out = await self._get_futures_error(future)
out = await component_getter(future)
function, args, kwargs, deps = out
futures = self.client._graph_to_futures({}, deps)
data = await self.client._gather(futures)
args = pack_data(args, data)
kwargs = pack_data(kwargs, data)
return (function, args, kwargs)

async def _recreate_error_locally(self, future):
    """Prepare the task blamed for ``future``'s failure for a local run.

    Delegates to ``_recreate_task_locally``, passing ``_get_futures_error``
    as the component getter, and returns what that call returns.
    """
    prepare = self._recreate_task_locally
    return await prepare(future, component_getter=self._get_futures_error)

def recreate_task_locally(self, future):
    """Re-run the task behind ``future`` in the calling process.

    The function and its arguments are obtained by running
    ``_recreate_task_locally`` through ``sync`` on the client's loop; the
    function is then called here with those arguments.

    Parameters
    ----------
    future
        Passed unchanged to ``_recreate_task_locally``.

    Returns
    -------
    Whatever the replayed function returns. An exception raised by the
    function propagates to the caller.
    """
    func, args, kwargs = sync(
        self.client.loop, self._recreate_task_locally, future
    )
    # Hand the outcome back rather than dropping it, so the result of the
    # local run can be inspected by the caller.
    return func(*args, **kwargs)

def recreate_error_locally(self, future):
"""
For a failed calculation, perform the blamed task locally for debugging.
Expand Down
4 changes: 2 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from .publish import PublishExtension
from .pubsub import PubSubSchedulerExtension
from .queues import QueueExtension
from .recreate_exceptions import ReplayExceptionScheduler
from .recreate_tasks import ReplayTaskScheduler
from .security import Security
from .semaphore import SemaphoreExtension
from .stealing import WorkStealing
Expand Down Expand Up @@ -174,7 +174,7 @@ def nogil(func):
LockExtension,
MultiLockExtension,
PublishExtension,
ReplayExceptionScheduler,
ReplayTaskScheduler,
QueueExtension,
VariableExtension,
PubSubSchedulerExtension,
Expand Down