Client.gather not awaiting futures created with Client.run #4947

Closed · chrisk314 opened this issue Jun 22, 2021 · 6 comments
Labels: needs info (Needs further information from the user)

chrisk314 commented Jun 22, 2021

Hi, I'm struggling with running async functions concurrently using dask distributed. I'm attempting to use client.run to launch tasks on dedicated workers, in conjunction with client.gather to retrieve the results. As far as I can tell from the docs my approach should be correct, hence I'm raising this as an issue; however, I may be missing something, in which case the docs could potentially be improved.

For context, I'm building an application in which user-defined classes represent nodes within a process graph (think manufacturing plant, etc.). The nodes execute bespoke code and communicate data via channels (e.g. dask.distributed.Queue). Nodes in the graph may have a large memory footprint (e.g. they contain trained machine learning models). Each node should execute all of its iterations on a single worker until it receives a termination signal. To satisfy this requirement I am using client.run and specifying a single worker, assigning workers to nodes in round-robin fashion. I realise this pattern may not be ideal and is perhaps a bit of a hack; I'm currently exploring how best to implement this.

I have created a minimal example which follows the same pattern as my actual application code and reproduces the same issue.

What happened:
I create a list of futures by calling client.run in a loop, passing different arguments to a function targeted to execute on specific workers. I subsequently call client.gather to get back the results from this set of futures. Instead of waiting for the functions to execute, control continues past the client.gather call and the application exits with the exception below.

/usr/lib/python3.7/asyncio/events.py:88: RuntimeWarning: coroutine 'Client._run' was never awaited
  self._context.run(self._callback, *self._args)

If I add in a call to dask.distributed.wait(futures) before the call to Client.gather then exactly the same behaviour is observed.

What you expected to happen:
I expect that calling Client.gather will wait for all the futures to execute and return the results from the futures, rather than just returning the futures themselves. Additionally, I expect that if I call dask.distributed.wait on the list of futures, all the futures passed in will be awaited.

Minimal Complete Verifiable Example:

import asyncio
from itertools import cycle
import time

from dask.distributed import Client, wait


SLEEP_TIME = 2.0  # Time for coroutine to sleep in seconds


async def foo(x: int, sleep_time: float = SLEEP_TIME) -> int:
    """Sleeps then returns the input value."""
    print(f"Got {x}. Sleeping for {sleep_time}s.")
    await asyncio.sleep(sleep_time)
    print(f"Done for {x}!")
    return x


def bar(x: int, sleep_time: float = SLEEP_TIME) -> int:
    """Sleeps then returns the input value (blocking version)."""
    print(f"Got {x}. Sleeping for {sleep_time}s.")
    time.sleep(sleep_time)
    print(f"Done for {x}!")
    return x


async def main() -> None:
    """Entry point for dask run."""
    # Create an async client using the local machine as a cluster.
    client = await Client(asynchronous=True)

    # Get the list of workers from the scheduler.
    workers = cycle(client.scheduler_info()["workers"])

    t_start = time.time()

    # Assign the functions to workers in round robin fashion.
    futures = [client.run(foo, i, workers=[next(workers)]) for i in range(3)]
    # futures = [client.run(bar, i, workers=[next(workers)]) for i in range(3)]

    # Await all the futures using gather.
    # wait(futures)  # Explicitly waiting for all the futures makes no difference.
    # NOTE : Futures objects are not awaited when calling `client.gather`.
    results = await client.gather(futures)
    # NOTE : Using `asyncio.gather` awaits the futures as expected.
    # results = await asyncio.gather(*futures)

    # Display the collected results.
    print(f"results: {results}")
    print(f"Execution took {time.time() - t_start}s.")

    # Close the client connection.
    await client.close()


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Anything else we should know:

  • If the call to Client.gather is replaced with asyncio.gather then the expected behaviour is observed.
  • Replacing the async function foo with the blocking function bar gives the same results.

Environment:

  • Dask version: 2021.6.0
  • Python version: 3.7.10
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): pip
fjetter (Member) commented Jun 22, 2021

tbh, I didn't fully understand what you are trying to achieve.

Are you deliberately using Client.run to schedule your tasks or did you intend to use Client.submit?

The notable difference is that submit will use the task scheduler and will execute the computation in a dedicated thread. This is the typical way to run things on a cluster. The Client.run method is not part of the scheduling engine and merely executes the given function inside the worker context. This is usually used for debugging or some advanced manipulation of the workers themselves.

Why this matters in this case is that Client.gather is built to await distributed Futures, which are a different beast from simple asyncio futures. For Client.run results, Client.gather is usually not needed, since Client.run simply returns the RPC return value; Client.gather fetches actual task results.
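
A minimal sketch of that difference, using a synchronous client (inc is just an illustrative function, not from the original report):

from dask.distributed import Client

client = Client()


def inc(x: int) -> int:
    """Trivial example function."""
    return x + 1


# Client.submit goes through the task scheduler and returns a distributed
# Future, which Client.gather knows how to resolve.
fut = client.submit(inc, 1)
print(client.gather(fut))  # 2

# Client.run bypasses the scheduler: it calls the function once on each
# worker (or only the workers given) and returns the results directly as a
# dict keyed by worker address. There are no distributed Futures to gather.
print(client.run(inc, 1))  # {'tcp://127.0.0.1:<port>': 2, ...}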

chrisk314 (Author) commented Jun 23, 2021

@fjetter I will make my use case a bit more concrete. Hopefully this, in addition to the high-level description above, will make things sufficiently clear.

Firstly though, I am deliberately using Client.run as opposed to Client.submit, for the simple reason that it is not possible to submit async functions with Client.submit. Replacing the calls to Client.run with calls to Client.submit in my example gives the error below.

TypeError: can't pickle coroutine objects
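
For reference, a minimal sketch reproducing that error on the Dask version used here (2021.6.0). The comment on where the failure surfaces is an assumption: presumably the worker calls foo(1), is left holding a coroutine object, and cannot pickle it to send the result back.

import asyncio

from dask.distributed import Client


async def foo(x: int) -> int:
    await asyncio.sleep(1.0)
    return x


client = Client()
fut = client.submit(foo, 1)
fut.result()  # TypeError: can't pickle coroutine objects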

Now, the details. A process graph is defined as a set of Nodes which communicate data with each other via Channels (in this case Channel is backed by dask.distributed.Queue). Graphs can contain cycles. Running a dynamic simulation on the process graph involves defining a set of input data sources which feed time-series data into a set of channels, one record at a time. At each iteration of a simulation, each node needs to do three things: receive inputs from its input channels, compute something, and send outputs on its output channels. Each node has a driver method compute that handles these steps, which in pseudocode can be represented as below:

class Node(object):

    ...

    async def compute(self) -> None:
        await self._recv_inputs()
        await self._compute()
        await self._send_outputs()

An entire simulation is run to completion by setting all nodes to run until they receive a termination signal. To handle this there is a run method which would look something like:

class Node(object):

    ...

    async def run(self) -> None:
        while True:
            await self.compute()

Note that the actual code contains extra logic to handle propagating a termination signal and shutting everything down, which is omitted here for simplicity (a rough sketch of what that might look like follows).
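
For illustration only, that termination logic might look roughly like the following. This is a hypothetical sketch: _STOP, _in_channels, _out_channels, _stopped, and _buffer_input are invented names, not the actual application code.

_STOP = "__STOP__"  # hypothetical sentinel; a plain string survives pickling across workers


class Node(object):

    ...

    async def run(self) -> None:
        while not self._stopped:
            await self.compute()

    async def _recv_inputs(self) -> None:
        for channel in self._in_channels:
            msg = await channel.get()
            if msg == _STOP:
                # Propagate the signal downstream, then stop this node.
                for out in self._out_channels:
                    await out.put(_STOP)
                self._stopped = True
                return
            self._buffer_input(msg)  # hypothetical helper storing the record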

What I would like to do is to be able to schedule these nodes to run until completion on specific workers. I first tried to use Client.submit to run each of the Node.run methods for all the nodes in the simulation, and then discovered Client.run which does not have the same limitation of being unable to run coroutines.

Each of these nodes can have arbitrary state and may include for example trained machine learning models. It's therefore not desirable that they should move around between workers at different compute steps. Each node should be located on a single worker and run to completion. In terms of getting back distributed results this is not so important as this will be handled by SinkNodes which will take data over Channels and write them to a specified backend like a blob store etc.

I'm exploring Dask distributed as it provides a nice interface for either running these simulations on a single host or using a kubernetes cluster. I appreciate that my use case might not be what dask.distributed is really designed for and so it may not be the best fit.

fjetter (Member) commented Jun 23, 2021

I appreciate that my use case might not be what dask.distributed is really designed for and so it may not be the best fit.

I think your use case is not far off from typical usage patterns. However, scheduling coroutines is indeed something we do not directly support. At the very least, the core scheduling machinery will be bypassed if you do this: e.g. you cannot create dependencies between these tasks if you are using Client.run.

Have you tried something like

In [1]: from distributed import Client

In [2]: client = Client()

In [3]: def wrap_async_stuff():
   ...:
   ...:     import asyncio
   ...:
   ...:     async def foo():
   ...:         print("do work")
   ...:         await asyncio.sleep(0.1)
   ...:         print("done")
   ...:         return "success"
   ...:
   ...:     loop = asyncio.get_event_loop()
   ...:     return loop.run_until_complete(foo())
   ...:

In [4]: fut = client.submit(wrap_async_stuff)

In [5]: do work
done
In [5]:

In [5]: fut.result()
Out[5]: 'success'
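
Applied to the Node use case above, that wrapper pattern might look like the following sketch (node and worker_addr are assumed to exist; a fresh event loop is created explicitly because worker tasks run in threads that have no loop of their own):

def run_node(node):
    import asyncio

    # Worker tasks execute in a thread without a running event loop, so
    # create one rather than relying on asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(node.run())
    finally:
        loop.close()


# Pin the node to a specific worker, as in the original example.
fut = client.submit(run_node, node, workers=[worker_addr])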

Nodes which communicate data between each other via Channels (in this case Channel is backed by dask.distributed.Queue)

FYI we also implement a publish-subscribe pattern, but I believe our docs in that area are missing. If you don't mind browsing code, have a look at https://github.com/dask/distributed/blob/main/distributed/pubsub.py. If you have this covered with Queue, that's also fine.
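
For completeness, a minimal sketch of that pub/sub API as exported from dask.distributed (the topic name "numbers" is arbitrary):

from dask.distributed import Client, Pub, Sub

client = Client()


def producer() -> str:
    pub = Pub("numbers")
    for i in range(3):
        pub.put(i)
    return "sent"


def consumer() -> list:
    sub = Sub("numbers")
    return [sub.get(timeout=10) for _ in range(3)]


# Start the consumer first so the subscription exists before publishing.
received = client.submit(consumer)
sent = client.submit(producer)
print(received.result())  # [0, 1, 2]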

chrisk314 (Author) commented Jun 24, 2021

@fjetter thanks for the suggestion and the example above. That approach hadn't occurred to me; I will try it out today. I'd like to try it with the k8s deployment of Dask, but I'm finding that when I deploy the default helm chart on my machine the workers cannot discover the scheduler for some reason. I will open a separate issue on that front if I can't figure out the problem.

In terms of the pubsub stuff, I did notice that somewhere. At the moment Queue fits nicely behind the abstract interface we've defined with our Channel class, and in principle it seems it should do the job, if I can get all of this working nicely.

fjetter (Member) commented Aug 4, 2021

@chrisk314 we recently merged a change to support async tasks directly, see #5151

Is there anything else left to do in this issue?
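
For reference, with a release that includes #5151, an async function can be passed straight to Client.submit and the worker awaits it (a minimal sketch):

import asyncio

from dask.distributed import Client


async def foo(x: int) -> int:
    await asyncio.sleep(1.0)
    return x


client = Client()
fut = client.submit(foo, 1)  # the worker awaits the coroutine
print(fut.result())  # 1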

jrbourbeau added the needs info label on Aug 4, 2021
jrbourbeau (Member) commented
This should be closed via #5151. @chrisk314 feel free to re-open if that's not the case
