
Close comm on CancelledError #5656

Merged: 5 commits merged into dask:main from crusaderky:comm_cancel on Jan 13, 2022

Conversation

@crusaderky (Collaborator) commented Jan 13, 2022

Fix a crash in tornado that occurred when a task waiting on read() was cancelled (typically by a timeout). The comm would be returned to the pool of available connections without notifying the server, later causing two concurrent reads on the client side.

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x16ac14400>>, <Task finished name='Task-25633' coro=<Client._update_scheduler_info() done, defined at /Users/fjetter/workspace/distributed/distributed/client.py:1111> exception=AssertionError('Already reading')>)
Traceback (most recent call last):
  File "/Users/fjetter/mambaforge/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/fjetter/mambaforge/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/fjetter/workspace/distributed/distributed/client.py", line 1115, in _update_scheduler_info
    self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
  File "/Users/fjetter/workspace/distributed/distributed/core.py", line 886, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/fjetter/workspace/distributed/distributed/core.py", line 663, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/fjetter/workspace/distributed/distributed/comm/tcp.py", line 205, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
  File "/Users/fjetter/mambaforge/envs/dask-distributed/lib/python3.8/site-packages/tornado/iostream.py", line 421, in read_bytes
    future = self._start_read()
  File "/Users/fjetter/mambaforge/envs/dask-distributed/lib/python3.8/site-packages/tornado/iostream.py", line 809, in _start_read
    assert self._read_future is None, "Already reading"
AssertionError: Already reading
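
For illustration, here is a toy reproduction of the failure mode (a fake stream standing in for tornado's IOStream; a hypothetical sketch, not the actual distributed or tornado code). Cancelling the task that awaits a read leaves the stream with a pending read future, so the next request on the same, reused comm trips the assertion above:

import asyncio

class FakeStream:
    # Toy stand-in for tornado.iostream.IOStream: only one read may be pending.
    def __init__(self):
        self._read_future = None

    async def read_bytes(self, n):
        assert self._read_future is None, "Already reading"
        self._read_future = asyncio.get_running_loop().create_future()
        # Block until a response arrives; if the awaiting task is cancelled,
        # the stream is left believing a read is still in progress.
        return await self._read_future

async def main():
    stream = FakeStream()
    try:
        # A timeout (e.g. asyncio.wait_for around an RPC) cancels the read.
        await asyncio.wait_for(stream.read_bytes(8), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    # Before this PR, the comm went back into the connection pool, so the next
    # request started a second concurrent read on the same stream:
    try:
        await stream.read_bytes(8)
    except AssertionError as exc:
        print(exc)  # -> Already reading

asyncio.run(main())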

@@ -519,7 +519,7 @@ async def handle_comm(self, comm):
result = asyncio.ensure_future(result)
self._ongoing_coroutines.add(result)
result = await result
-except (CommClosedError, CancelledError):
+except (CommClosedError, CancelledError, asyncio.CancelledError):
@crusaderky (Collaborator Author), Jan 13, 2022

I'm not sure about this. What determines which exception is raised (CancelledError here is concurrent.futures.CancelledError, which is the one tornado uses)? The event loop being used, or the choice of @gen.coroutine vs. async def?

Anyway, I didn't find any failure here in real life; this is just defensive programming.

Member

I'd actually never expect a concurrent.futures.CancelledError here, but would expect an asyncio.CancelledError to be possible. If a task is ever cancelled while waiting on handle_comm, an asyncio.CancelledError should be raised. If we're seeing both types of cancelled errors here, we should probably figure out how to convert to only one (I'd prefer the asyncio version, which should be what's hit in most places in the code base).
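
For reference: on Python 3.8+ (the interpreter in the traceback above) the two exception types are distinct classes, so catching one does not catch the other. A quick check, assuming a standard CPython 3.8+ install:

import asyncio
import concurrent.futures

# Since Python 3.8, asyncio.CancelledError derives directly from BaseException
# and is no longer an alias of concurrent.futures.CancelledError.
print(asyncio.CancelledError is concurrent.futures.CancelledError)            # False
print(issubclass(asyncio.CancelledError, concurrent.futures.CancelledError))  # False
print(issubclass(asyncio.CancelledError, Exception))                          # False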

Collaborator Author (@crusaderky)

Would you like me to change it to

                    except (CommClosedError, asyncio.CancelledError):
                        ...
                    except CancelledError as e:  # pragma: nocover
                        raise AssertionError("Unexpected exception") from e

and see if the path is ever hit? Or should I just clean it away?

Member

I think it'd be good to remove the catch for the concurrent.futures version, if for no other reason than readability. I'd probably delete it and see if any tests fail, but up to you.

Collaborator Author (@crusaderky)

done

force_close = True
raise
except (CancelledError, asyncio.CancelledError):
Collaborator Author (@crusaderky)

Same as above, I don't have evidence that concurrent.futures.CancelledError is ever raised.

Member

Ok, after reading the tornado code, there's no part of our async code that should ever hit concurrent.futures.CancelledError. Maybe this would happen for older versions of tornado, but not any version we're currently using.

Collaborator Author (@crusaderky)

removed concurrent.futures.CancelledError

comm.abort()
elif please_close:
await comm.close()
Collaborator Author (@crusaderky)

In case of OSError, this should halve the latency of scheduler.broadcast (which is the only method that uses close=True).

@crusaderky crusaderky requested a review from jcrist January 13, 2022 13:22
@crusaderky crusaderky self-assigned this Jan 13, 2022
@crusaderky crusaderky marked this pull request as ready for review January 13, 2022 13:23
@crusaderky (Collaborator Author)

@jcrist could you take a look?
FYI @fjetter

@fjetter (Member) left a comment

I'm wondering if our application of reuse is safe

try:
    result = await send_recv(comm=comm, op=key, **kwargs)
finally:
    self.pool.reuse(self.addr, comm)
    comm.name = name

  1. Everything works great -> reuse -> no need for finally
  2. Comm broken -> reuse will not reuse but remove the comm from the pool
  3. CancelledError -> reusing the comm results in a broken stream

There are one or two more places where we're using ConnectionPool.reuse.

@@ -531,6 +532,27 @@ async def test_send_recv_args():
server.stop()


@gen_test(timeout=5)
Member

Is this safe for CI?

Collaborator Author (@crusaderky)

It takes 35 ms on my machine, so I think so. I set it very low to detect a regression where everything gets stuck for 10-20 seconds, or whatever the default timeouts are.

@crusaderky (Collaborator Author) commented Jan 13, 2022

> I'm wondering if our application of reuse is safe
>
> try:
>     result = await send_recv(comm=comm, op=key, **kwargs)
> finally:
>     self.pool.reuse(self.addr, comm)
>     comm.name = name
>
>   1. Everything works great -> reuse -> no need for finally
>   2. Comm broken -> reuse will not reuse but remove the comm from the pool
>   3. CancelledError -> reusing the comm results in a broken stream
>
> There are one or two more places where we're using ConnectionPool.reuse.

I think it's safe. See the implementation of reuse:

if comm.closed():
    self.semaphore.release()
else:
    self.available[addr].add(comm)
    if self.semaphore.locked() and self._n_connecting > 0:
        self.collect()

In master, on CancelledError it's hitting lines 1089:1091. With this PR it hits line 1087 instead.

I added a comment in reuse() to clarify what's happening
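
Putting the two halves together, a toy end-to-end sketch (simplified, hypothetical names; not the actual distributed code) of why the reuse() call in the finally block is now safe: by the time reuse() runs after a cancellation, the comm has already been aborted, so it is dropped instead of going back into the pool.

import asyncio

class FakeComm:
    # Toy stand-in for a distributed Comm object.
    def __init__(self):
        self._closed = False

    def abort(self):
        self._closed = True

    def closed(self):
        return self._closed

async def send_recv(comm):
    # Simplified version of the pattern this PR adds to send_recv().
    try:
        await asyncio.sleep(10)  # stands in for `await comm.read(...)`
    except asyncio.CancelledError:
        comm.abort()  # the fix: a half-read comm must never be reused
        raise

def reuse(available, addr, comm):
    # Simplified ConnectionPool.reuse(): drop closed comms, keep healthy ones.
    if not comm.closed():
        available.setdefault(addr, set()).add(comm)

async def main():
    available, comm = {}, FakeComm()
    try:
        try:
            await asyncio.wait_for(send_recv(comm), timeout=0.1)
        finally:
            reuse(available, "tcp://127.0.0.1:8786", comm)
    except asyncio.TimeoutError:
        pass
    # The cancelled comm was aborted inside send_recv, so it was not reused.
    assert comm not in available.get("tcp://127.0.0.1:8786", set())

asyncio.run(main())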

crusaderky added a commit to crusaderky/distributed that referenced this pull request Jan 13, 2022
@jcrist (Member) left a comment

Overall this seems good to me. Nice catch.

crusaderky added a commit to crusaderky/distributed that referenced this pull request Jan 13, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request Jan 13, 2022
@crusaderky (Collaborator Author)

All test failures seem to be unrelated

@crusaderky crusaderky merged commit bf4ecff into dask:main Jan 13, 2022
@crusaderky crusaderky deleted the comm_cancel branch January 13, 2022 20:55