Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor: don't hold key references on workers #4937

Merged
merged 12 commits into from
Jul 20, 2021

Conversation

gjoseph92
Copy link
Collaborator

When constructing an Actor handle, if there is a current worker, make our Future a weakref.

I don't think this is quite the right implementation.

  1. Why does the worker= kwarg exist? It doesn't seem to be used. But it should be. Taking the if worker codepath would bypass this whole issue.
  2. What if a user is using an Actor within a task? In that case, get_worker would return a Worker, but we would want to hold a reference to the Actor key (as long as that task was running).

I think a better implementation might be to include in __reduce__ whether or not the Actor handle should be a weakref or not, basically. And in Worker.get_data, construct it such that it is a weakref.

cc @mrocklin since I believe you are to blame 😄

Fixes dask#4936

I don't think this is quite the right implementation.
1) Why does the `worker=` kwarg exist? It doesn't seem to be used. But it should be. Taking the `if worker` codepath would bypass this whole issue.
2) What if a user is using an Actor within a task? In that case, `get_worker` would return a Worker, but we _would_ want to hold a reference to the Actor key (as long as that task was running).

I think a better implementation might be to include in `__reduce__` whether or not the Actor handle should be a weakref or not, basically. And in `Worker.get_data`, construct it such that it is a weakref.
Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @mrocklin since I believe you are to blame 😄

Is he ever not to blame? ;)

Comment on lines 580 to 583
start = time()
while a.actors or b.data:
await asyncio.sleep(0.1)
assert time() < start + 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a style not about the test. The implementation of this explicit timeout is usually not necessary. The gen_cluster ships with a timeout. I believe this pattern originated before the gen_cluster had its own timeout.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the historical context, that's good to know. I'll take these out, though I will also make the gen_cluster timeout shorter—30s is a long time to wait for these to fail. If everything's working, the cluster shuts down instantly, so 5-10sec is plenty to know that something's wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though I will also make the gen_cluster timeout shorter—30s is a long time to wait for these to fail.

It is typically that large because on CI the runtimes vary a lot. For local testing I usually also adapt this. FWIW, there is a constant in utils_test which controls all known timeouts (gen_cluster and gen_test) so you may change that for local testing as well. If it helps we could also allow this to be set with an environment variable to make it easier for local dev.

Note, though, I believe the other tests are not protected by a timeout by default. We only have the global pytest-timeout last resort timeout

distributed/setup.cfg

Lines 53 to 54 in 7d0f010

timeout_method = thread
timeout = 300
which aborts the entire test run. these timeouts are to be avoided.

bottom line: you may keep your timeout, of course, I just wanted to fill in some info

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not advise to reduce timeout below 30s. It's very easy to set something that looks reasonably comfortable on your laptop, only to find out later that CI can get 10x slower.

self._client = default_client()
self._future = Future(key)
self._client = get_client()
self._future = Future(key, inform=self._worker is None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the inform not always necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key change in this PR. See #4936 (particularly the "complex case") for a description of the case this is solving. Basically, when an Actor handle got serialized and transferred to another worker, that new worker was taking out a lease on the Actor's key, which prevented that key from ever being released because the scheduler saw that some client (that worker) wanted it, which meant the scheduler never told that worker to release the key. In the end, the Actor would continue running even when nothing depended on it.

But also as I mentioned in the description, I don't think this is a good implementation. I just wanted to post the problem and a quick fix, and mostly hear from @mrocklin on what the worker= kwarg to Actor was supposed to be used for (since I can't find it used anywhere).

I've been thinking something like

 diff --git a/distributed/actor.py b/distributed/actor.py
