Replace loop.call_later and loop.add_callback with background tasks #6359

Closed
hendrikmakait opened this issue May 17, 2022 · 1 comment · Fixed by #6603
@hendrikmakait
Member

hendrikmakait commented May 17, 2022

At the moment, we use loop.call_later and loop.add_callback in several places to schedule fire-and-forget background tasks. Since we do not keep track of them, we cannot cancel or otherwise clean them up; they linger until they are done. See the snippet below for an example:

def remove_worker_from_events():
    # If the worker isn't registered anymore after the delay, remove from events
    if address not in self.workers and address in self.events:
        del self.events[address]

cleanup_delay = parse_timedelta(
    dask.config.get("distributed.scheduler.events-cleanup-delay")
)
self.loop.call_later(cleanup_delay, remove_worker_from_events)

One issue this causes is that it prevents garbage collection in tests (see #6353). To ensure that all scheduled tasks are cleaned up, we need to implement functionality for managing background tasks and for properly cleaning up pending tasks. This should be implemented on the Server to make it available to all subclasses.
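A minimal sketch of the garbage-collection problem, with asyncio's own `loop.call_later` standing in for the Tornado call and `Holder` as a hypothetical stand-in for a `Server`: the loop's timer queue keeps the scheduled bound method (and therefore its `self`) alive until the timer fires, and because the returned handle is discarded, nothing can cancel it early.

```python
import asyncio
import gc
import weakref


class Holder:
    """Hypothetical stand-in for a Server that schedules delayed cleanup."""

    def cleanup(self) -> None:
        pass


async def main() -> bool:
    loop = asyncio.get_running_loop()
    holder = Holder()
    ref = weakref.ref(holder)
    # Fire-and-forget: the TimerHandle returned by call_later is dropped,
    # but the loop still references holder.cleanup (and thus holder).
    loop.call_later(3600, holder.cleanup)
    del holder
    gc.collect()
    # The object survives collection because the pending timer refers to it.
    return ref() is not None


print(asyncio.run(main()))  # True
```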

To advance our efforts to get rid of tornado, we want to implement this functionality using asyncio and remove the calls to loop.call_later and loop.add_callback.

Sketch of a possible implementation in Server:

class Server(...):
    ...
    def __init__(...):
        ...
        self._background_tasks: set[asyncio.Task] = set()
        ...

    def add_background_task(self, coro):
        task = asyncio.create_task(coro())
        # Keep a strong reference until the task is done, then forget it
        self._background_tasks.add(task)
        task.add_done_callback(lambda _: self._background_tasks.remove(task))
        ...

    async def close(self, ...):
        for ts in self._background_tasks:
            ts.cancel()
        await asyncio.gather(*self._background_tasks)
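For illustration, a runnable, trimmed-down version of this sketch applied to the events-cleanup snippet above. Everything here is an assumption rather than the eventual implementation: `MiniServer` and `_remove_worker_from_events_later` are hypothetical names, `add_background_task` takes a coroutine object instead of a coroutine function, and the `call_later` delay is expressed as `asyncio.sleep()` followed by the cleanup logic.

```python
from __future__ import annotations

import asyncio
from typing import Any, Coroutine


class MiniServer:
    """Hypothetical minimal server with tracked background tasks."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task[None]] = set()
        self.workers: set[str] = set()
        self.events: dict[str, list[str]] = {}

    def add_background_task(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the strong reference as soon as the task finishes.
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _remove_worker_from_events_later(self, address: str, delay: float) -> None:
        # Stands in for loop.call_later(delay, remove_worker_from_events).
        await asyncio.sleep(delay)
        if address not in self.workers and address in self.events:
            del self.events[address]


async def demo() -> tuple[bool, int]:
    server = MiniServer()
    server.events["tcp://worker-1"] = ["removed"]
    task = server.add_background_task(
        server._remove_worker_from_events_later("tcp://worker-1", 0.01)
    )
    # The discard callback was registered first, so it runs before we resume.
    await task
    return "tcp://worker-1" in server.events, len(server._background_tasks)


print(asyncio.run(demo()))  # (False, 0)
```

Because the task stays in `_background_tasks` until it finishes, a `close()` method could cancel it if the server shuts down before the delay elapses, which an untracked `call_later` handle does not allow.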

cc @fjetter, @graingert

@graingert
Member

I think using return_exceptions=True is needed here; otherwise, gather() raises CancelledError as soon as the first task is cancelled, and the other tasks may still be running after it returns.
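A small self-contained experiment illustrating this, with hypothetical task functions: `idle()` finishes immediately when cancelled, while `slow_to_cancel()` needs a moment of cleanup after cancellation. Without `return_exceptions=True`, `gather()` raises `CancelledError` as soon as the first task is cancelled, while the second is still running its cleanup.

```python
import asyncio


async def idle() -> None:
    await asyncio.sleep(3600)


async def slow_to_cancel() -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await asyncio.sleep(0.05)  # simulated cleanup that takes a moment
        raise


async def close_tasks(return_exceptions: bool) -> bool:
    """Cancel both tasks, gather them, and report whether both had finished."""
    tasks = [asyncio.create_task(idle()), asyncio.create_task(slow_to_cancel())]
    await asyncio.sleep(0)  # let both tasks reach their first await
    for task in tasks:
        task.cancel()
    try:
        await asyncio.gather(*tasks, return_exceptions=return_exceptions)
    except asyncio.CancelledError:
        pass  # raised by gather() once the first task is cancelled
    all_done = all(task.done() for task in tasks)
    await asyncio.gather(*tasks, return_exceptions=True)  # tidy up before returning
    return all_done


print(asyncio.run(close_tasks(return_exceptions=False)))  # False
print(asyncio.run(close_tasks(return_exceptions=True)))  # True
```

With `return_exceptions=True`, `gather()` waits for every task and returns the `CancelledError` instances as results instead of raising.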

There's also a problem if several callers await server.close(), since the background tasks will then be cancelled multiple times.
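One possible way to make repeated or concurrent `close()` calls safe, sketched under assumptions: the first call starts a single shutdown task (`_close_task`, a hypothetical attribute) and every caller awaits that same task, wrapped in `asyncio.shield` so that cancelling one caller does not abort the shared shutdown. The `cancel_rounds` counter is hypothetical and exists only to make the behaviour observable.

```python
from __future__ import annotations

import asyncio
from typing import Any, Coroutine


class Server:
    """Hypothetical minimal server; only the shutdown logic is sketched."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task[None]] = set()
        self._close_task: asyncio.Task[None] | None = None
        self.cancel_rounds = 0  # how many times tasks were actually cancelled

    def add_background_task(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _close_once(self) -> None:
        self.cancel_rounds += 1
        tasks = list(self._background_tasks)  # snapshot before cancelling
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    async def close(self) -> None:
        # The first caller starts the shutdown; every caller awaits the same task.
        if self._close_task is None:
            self._close_task = asyncio.create_task(self._close_once())
        # shield(): cancelling one caller must not abort the shared shutdown.
        await asyncio.shield(self._close_task)


async def demo() -> tuple[int, bool]:
    server = Server()
    task = server.add_background_task(asyncio.sleep(3600))
    await asyncio.gather(server.close(), server.close(), server.close())
    return server.cancel_rounds, task.cancelled()


print(asyncio.run(demo()))  # (1, True)
```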

We also need to work out what we want to do if any of the tasks fail before server.close() is called.
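One possible policy, sketched with hypothetical names: the done callback both drops the task from the set and retrieves its exception, so a background task that fails before `close()` is logged right away instead of surfacing later as asyncio's "Task exception was never retrieved" warning. Whether failures should instead be re-raised from `close()` or bring the server down is exactly the open question; this sketch shows only the logging option.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Any, Coroutine

logger = logging.getLogger("background-tasks")  # hypothetical logger name


class Server:
    """Hypothetical minimal server; only background-task bookkeeping is shown."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task[None]] = set()
        self.failures: list[BaseException] = []

    def add_background_task(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task: asyncio.Task[None]) -> None:
        self._background_tasks.discard(task)
        if task.cancelled():
            return  # cancellation during close() is expected
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None:
            self.failures.append(exc)
            logger.error("Background task failed", exc_info=exc)


async def fail() -> None:
    raise RuntimeError("boom")


async def demo() -> tuple[int, list[str]]:
    server = Server()
    task = server.add_background_task(fail())
    await asyncio.gather(task, return_exceptions=True)
    return len(server._background_tasks), [str(e) for e in server.failures]


print(asyncio.run(demo()))  # (0, ['boom'])
```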

class Server(...):
    ...
    def __init__(...):
        ...
        self._background_tasks: set[asyncio.Task[None]] = set()
        ...

    def add_background_task(self, coro: Coroutine[Any, Any, None]) -> None:
        # coro is already a coroutine object, so it is passed as-is (not called)
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # discard() removes the finished task without needing a lambda
        task.add_done_callback(self._background_tasks.discard)
        ...

    async def close(self, ...):
        for ts in self._background_tasks:
            ts.cancel()
        # return_exceptions=True waits for every task to finish cancelling
        results = await asyncio.gather(*self._background_tasks, return_exceptions=True)
