Consistent worker Client instance in get_client #5467

Open · wants to merge 5 commits into main

Conversation

@gjoseph92 (Collaborator) commented Oct 27, 2021

Fixes #5466, #3827

get_client was calling the private Worker._get_client method when it ran within a task. _get_client should really have been called _make_client, since it created a new client every time. The simplest correct thing to do instead would have been to use the Worker.client property, which caches this instance.

In order to pass the timeout parameter through though, I changed Worker._get_client to actually match its docstring and always return the same instance.
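For illustration, a minimal sketch of the cache-and-reuse pattern described above, with hypothetical names (`WorkerSketch`, `_client`, `_client_lock`) rather than the actual `distributed.Worker` code:

import threading

from distributed import Client


class WorkerSketch:
    """Hypothetical stand-in for Worker, showing a cached, shared Client."""

    def __init__(self, scheduler_address):
        self.scheduler_address = scheduler_address
        self._client = None
        self._client_lock = threading.Lock()

    def _get_client(self, timeout=None):
        # Create the client at most once; later calls return the same
        # instance, so every task on this worker shares one Client.
        with self._client_lock:
            if self._client is None:
                self._client = Client(self.scheduler_address, timeout=timeout)
        return self._client

The point is that repeated calls hand back one shared instance instead of constructing a new Client per call.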

cc @crusaderky—I tested this on your reproducer from #3827 and it seems to fix it (helps to bump the threads_per_worker up to 16 or something to encourage race conditions). Since dask's get_scheduler uses get_client internally, I think this was actually the problem.

Fixes dask#4959

# must be lazy import otherwise cyclic import
from distributed.deploy.cluster import Cluster
try:
    from .client import default_client
Collaborator:

I think it would be cleaner to move all imports to the top of the function

Collaborator (Author):
Again, this is just existing code. But I can refactor it.

assert self._client.status == "running"
Worker._initialized_clients.add(self._client)
if not asynchronous:
    assert self._client.status == "running"
Collaborator:

why do you need this? Isn't this already done by the sync Client constructor?

Collaborator (Author):

I don't know. This is all existing code that I just indented under the `with self._client_lock:` block.

gjoseph92 and others added 4 commits November 1, 2021 15:06
Co-authored-by: crusaderky <crusaderky@gmail.com>
Futures work fine, but there's all sorts of extra complexity with returning futures from futures. Don't want this to become a flaky test because of distributed reference-counting bugs with pickling futures, since that's not what this test is about.
@nickvazz commented Nov 9, 2021

Would this also potentially affect `client.log_event('topic')` if the topic was subscribed to outside of the worker? It seems like the worker's client ID is different from the main client's ID, although I am not sure whether that is the expected behavior.

Mainly, it throws non-blocking errors saying it does not have the topic, but it continues to work, and I can't seem to catch the exception 🤔

distributed.client - ERROR - No event handler known for topic some_topic.
Traceback (most recent call last):
  File "C:\venv\lib\site-packages\distributed\client.py", line 1253, in _handle_report   
    await result
  File "C:\venv\lib\site-packages\distributed\client.py", line 3602, in _handle_event    
    self.unsubscribe_topic(topic)
  File "C:\venv\lib\site-packages\distributed\client.py", line 3653, in unsubscribe_topic
    raise ValueError(f"No event handler known for topic {topic}.")
ValueError: No event handler known for topic some_topic.

I am currently using dask/distributed 2021.10.0

code to reproduce

# reproduce_worker_issue.py
import asyncio
import time

import dask
from dask.distributed import Client, get_client, Queue, Variable, get_worker


def event_func(event):
    client = get_client()
    ts, msg = event
    print(f"event_func {ts} {msg} id:{client.id} handlers:{client._event_handlers}")


async def long_running_event_logger():
    c = get_client()
    w = get_worker()

    print(f"log running c_id:{c.id} w_id:{w.client.id}")
    while True:
        for i in range(3):
            await c.log_event('some_topic', f"{i}")
        
        await asyncio.sleep(5)


if __name__ == '__main__':
    client = Client()
    single_worker = list(client.has_what().keys())[0]

    client.subscribe_topic('some_topic', event_func)

    print(single_worker)
    client.submit(
        long_running_event_logger,
        workers=[single_worker]
    )

    for i in range(10):
        time.sleep(1)

error

PS C:> python .\reproduce_worker_issue.py
tcp://127.0.0.1:58760
log running c_id:Client-worker-83aef8c7-41b1-11ec-a6f0-534e57000000 w_id:Client-worker-83aef8c7-41b1-11ec-a6f0-534e57000000
event_func 1636499121.6069167 0 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
event_func 1636499121.608122 1 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
event_func 1636499121.6089892 2 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
event_func 1636499126.5905163 0 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
distributed.client - ERROR - No event handler known for topic some_topic.
Traceback (most recent call last):
  File "C:\venv\lib\site-packages\distributed\client.py", line 1253, in _handle_report   
    await result
  File "C:\venv\lib\site-packages\distributed\client.py", line 3602, in _handle_event    
    self.unsubscribe_topic(topic)
  File "C:\venv\lib\site-packages\distributed\client.py", line 3653, in unsubscribe_topic
    raise ValueError(f"No event handler known for topic {topic}.")
ValueError: No event handler known for topic some_topic.
distributed.client - ERROR - No event handler known for topic some_topic.
Traceback (most recent call last):
  File "C:\venv\lib\site-packages\distributed\client.py", line 1253, in _handle_report   
    await result
  File "C:\venv\lib\site-packages\distributed\client.py", line 3602, in _handle_event    
    self.unsubscribe_topic(topic)
  File "C:\venv\lib\site-packages\distributed\client.py", line 3653, in unsubscribe_topic
    raise ValueError(f"No event handler known for topic {topic}.")
ValueError: No event handler known for topic some_topic.
event_func 1636499126.5940294 1 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
event_func 1636499126.6008162 2 id:Client-82930104-41b1-11ec-8ad0-534e57000000 handlers:{'print': <function _handle_print at 0x000002A1D5197310>, 'warn': <function _handle_warn at 0x000002A1D51973A0>, 'some_topic': <function event_func at 0x000002A1D1FB7160>}
distributed.client - ERROR - No event handler known for topic some_topic.
Traceback (most recent call last):
  File "C:\venv\lib\site-packages\distributed\client.py", line 1253, in _handle_report
    await result
  File "C:\venv\lib\site-packages\distributed\client.py", line 3602, in _handle_event
    self.unsubscribe_topic(topic)
  File "C:\venv\lib\site-packages\distributed\client.py", line 3653, in unsubscribe_topic
    raise ValueError(f"No event handler known for topic {topic}.")
ValueError: No event handler known for topic some_topic.
Traceback (most recent call last):
  File ".\validation\dashboard\reproduce_worker_issue.py", line 39, in <module>
    time.sleep(1)
KeyboardInterrupt
Task was destroyed but it is pending!
task: <Task pending name='Task-167' coro=<Cluster._sync_cluster_info() done, defined at C:\venv\lib\site-packages\distributed\deploy\cluster.py:104> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002A1F6A15940>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at 
C:\venv\lib\site-packages\tornado\ioloop.py:688]>
Task was destroyed but it is pending!
task: <Task pending name='Task-168' coro=<BaseTCPConnector.connect() done, defined at C:\venv\lib\site-packages\distributed\comm\tcp.py:392> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002A1F6CD0160>()]> cb=[_release_waiter(<Future pendi...1F6A15940>()]>)() at C:\ProgramData\Anaconda3\lib\asyncio\tasks.py:429]>

pip venv:

(venv) PS C:\> pip list
Package               Version
--------------------- -----------
altair                4.1.0      
argon2-cffi           21.1.0     
astor                 0.8.1      
attrs                 21.2.0     
backcall              0.2.0      
backports.zoneinfo    0.2.1      
base58                2.1.1      
bleach                4.1.0      
blinker               1.4        
bokeh                 2.4.1      
cachetools            4.2.4      
certifi               2021.10.8  
cffi                  1.15.0
charset-normalizer    2.0.7
click                 8.0.3
cloudpickle           2.0.0
colorama              0.4.4
dask                  2021.10.0
debugpy               1.5.1
decorator             5.1.0
defusedxml            0.7.1
distributed           2021.10.0
entrypoints           0.3
fsspec                2021.11.0
gitdb                 4.0.9
GitPython             3.1.24
greenlet              1.1.2
HeapDict              1.0.1
idna                  3.3
importlib-resources   5.4.0
ipykernel             6.5.0
ipython               7.29.0
ipython-genutils      0.2.0
ipywidgets            7.6.5
jedi                  0.18.0
Jinja2                3.0.2
jsonschema            4.2.1
jupyter-client        7.0.6
jupyter-core          4.9.1
jupyterlab-pygments   0.1.2
jupyterlab-widgets    1.0.2
locket                0.2.1
MarkupSafe            2.0.1
matplotlib-inline     0.1.3
mistune               0.8.4
msgpack               1.0.2
nbclient              0.5.4
nbconvert             6.2.0
nbformat              5.1.3
nest-asyncio          1.5.1
notebook              6.4.5
npTDMS                1.4.0
numpy                 1.21.4
packaging             21.2
pandas                1.3.4
pandocfilters         1.5.0
parso                 0.8.2
partd                 1.2.0
pickleshare           0.7.5
Pillow                8.4.0
pip                   20.1.1
plotly                5.3.1
prometheus-client     0.12.0
prompt-toolkit        3.0.22
protobuf              3.19.1
psutil                5.8.0
pyarrow               6.0.0
pycparser             2.20
pydeck                0.7.1
Pygments              2.10.0
pyparsing             2.4.7
pyrsistent            0.18.0
python-dateutil       2.8.2
pytz                  2021.3
pytz-deprecation-shim 0.1.0.post0
pywin32               302
pywinpty              1.1.5
PyYAML                6.0
pyzmq                 22.3.0
requests              2.26.0
Send2Trash            1.8.0
setuptools            47.1.0
six                   1.16.0
smmap                 5.0.0
sortedcontainers      2.4.0
SQLAlchemy            1.4.26
streamlit             1.1.0
tblib                 1.7.0
tenacity              8.0.1
terminado             0.12.1
testpath              0.5.0
toml                  0.10.2
toolz                 0.11.1
tornado               6.1
tqdm                  4.62.3
traitlets             5.1.1
typing-extensions     3.10.0.2
tzdata                2021.5
tzlocal               4.1
urllib3               1.26.7
validators            0.18.2
watchdog              2.1.6
wcwidth               0.2.5
webencodings          0.5.1
wheel                 0.37.0
widgetsnbextension    3.5.2
zict                  2.0.0
zipp                  3.6.0

@crusaderky (Collaborator):

`test_serialize_future` is consistently failing.

@gjoseph92 (Collaborator, Author):

Looks to me like `test_serialize_future` is a partially broken test that should not have passed after #3729; this PR just unmasked the error.

In #3729 you made it so that unpickling a Client tries to use the contextvar through Client.current (good call), but explicitly disallows using the global client if no contextvar is set:

try:
    c = Client.current(allow_global=False)
except ValueError:
    c = get_client(address)

Instead, if no contextvar is set, we'd fall back through the convoluted logic of get_client.

The issue is that get_client tries to use get_worker first.

try:
    worker = get_worker()
except ValueError:  # could not find worker
    pass
else:
    if not address or worker.scheduler.address == address:
        return worker._get_client(timeout=timeout)

get_worker's behavior is completely wrong when using an async local cluster:

try:
    return thread_state.execution_state["worker"]
except AttributeError:
    try:
        return first(
            w
            for w in Worker._instances
            if w.status in (Status.running, Status.paused)
        )
    except StopIteration:
        raise ValueError("No workers found")

  1. Thread-locals (`thread_state`) don't work correctly with async code (xref #5485, "Replace worker use of `threading.local()` with ContextVar").
  2. This "look for the first instance" check is silly and assumes there's only one Worker instance per Python process (probably not true for 80% of our tests).

If we are to believe the docstring that the contract of get_worker is to "Get the worker currently running this task", then calling this function outside of a running task should raise an error. And if we are in a task, there should be no need to look at Worker._instances—we should be able to trust that thread_state.execution_state["worker"] (or the contextvar we will replace it with) has been properly set. If it's not set, it means we're not in a task, and we should raise an error.
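In other words, something like this (a sketch only; `strict_get_worker` and the stand-in `thread_state` here are illustrative, not the real distributed internals):

import threading

# Stand-in for the thread-local that distributed populates while a task runs.
thread_state = threading.local()


def strict_get_worker():
    """Return the worker running the current task; raise if not in a task."""
    try:
        return thread_state.execution_state["worker"]
    except AttributeError:
        # No execution state on this thread means we are not inside a task,
        # so don't fall back to scanning Worker._instances.
        raise ValueError("get_worker() called outside of a running task")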

BUT right now we don't do that. Instead, execution_state["worker"] is not set, so we just pick the first Worker instance that was created in this process.

Now before this PR, calling worker._get_client would always construct a new client. And it's only within Worker._get_client that we'd actually look for the default global client and use that:

try:
    from .client import default_client

    client = default_client()
except ValueError:  # no clients found, need to make a new one
    pass

But with this PR, _get_client uses the cached client on the Worker. And since get_worker always gives us the first worker, we always get that worker's existing client, which was constructed on the previous iteration through the loop when we used temp_default_client to set c1 as the default global client. Even though c1 is no longer the global default client, it's still that worker's cached client.

What I think is broken is that get_client should not be relying on Worker._get_client to call default_client. That's not _get_client's job and should just be an implementation detail. Instead, if get_client wants to use the default global client, it should look for it itself. And get_worker should be fixed to not return a worker when there isn't a reasonable one to return.
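Roughly, that lookup order would look something like this (an illustrative sketch, not the actual implementation; `get_client_sketch` is a hypothetical name, and it assumes `get_worker` has been fixed to succeed only inside a running task):

from distributed import Client, get_worker
from distributed.client import default_client


def get_client_sketch(address=None, timeout=None):
    # 1. The worker's cached client, but only when we're actually in a task.
    try:
        worker = get_worker()
    except ValueError:
        worker = None
    if worker is not None and (not address or worker.scheduler.address == address):
        return worker._get_client(timeout=timeout)

    # 2. The default global client, looked up here by get_client itself
    #    rather than inside Worker._get_client.
    try:
        return default_client()
    except ValueError:
        pass

    # 3. Otherwise construct a fresh client.
    return Client(address, timeout=timeout)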


In general, getting a correct client instance is a mess: there are too many ways to do it, too many places where it's defined, and no clear definition of the hierarchy between these systems.

I propose that get_client should be the only public API to get the correct current client for whatever situation you are in.

Ideally, there'd be only one way to set a global client: a _global_client ContextVar. Instead of having Client.as_current vs default_client vs Worker._client, with careful use of Context.run and setting/resetting the variable in contextmanagers, I think we could capture the hierarchy by layering contexts/values. I'm not 100% sure this will be possible, but I think it's worth trying.
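For example, something along these lines (an illustrative sketch; `_current_client` and `use_client` are hypothetical names, not existing distributed APIs):

import contextvars
from contextlib import contextmanager

# One place the "current client" lives; nested scopes layer values on top.
_current_client = contextvars.ContextVar("current_client", default=None)


@contextmanager
def use_client(client):
    # Make `client` current inside this block and restore the previous one
    # on exit, so nested scopes compose naturally.
    token = _current_client.set(client)
    try:
        yield client
    finally:
        _current_client.reset(token)


def current_client():
    client = _current_client.get()
    if client is None:
        raise ValueError("No client active in this context")
    return client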

I'd much prefer that over this PR. This PR is just one small fix but leaves the overall system broken.

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 16, 2021
Progress towards dask#5485.

This was surprisingly easy and actually seems to work. Not sure yet what it breaks.

I was pleasantly surprised to find that Tornado `loop.add_callback`s, `PeriodicCallback`s, etc. all play correctly with asyncio's [built-in contextvar management](https://www.python.org/dev/peps/pep-0567/#asyncio). With that, just setting the contextvar during `__init__` and `start` probably catches almost all cases, because all the long-running callbacks/coroutines (including comms) will inherit the context that's set when they're created. Where else should we add this `as_current_worker` decorator?

This gives me confidence we'll be able to use the same pattern for a single current client contextvar as mentioned in dask#5467 (comment).
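For reference, a toy example of the asyncio contextvar behavior being relied on: tasks copy the context that is active when they are created, so a value set before spawning them is visible inside (plain asyncio, not distributed code):

import asyncio
import contextvars

current_worker = contextvars.ContextVar("current_worker", default=None)


async def background_task():
    # The task sees the value that was set when it was created.
    print("inside task:", current_worker.get())


async def main():
    current_worker.set("worker-1")
    await asyncio.create_task(background_task())  # prints "worker-1"


asyncio.run(main())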
@gjoseph92 gjoseph92 requested a review from fjetter as a code owner January 23, 2024 10:57

Successfully merging this pull request may close these issues.

get_client returns different Clients in different worker threads