index 77b2cda6..79e96f5b 100644
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -49,24 +49,22 @@ class Actor(WrappedKey):
     2
     """
 
-    def __init__(self, cls, address, key, worker=None):
+    def __init__(self, cls, address, key):
         self._cls = cls
         self._address = address
         self.key = key
         self._future = None
-        if worker:
-            self._worker = worker
-            self._client = None
-        else:
-            try:
-                self._worker = get_worker()
-            except ValueError:
-                self._worker = None
+        self._client = None
+        self._worker = None
+
+        try:
+            self._worker = get_worker()
+        except ValueError:
             try:
-                self._client = default_client()
+                self._client = get_client()
                 self._future = Future(key)
             except ValueError:
-                self._client = None
+                pass
 
     def __repr__(self):
         return "<Actor: %s, key=%s>" % (self._cls.__name__, self.key)

where we remove the worker= argument, and make self._client and self._worker mutually exclusive. This way, whenever we're running on a worker, we don't create a Future.

If the Actor handle was sent to another worker because a task depends on that Actor's key, holding the Future is unnecessary—the fact that a task depends on that key means the scheduler won't release that key yet. The only issue could be running a task where the function happens to connect to an Actor internally (but the task has no explicit dependency on that Actor)—in that case, if the client released the Actor's key while that task was running, in theory the Actor could be cancelled while that task needed it, since the task doesn't itself hold a Future to the key (since get_worker() succeeded).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds a bit like the worker where the actor is deserialized is not doing a good job of cleaning up after itself.
Worker A: owner of the actor; serialize
Worker B: interested in actor result; deserialize

In particular, once the key/actor is removed on Worker B (think release_key), the future should be released. Once the future is released, the scheduler should trigger the proper forget lifecycle for the actor.

Inspecting the actor class, we do not have a dedicated release mechanism. Therefore, the future is coupled to the lifetime of the client the future is attached to. The client will probably live as long as the worker lives and therefore will only be cleaned up once the worker closes. Even worse in same-process worker situations where clients may be shared between workers, the future may even outlive the worker which cause the reference.

I argue that something like the following should do the trick

diff --git a/distributed/actor.py b/distributed/actor.py
index 231cc8b3..b40e4c1a 100644
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -64,10 +64,18 @@ class Actor(WrappedKey):
                 self._worker = None
             try:
                 self._client = get_client()
-                self._future = Future(key, inform=self._worker is None)
+                self._future = Future(key)
             except ValueError:
                 self._client = None

+    def release(self):
+        if self._future:
+            self._future.release()
+            self._future = None
+
+    def __del__(self):
+        self.release()
+
     def __repr__(self):
         return "<Actor: %s, key=%s>" % (self._cls.__name__, self.key)


diff --git a/distributed/worker.py b/distributed/worker.py
index 44e05f05..27c9de97 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2643,11 +2643,13 @@ class Worker(ServerNode):
                 self.log.append((key, "release-key", {"cause": cause}, reason))
             else:
                 self.log.append((key, "release-key", reason))
+
             if key in self.data and not ts.dependents:
-                try:
-                    del self.data[key]
-                except FileNotFoundError:
-                    logger.error("Tried to delete %s but no file found", exc_info=True)
+                data = self.data.pop(key)
+                from distributed.actor import Actor
+                if isinstance(data, Actor):
+                    data.release()
+                del data
             if key in self.actors and not ts.dependents:
                 del self.actors[key]

however, this also deadlocks and doesn't release. this deadlock seems to connect to the recent worker state machine issues and loosely connects to #4918 I'm not entirely sure if the fixes over there would resolve that deadlock, though. To get the tests unstuck I needed to add one more patch.

FWIW, I think your approach is fine. For ordinary data, workers are not able to hold references to remote data and block their release. why should it be any different for actors?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker A: owner of the actor; serialize
Worker B: interested in actor result; deserialize

In particular, once the key/actor is removed on Worker B (think release_key), the future should be released.

The issue is that there may be nothing to cause the key/actor to be removed on Worker B—release_key will never run.

Let's call the key actor-abcde. The issue is that worker B holds a future for actor-abcde. When everything that depends on actor-abcde completes, _propagate_forgotten runs on the scheduler. It sees that actor-abcde has no dependents. But it also sees that TaskState('actor-abcde').who_wants is not empty.

if not dts._dependents and not dts._who_wants:

Who wants it? actor-abcde's own client. Therefore, the scheduler doesn't recommend transitioning actor-abcde to forgotten, so the scheduler will never tell the workers to release_key('actor-abcde'), so the client preventing that key from being released will never be released.

For ordinary data, workers are not able to hold references to remote data and block their release. why should it be any different for actors?

Good point. That makes me feel comfortable going to go with the approach I showed above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test showing that an actor is not currently cleaned up?

@gjoseph92 gjoseph92 force-pushed the actor/weakref-on-workers branch from ace3775 to c3539c2 Compare June 21, 2021 18:06
@gjoseph92
Copy link
Collaborator Author

Unfortunately my proposed fix won't work because a couple other things are broken:

  1. Actor._sync doesn't work when the Actor is asynchronous and using a worker (it deadlocks). That's an easy fix:
diff --git a/distributed/actor.py b/distributed/actor.py
index 79e96f5b..d98fa93d 100644
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -107,6 +109,8 @@ class Actor(WrappedKey):
         if self._client:
             return self._client.sync(func, *args, **kwargs)
         else:
+            if self._asynchronous:
+                return func(*args, **kwargs)
             # TODO support sync operation by checking against thread ident of loop
             return sync(self._worker.loop, func, *args, **kwargs)
  1. When running an async local cluster (like we do in every gen_cluster test), get_worker just returns the first-created worker instance—not necessarily the appropriate one for which it was called. We could add the worker= kwarg back, but that won't help: when we're deserializing Actor instances (like in gather_dep), there's no way to pass in the Worker instance for it to attach to.

    The reason things currently work is because Actor._sync prefers to use a client over a worker:

    def _sync(self, func, *args, **kwargs):
    if self._client:
    return self._client.sync(func, *args, **kwargs)
    else:
    # TODO support sync operation by checking against thread ident of loop
    return sync(self._worker.loop, func, *args, **kwargs)

    So by fixing this bug where Actors created on workers have both a Client and a Worker, we expose this other, larger bug.

So options are:

  1. Refactor get_worker (and probably get_client while we're at it) to handle async local clusters correctly, likely using a contextvar for the current worker. The hard part is, where are all the places we'd have to set that contextvar?
  2. Somehow refactor deserialization to handle this case (definitely not)
  3. Ignore this problem, allow Actors to potentially use the wrong worker with async local clusters (aka most tests), but also keep redundantly creating a Client even when running on a Worker, so that behavior isn't triggered. By just using the Future(key, inform=self._worker is None) from my first attempt here 97f9d60, we manage to (somewhat un-gracefully) close Transferring Actor handle between workers makes Actor unreleasable #4936 while sidestepping the async get_worker problem.

@gjoseph92 gjoseph92 force-pushed the actor/weakref-on-workers branch from 5b63bdf to 4176294 Compare June 22, 2021 18:53
@gjoseph92
Copy link
Collaborator Author

I went with option 3: "Ignore this problem". The problem we're ignoring is written up here: #4959

I think this is also ready for final review; test failures have been flaky AFAICT.

@fjetter
Copy link
Member

fjetter commented Jun 23, 2021

Option 3 means you don't want this to be merged anymore?

@mrocklin
Copy link
Member

I'm planning on not reviewing this one, and leaving it to @fjetter and @gjoseph92 if that's ok

@gjoseph92
Copy link
Collaborator Author

Option 3 means you don't want this to be merged anymore?

Ah no I do want this to be merged. I just went with the quicker approach that solves this issue but leaves some other problems unsolved.

@jakirkham
Copy link
Member

@martindurant maybe this is of interest to you as you had done some work related to Actors recently 🙂

Copy link
Member

@martindurant martindurant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me to be going the right way, but I am not certain.
To be sure, there are a lot of exceptions in the codebase to make actors happen, and comparatively little testing or, like in cases such as this one, a clear description of what the behaviour should be.

I recommend writing many more tests! Im my other worker WIP PR from ages ago, #4287, I tried to do this a bit; in that case, it's not completely clear if moving actors ought to be allowed or not.

@@ -59,12 +59,14 @@ def __init__(self, cls, address, key, worker=None):
self._client = None
else:
try:
# TODO: `get_worker` may return the wrong worker instance for async local clusters (most tests)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is running in a task (normal or actor), then the thread-local execution state must contain the right worker, no await would have happened, no? You're thinking of cases where the task itself is async?

See this line in my related unmerged PR.

self._client = default_client()
self._future = Future(key)
self._client = get_client()
self._future = Future(key, inform=self._worker is None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test showing that an actor is not currently cleaned up?

@fjetter fjetter mentioned this pull request Jul 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Transferring Actor handle between workers makes Actor unreleasable
6 participants