
Locally rerun successfully completed futures #4813

Merged · 9 commits · Jun 14, 2021

Conversation

@ms7463 (Contributor) commented May 13, 2021

Initial concept for locally rerunning a successfully completed future (functionality for rerunning failed futures already exists; discussed in ticket #4804). The reason you'd want to do this is, for example, if a task completes successfully but gives results you wouldn't expect. With this PR you can now do something like this:

>>> import pdb

>>> pdb.runcall(client.recreate_task_locally, future)
(Pdb) b path/to/file:<line no>
(Pdb) c
<etc.>

I believe this functionality is a superset of recreate_error_locally, but I'm leaving that in place in case you want to ensure that the future you're calling actually did fail.

In the interest of reusing as much code as possible, I reimplemented some of the existing methods as a more general version that takes a callback, so the specific recreate_task/recreate_error paths can pass in their relevant differences via that callback. Not married to this idea, though.
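Roughly, the pattern looks like this (a simplified sketch with illustrative names, not the exact code in the PR):

```python
# Simplified sketch of the callback-based generalization described above; the
# method and callback names here are illustrative only, not the PR's actual API.
async def _recreate_components(self, future, key_getter):
    """Shared machinery: resolve a key via the callback, then fetch its run spec."""
    key = await key_getter(future)
    return await self.scheduler.get_runspec(key=key)


async def recreate_task_locally(self, future):
    async def own_key(f):
        # rerun the future itself
        return f.key

    return await self._recreate_components(future, own_key)


async def recreate_error_locally(self, future):
    async def errored_key(f):
        # rerun the task that the scheduler blames for the failure
        return await self.scheduler.get_error_cause(key=f.key)

    return await self._recreate_components(future, errored_key)
```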

@fjetter (Member) left a comment:

A second opinion about this new feature would be appreciated but I think it makes sense. I'm also not entirely sure how backwards/forwards compatible we typically want these protocols to be, cc @jrbourbeau

Regarding the implementation, I would suggest a slightly different approach. With the component_getter, details_extractor, and runspec_getter arguments, I feel we are overdoing it a bit in terms of flexibility and creating a more complex plugin than necessary.

Is there a particular reason why we'd need these hooks? My impression is that they require a very thorough understanding of the extension and the scheduler and are not well suited for a high-level API. Therefore, I imagine they would actually only be used by us in this implementation. If this is correct, I would propose a slightly different interface to reduce complexity. Pseudo code below, but all the necessary pieces to put this together are already in this PR.

from typing import Dict, Optional


class ReplayTaskSchedulerExtension:
    ...

    def get_runspec(self, key) -> Dict:
        """
        Returns the runspec of a single key
        """
        ts = self.tasks[key]
        ...
        return {
            "task": ts.run_spec,
            "deps": [dts.key for dts in ts.dependencies],
        }

    def get_error_cause(self, key) -> Optional[str]:
        """
        Returns the key of the task blamed for this key's error, if any
        """
        ts = self.tasks[key]
        return ts.exception_blame.key


class ReplayTaskClient:

    async def recreate_error_locally(self, future):
        cause = await self.scheduler.get_error_cause(future.key)
        spec_dct = await self.scheduler.get_runspec(cause)
        return await self._execute_spec(spec_dct)

    async def recreate_task_locally(self, future):
        spec_dct = await self.scheduler.get_runspec(future.key)
        return await self._execute_spec(spec_dct)

    async def _execute_spec(self, spec_dct):
        """Unpack and execute the response of
        `ReplayTaskSchedulerExtension.get_runspec`
        """
        # Graph to future, pack, unpack, etc.
        pass
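As a rough illustration of what that last `_execute_spec` step could involve (a sketch under assumptions: the run spec is treated as a plain (func, args, kwargs) triple, and the key-substitution helper is made up, not distributed's real packing/unpacking code):

```python
import distributed


def _substitute(obj, values):
    """Recursively swap dependency keys found in obj for their gathered values."""
    try:
        if isinstance(obj, (str, tuple)) and obj in values:
            return values[obj]
    except TypeError:  # unhashable contents; not a key
        pass
    if isinstance(obj, (list, tuple)):
        return type(obj)(_substitute(x, values) for x in obj)
    if isinstance(obj, dict):
        return {k: _substitute(v, values) for k, v in obj.items()}
    return obj


async def _execute_spec(client, spec_dct):
    # Assumes spec_dct["task"] is already a plain (func, args, kwargs) triple;
    # in reality the stored run_spec is serialized and has to be unpacked first.
    func, args, kwargs = spec_dct["task"]
    # gather the dependency results from the cluster, keyed by task key
    deps = {k: distributed.Future(k, client=client) for k in spec_dct["deps"]}
    values = await client.gather(deps, asynchronous=True)
    # rebuild args/kwargs with real values in place of raw keys and run locally
    args = _substitute(list(args), values)
    kwargs = _substitute(kwargs, values)
    return func(*args, **kwargs)
```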

"""

def __init__(self, scheduler):
self.scheduler = scheduler
self.scheduler.handlers["cause_of_failure"] = self.cause_of_failure
self.scheduler.handlers["get_runspec"] = self.get_runspec
self.scheduler.extensions["exceptions"] = self
Member:

We might want to rename this extension here, e.g. replay-tasks?

Contributor (author):

Renamed. Does this need to be changed elsewhere? It's kind of hard to tell which references to "exceptions" in the code base are relevant to this and which aren't.

Member:

I think not. If in doubt, keep it as is. Once you have a green CI you can still rename it at the end.

@ms7463 (Contributor, author) commented May 14, 2021

The only reason for the hooks was to maintain as much of the original structure/code as possible while allowing for different implementations. I'm not sure of the proper term for it, but it's kind of like functional-style inheritance without classes. In any case, if we're OK with changing the structure of the code (and the API slightly), then I think your suggestions make sense. I'll take a look over the weekend.

await wait(future)
# One reason not to pass the spec into this function is that we may want to
# expose a method to get all the components/deps of the future. This way
# that will be easier to do.
Contributor (author):

@fjetter - I deviated a bit from your suggestion to reference/fetch the components by spec, for the reason stated above. This comment refers to a method similar to the current get_futures_error, which I removed for now to cut some noise from the PR while restructuring, but I can add it back. Although I think recreate_task_locally(run=False) would provide most of that functionality (with the deps resolved instead of raw).

Member:

I do not mind a different structure. I do care about API surface, though, and would like to only introduce features as needed.

For instance, the run argument is something I feel is not necessary. It adds clutter to the function signature and changes the behaviour of the function significantly: instead of returning a tuple with well-defined semantics, it will execute a function and return whatever that produces.
For the debugging use case it sounds like run=False is actually what you are looking for. What about just exposing that as part of the API and telling users to call the function themselves?
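In other words, something along these lines (illustrative only; the method name is a placeholder, not an agreed API):

```python
# Illustrative only: `get_components_from_future` is a placeholder name for the
# proposed "give me func/args/kwargs, I'll call it myself" API.
function, args, kwargs = client.get_components_from_future(future)

# inspect the inputs, set breakpoints, etc., then run the task locally yourself
result = function(*args, **kwargs)
```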

Contributor (author):

Got it. I'll remove the run arg so that each method only has one way of processing things. I think it would be worth making a final decision on the set of publicly exposed methods before I add any more commits. We have all the pieces now to make it work, so deciding on this set is all that should be left.

In my eyes, this is the superset of what we could potentially expose (names can be shortened; they're potentially overly descriptive currently):

get_raw_components_from_future - returns func and raw args/kwargs, including raw unresolved future inputs/deps
get_raw_components_from_errored_future - same as above but implicitly calls get_errored_future;
                                         equivalent to the current implementation of get_futures_error
get_components_from_future - similar to raw_components, but args/kwargs/deps are resolved/gathered
get_components_from_errored_future - same as above but implicitly calls get_errored_future
get_errored_future - takes a future collection object and returns the future responsible for causing the error
recreate_task_locally - runs the code for a future on your local machine
recreate_error_locally - same as above but implicitly calls get_errored_future

Personally, I think the only things we should expose are recreate_task_locally and recreate_error_locally. I feel the case where you want just the components but don't want to execute the future's code is uncommon for something that's supposed to be a debugging tool, especially since you can inspect the args/kwargs through pdb when using recreate_*_locally.
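For illustration, usage of those two methods would look roughly like this (a sketch only; `future` is a completed future and `failed` an errored one, both assumed from the surrounding discussion):

```python
# Sketch of the two proposed public entry points, not final API docs.

# rerun a successfully completed task locally, e.g. under a debugger
result = client.recreate_task_locally(future)

# find the task that caused a failure and rerun it locally so the
# exception is raised in your own process
client.recreate_error_locally(failed)
```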

Member:

Sounds good to me. Keeping the recreate_*_locally methods seems intuitive to me.

@ms7463 (Contributor, author), May 22, 2021:

@fjetter - made these changes. I also added some recreate_task_locally tests analogous to the existing recreate_error_locally tests (which I also slightly updated to account for the private API changes). All of them pass in my pytest runs (scoped to just the recreate_* tests). I also added some preliminary docstrings, which could probably use some work.
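To give a sense of shape, one of the new tests looks roughly like this (an illustrative sketch only, not the exact test added in the PR; the exact call and await form may differ):

```python
from distributed import wait
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_recreate_task_simple(c, s, a, b):
    # Illustrative sketch: submit a task, let it finish successfully,
    # then rerun it locally and check the recomputed result matches.
    f = c.submit(sum, [1, 2, 3])
    await wait(f)
    result = await c.recreate_task_locally(f)
    assert result == 6
```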

@ms7463 (Contributor, author) commented May 20, 2021

@fjetter - any thoughts on the general direction of these new changes? Do you think it's worth pushing this current setup forward with some tests, minor API tweaks, and docstrings?

@ms7463 (Contributor, author) commented May 22, 2021

The commits above under the name "darle" are from me. I committed those from a different computer where the global git name wasn't set to mine. It looks like there are ways to amend this and replace the commits, but I don't want to mess anything up, so I'll leave it as is.

@gen_cluster(client=True)
async def test_recreate_task_array(c, s, a, b):
    da = pytest.importorskip("dask.array")
    pytest.importorskip("scipy")
Member:

This skip isn't necessary, is it?

Contributor (author):

Copied this from the existing test_recreate_error_array test. It looks like it's necessary in that one (not sure why), but not in this one. Removed.

Comment on lines 4747 to 4748
assert function(*args, **kwargs) == [3628800, 1814400, 1209600, 907200, 725760,
604800, 518400, 453600, 403200, 362880]
Member:

This will probably collide with black. I recommend setting up pre-commit or running the formatting yourself before pushing.

Contributor (author):

Updated with black formatting.

if not futures:
    raise ValueError("No errored futures passed")
cause = await self.scheduler.get_error_cause(keys=futures)
return Future(cause)
Member:

Is this Future instantiation necessary? Futures are somewhat brittle objects and leak some abstraction: they notify the scheduler and perform reference counting, and that reference counting usually needs to be protected by a threading lock (see Client._refcount_lock). I'm wondering if this could all be implemented by just passing keys around, without touching the ref counting.

Contributor (author):

Not necessary. Updated to work with the key (or future) instead.
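Roughly, the key-based version looks like this (a sketch with assumed internal names, not the exact merged code):

```python
from distributed import wait
from distributed.client import futures_of


# Sketch only: `_get_errored_future` is an assumed internal name; the point is
# that only keys cross the wire and no new Future objects are created here.
async def _get_errored_future(self, future):
    await wait(future)
    errored = [f.key for f in futures_of(future) if f.status == "error"]
    if not errored:
        raise ValueError("No errored futures passed")
    # the scheduler answers with the key of the task to blame
    cause_key = await self.scheduler.get_error_cause(keys=errored)
    return cause_key
```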

@ms7463 (Contributor, author) commented Jun 2, 2021

@fjetter - the only failures from the previous CI didn't seem to be related to the changes I made. I merged in main in case that resolves those issues. Also, any thoughts on the latest changes?

@fjetter (Member) left a comment:

Changes look good. I triggered CI. If nothing pops up there, this is in. Thanks for the thorough work @ArtinSarraf!

@ms7463 (Contributor, author) commented Jun 14, 2021

@fjetter it seems like the 2 failing tests are hanging on something. I don't think it's related to these changes, or if it is, it's not obvious to me how. I can intermittently recreate the error in local test runs (this one: tornado.util.TimeoutError: Operation timed out after 60 seconds), but it happens on random tests, and not when I rerun that specific test on its own.
Any ideas?

@fjetter (Member) commented Jun 14, 2021

Sorry for letting this sit for so long. The failures seem to be unrelated. One of them is #4859, and for the other I opened #4914.

Thanks for your contribution @ArtinSarraf !

@fjetter fjetter merged commit e690e82 into dask:main Jun 14, 2021
@ms7463 (Contributor, author) commented Jun 14, 2021

Great, thanks for your time and help @fjetter
