[aiogoogle] clean up storage client Task management #14347

Merged: 2 commits, Feb 23, 2024

@danking danking commented Feb 22, 2024

Alright, so.

  1. If you await, call exception, or call result on a cancelled task, you receive a CancelledError.

  2. A cancelled task is not necessarily done. The task could catch the CancelledError and do something, including raise a different exception (e.g. because a resource close failed).

  3. Nested try-finally sucks.
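To make points 1 and 2 concrete, here is a small sketch. It is not code from this PR, and `sleeper`, `failing_close`, and `observe` are hypothetical names. It shows that a task is not done immediately after `cancel()`, that `exception()` on a cleanly cancelled task raises `CancelledError`, and that a task which raises something else while handling cancellation ends up done but not cancelled.

```python
import asyncio


async def sleeper():
    await asyncio.sleep(3600)


async def failing_close():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Simulates a resource close that fails while handling cancellation.
        raise RuntimeError("resource close failed")


async def observe():
    facts = {}

    clean = asyncio.ensure_future(sleeper())
    await asyncio.sleep(0)  # let the task start running
    clean.cancel()
    # Cancellation has only been requested; the task has not terminated yet.
    facts["done_right_after_cancel"] = clean.done()
    await asyncio.wait([clean])  # wait does not raise the child's exception
    facts["clean_cancelled"] = clean.cancelled()
    try:
        clean.exception()
    except asyncio.CancelledError:
        facts["exception_raised_cancelled_error"] = True

    dirty = asyncio.ensure_future(failing_close())
    await asyncio.sleep(0)
    dirty.cancel()
    await asyncio.wait([dirty])
    # The task replaced CancelledError with its own exception.
    facts["dirty_cancelled"] = dirty.cancelled()
    facts["dirty_exception"] = type(dirty.exception()).__name__
    return facts
```

Running `asyncio.run(observe())` lets you inspect each of these facts in one dictionary.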

This PR adds _cleanup_future which:

  1. Cancels the future.

  2. Waits for the future to receive its cancellation and then terminate.

  3. Checks if the future is cleanly cancelled (in which case there is nothing more for us to do).

  4. Retrieves any present exceptions from a not-cancelled (but done!) future.
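The four steps above can be sketched roughly as follows. This is an illustration written from the description, not the actual `_cleanup_future` in the diff; the name `cleanup_future_sketch` is an assumption, as is the choice to re-raise the retrieved exception.

```python
import asyncio


async def cleanup_future_sketch(fut: asyncio.Future) -> None:
    # 1. Cancel the future (a no-op returning False if it is already done).
    fut.cancel()
    # 2. Wait for it to receive the cancellation and terminate.
    #    asyncio.wait does not raise the future's exception.
    await asyncio.wait([fut])
    # 3. A cleanly cancelled future needs nothing more from us.
    if fut.cancelled():
        return
    # 4. The future is done but not cancelled: retrieve any exception.
    #    Re-raising it here is an assumption of this sketch.
    exc = fut.exception()
    if exc is not None:
        raise exc
```

Because the helper only returns once the future has terminated, it can be registered as an async callback on an exit stack alongside other resource closers.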

We then use this function, in combination with exit stacks, to simply and cleanly manage exceptions.

I also added some missing retry_transient_errors invocations.

jigold previously requested changes Feb 23, 2024
raise exc
else:
fut.cancel()
await self._exit_stack.aclose()
Contributor

Can you double check the exit stack runs in the order you intended it to run in? I know we messed that up with Batch before.

Contributor Author

@danking danking Feb 23, 2024

Yes, it is in the correct order. Above, I push onto the stack whenever I create a resource, so the resources are on the stack in their order of creation. They are then popped one by one from the stack, so the most recently created is closed first, as it would be if you had nested try-finallies.

The bug in Batch was because we did:

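def close():
    # Anti-pattern: the stack is created inside close(), after the resources already exist.
    with ExitStack() as stack:
        stack.push(resource_we_wanted_to_close_first)   # popped last, so closed second
        stack.push(resource_we_wanted_to_close_second)  # popped first, so closed first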

When the ExitStack runs __exit__, it pops the stack and sees "resource_we_wanted_to_close_second" first and closes that first.

The root cause of the issue in Batch was trying to use exit stacks only in the close method. They're supposed to be used the way I've used them here: you create them before any resources, push resources onto the stack as you create them, and then close them when all the resources are no longer in use.

Any time you see the creation of an ExitStack in a close or cleanup method, that's almost certainly a bug.
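For contrast, here is a minimal sketch of the intended usage: create the stack before any resources, push each closer as the resource is created, and close the stack once nothing is in use. `Resource` and `use_resources` are hypothetical names for illustration, not classes from this repository.

```python
import asyncio
from contextlib import AsyncExitStack


class Resource:
    """Hypothetical resource that records when it is closed."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def close(self):
        self.log.append(f"close {self.name}")


async def use_resources():
    log = []
    stack = AsyncExitStack()  # created before any resource exists
    try:
        first = Resource("first", log)
        stack.push_async_callback(first.close)  # pushed at creation time
        second = Resource("second", log)
        stack.push_async_callback(second.close)
        log.append("work")
    finally:
        # Pops in LIFO order: second is closed before first,
        # exactly as nested try-finally blocks would do.
        await stack.aclose()
    return log
```

`asyncio.run(use_resources())` returns `['work', 'close second', 'close first']`.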

if self._closable:
    value.close()
if self._closable and self._task.done() and not self._task.cancelled():
    (await self._task).close()
Contributor

Do we not need to close the task result in the case where the task is not done and we clean up the future below in the else clause?

Contributor Author

If the task is not done, then we will cancel it in _cleanup_future before it gets a chance to return anything (closable or not).

The only way a task could return a closable would be to catch CancelledError, ignore it, and then return something anyway. We must not write tasks like this. That's why we use this pattern everywhere:

except CancelledError:
    raise
except Exception as exc:
    # ...

It is our job to ensure our tasks never ignore CancelledError. If you receive a CancelledError, you must terminate (by raising CancelledError) as soon as possible. The task can (quickly) clean up resources, but it definitely shouldn't return and it really shouldn't do any compute-intensive work.
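As a rough illustration of that pattern in a complete task (the `worker` and `demo` names are hypothetical, not code from this PR), the task below does only quick bookkeeping on cancellation and then re-raises, so it finishes as cancelled and never returns a value that would need closing:

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(3600)
        return "a closable result"
    except asyncio.CancelledError:
        # Quick cleanup only, then terminate by re-raising.
        log.append("cancelled: quick cleanup only")
        raise
    except Exception as exc:
        log.append(f"error: {exc!r}")
        raise


async def demo():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    await asyncio.wait([task])
    return task.cancelled(), log
```

Here `asyncio.run(demo())` returns `(True, ['cancelled: quick cleanup only'])`.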


await self._compose(chunk_names, dest_name)

for name in chunk_names:
    await pool.call(self._fs._remove_doesnt_exist_ok, f'gs://{self._bucket}/{name}')
    await pool.call(
Contributor

Why do you need to retrieve exceptions with the gather on line 565, but not after line 572? Is the exception handled somewhere else?

Contributor Author

pool.call returns a future. We await that future, which raises an exception if the future raises an exception.

On line 564, pool.wait uses asyncio.wait to wait on chunk_tasks. asyncio.wait merely "block[s] until the [futures are finished or cancelled]". It doesn't raise CancelledError if its children are cancelled. It doesn't raise exceptions if its children raise. It doesn't return values if its children return.

In contrast, asyncio.gather (when return_exceptions=False, the default) blocks until the first of these three events occurs:

  1. All futures are finished.
  2. A future raises an exception.
  3. The gather itself is cancelled.

Because of line 564, we know that (1) is immediately true.

However, your comment caused me to go read the documentation and implementation of asyncio.gather more carefully. asyncio.gather will only raise the first exception, leaving the rest not retrieved. Python 3.11 has asyncio.TaskGroup, which works the way you want it to. I'll fix 565 to actually check every task one-by-one.
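One way to check every task one-by-one after asyncio.wait might look like the sketch below. It is not the change that landed; `collect_exceptions`, `fail`, `ok`, and `demo` are hypothetical names, and what to do with the collected exceptions is left open.

```python
import asyncio


async def fail(msg):
    raise ValueError(msg)


async def ok():
    return 1


def collect_exceptions(tasks):
    """Retrieve the exception of every finished, non-cancelled task."""
    excs = []
    for t in tasks:
        if t.cancelled():
            continue  # exception() would raise CancelledError here
        exc = t.exception()  # marks the exception as retrieved
        if exc is not None:
            excs.append(exc)
    return excs


async def demo():
    tasks = [asyncio.ensure_future(c) for c in (fail("a"), ok(), fail("b"))]
    await asyncio.wait(tasks)  # blocks until all are done; raises nothing
    return collect_exceptions(tasks)
```

`asyncio.run(demo())` returns both `ValueError`s, whereas awaiting `asyncio.gather(*tasks)` would have raised only the first.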

Contributor Author

OK, this diff avoids using asyncio.gather in favor of actually retrieving every exception: e873d46.

@danking danking merged commit fbd22a3 into hail-is:main Feb 23, 2024
2 of 3 checks passed