No longer start stopped loops in LoopRunner to allow the use of asyncio.run #6163

Open
2 of 6 tasks
Tracked by #6049
fjetter opened this issue Apr 20, 2022 · 11 comments

@fjetter
Member

fjetter commented Apr 20, 2022

The LoopRunner is accepting an event loop that has been created externally. This is a pattern that has been deprecated by CPython and our current implementation.

Requirements

  • LoopRunner no longer accepts event loops but creates and owns them itself.
  • There must still be a way to attach multiple servers (e.g. Scheduler and Client) to the same event loop.
  • The LoopRunner guarantees that all existing tornado.IOLoop instances are properly closed when the underlying asyncio event loop is started and stopped without the tornado API (e.g. via asyncio.run).
  • It is not required to implement a deprecation cycle
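
The asyncio.run lifecycle that the third requirement refers to can be sketched with the standard library alone. Each asyncio.run call creates a fresh loop and closes it on exit, so any object that kept a reference to that loop is left holding a closed one. The helper name `current_loop` is illustrative only and is not part of distributed.

```python
import asyncio


async def current_loop():
    # Return the loop that asyncio.run created for this call.
    return asyncio.get_running_loop()


loop_a = asyncio.run(current_loop())
loop_b = asyncio.run(current_loop())

# Each call got its own loop, and both were closed when asyncio.run returned.
print(loop_a is loop_b)
print(loop_a.is_closed(), loop_b.is_closed())
```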

More context in #6049

Implementation details

# Current interface

class LoopRunner:
    def __init__(self, loop=None, asynchronous=False):
        ...

    def start(self):
        ...
    
    def stop(self, timeout=10):
        ...

    def run_sync(self, func, *args, **kwargs):
        ...
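
As a rough illustration of the first requirement, here is a stdlib-only sketch of a runner that creates and owns its loop instead of accepting one. `OwnedLoopRunner` is a hypothetical name; it mirrors the start/stop/run_sync shape above but is not the distributed implementation and does not involve tornado.

```python
import asyncio
import threading


class OwnedLoopRunner:
    """Hypothetical runner that creates, runs and closes its own loop."""

    def __init__(self):
        self._loop = None
        self._thread = None
        self._started = threading.Event()

    def start(self):
        def target():
            # The loop is created inside the thread that will run it.
            self._loop = asyncio.new_event_loop()
            self._loop.call_soon(self._started.set)
            try:
                self._loop.run_forever()
            finally:
                self._loop.close()

        self._thread = threading.Thread(target=target, daemon=True)
        self._thread.start()
        self._started.wait()  # block until the loop is actually running
        return self._loop

    def stop(self, timeout=10):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout)

    def run_sync(self, func, *args, **kwargs):
        # func is assumed to be a coroutine function.
        future = asyncio.run_coroutine_threadsafe(
            func(*args, **kwargs), self._loop
        )
        return future.result()
```

Because the loop is only handed out by start(), callers always receive a loop that is already running.
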
@shughes-uk
Contributor

shughes-uk commented Apr 21, 2022

Keeping an eye on this as it will break coiled's implementation of Cloud

@jrbourbeau
Member

This is a pattern that has been deprecated by CPython and our current implementation.

I could be missing something, but I don't see the loop= keyword deprecated in our current LoopRunner implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk mentioned, there are other projects out there that use this keyword when constructing their own LoopRunner instances, like dask-kubernetes, coiled, and maybe others.

@fjetter
Member Author

fjetter commented Apr 21, 2022

We are aware that this may break some users but discussed that a deprecation cycle would not be necessary

cc @mrocklin

@dchudz
Contributor

dchudz commented Apr 21, 2022

@jrbourbeau Whatever version of coiled-runtime first includes this change should enforce a pin on the coiled package to whatever version includes the changes to accommodate this.

@mrocklin
Member

OK, so I should clarify my thinking from before. Certainly we need downstream deployment projects (dask-kubernetes, dask-cloudprovider, etc.) to continue functioning.

What I intended to say before is that I don't care about supporting a user-workflow where users say client = Client(loop=my_loop). Instead I'm comfortable saying client = Client(asynchronous=True).

However, if our own downstream code relies on passing loop= then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.

@graingert
Member

graingert commented May 17, 2022

This is a pattern that has been deprecated by CPython and our current implementation.

I could be missing something, but I don't see the loop= keyword deprecated in our current LoopRunner implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk mentioned, there are other projects out there that use this keyword when constructing their own LoopRunner instances, like dask-kubernetes, coiled, and maybe others.

The use of LoopRunner in dask-kubernetes is incorrect: this test fails because distributed.deploy.Cluster reassigns self.loop and self._loop_runner.

def test_loop(k8s_cluster, release, test_namespace):
    from dask_kubernetes import HelmCluster
    from distributed import Client

    with Client(nthreads=[]) as client, HelmCluster(
        release_name=release, namespace=test_namespace, loop=client.loop
    ) as cluster:
        assert cluster.loop is client.loop  # this fails because HelmCluster's parent class replaces the cluster.loop
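
One generic way such a mismatch can arise is a subclass that stores the loop itself and then calls a parent constructor that assigns its own. The classes below are hypothetical stand-ins; whether this is the exact mechanism in dask-kubernetes is an assumption.

```python
class Base:
    """Hypothetical parent that sets up self.loop itself."""

    def __init__(self, loop=None):
        # Use the given loop, or fall back to a fresh placeholder object.
        self.loop = loop if loop is not None else object()


class Child(Base):
    def __init__(self, loop=None):
        self.loop = loop
        super().__init__()  # loop is not forwarded, so Base replaces it


class FixedChild(Base):
    def __init__(self, loop=None):
        super().__init__(loop=loop)  # forward the loop to the parent


shared = object()
print(Child(loop=shared).loop is shared)
print(FixedChild(loop=shared).loop is shared)
```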

@jakirkham
Member

cc @jacobtomlinson (regarding loop handling in Dask-Kubernetes)

@jacobtomlinson
Member

jacobtomlinson commented May 18, 2022

Thanks for the heads up (and thanks for the PR in dask-kubernetes @graingert).

It is likely the asyncio stuff in downstream projects like dask-kubernetes, dask-cloudprovider, dask-jobqueue, dask-ctl, etc, etc needs some love. Especially with these upstream changes.

However, if our own downstream code relies on passing loop= then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.

In many of these projects I've copy/pasted chunks from distributed and other SpecCluster implementations, so if there are changes necessary here it is likely those will need to be propagated elsewhere. So it's not like there are intentional reasons for things to be the way they are. However, I've already seen PRs like #6205 breaking downstream projects.

@martindurant
Member

So a quick clarifier: without loop=, asynchronous instances can only ever be constructed within a coroutine - correct? That doesn't seem to be too burdensome.
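
If that reading is right, the constraint looks roughly like the stdlib-only sketch below. `AsyncThing` is a hypothetical class that binds to the running loop at construction time; it is not a distributed class.

```python
import asyncio


class AsyncThing:
    """Hypothetical object that binds to whichever loop is running."""

    def __init__(self):
        # Raises RuntimeError when no loop is running in this thread.
        self.loop = asyncio.get_running_loop()


def construct_outside_coroutine():
    try:
        AsyncThing()
    except RuntimeError:
        return False
    return True


async def construct_inside_coroutine():
    thing = AsyncThing()
    return thing.loop is asyncio.get_running_loop()


print(construct_outside_coroutine())
print(asyncio.run(construct_inside_coroutine()))
```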

@martindurant
Member

Do you need more feedback or anything else to move on?

@graingert
Member

graingert commented Jun 24, 2022

Do you need more feedback or anything else to move on?

No I don't think so, I'm mostly fixing places in the codebase and tests where a loop that's about to be run is required.

Rather than deprecating passing loops to LoopRunner, Client, LocalCluster and SpecCluster altogether, I now plan to deprecate only passing loops that are not running, e.g. a loop received from the deprecated asyncio.get_event_loop().

so the plan is:

loop = IOLoop.current()  # deprecated by asyncio and tornado
c = Client(..., loop=loop)  # deprecated by distributed
cluster = SomeCluster(..., loop=loop)  # deprecated by distributed
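
The rule described here (warn only when the loop passed in is not running) could be checked roughly as follows. `warn_if_not_running` is a hypothetical helper written for illustration, not distributed's API, and the warning text is made up.

```python
import asyncio
import warnings


def warn_if_not_running(loop):
    """Hypothetical check: warn when a caller passes a loop that is not running."""
    if loop is not None and not loop.is_running():
        warnings.warn(
            "passing a loop that is not running is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
    return loop


def stopped_loop_warnings():
    # A freshly created loop has not been started, so this should warn.
    loop = asyncio.new_event_loop()
    try:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            warn_if_not_running(loop)
        return caught
    finally:
        loop.close()


async def running_loop_warnings():
    # Inside a coroutine the loop is running, so no warning is expected.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        warn_if_not_running(asyncio.get_running_loop())
    return caught
```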

However, if you do need a global loop you would be able to use:

runner = LoopRunner(asynchronous=False, loop=None)
loop = runner.start()
c = Client(..., loop=loop)
cluster = SomeCluster(..., loop=loop)
runner.stop()

but it's preferable to use:

with Client(..., loop=None) as c:
    with SomeCluster(..., loop=c.loop) as cluster:
        ...

or make your own running loop:

async def call(fn):
    # Call fn inside the running loop so IOLoop.current() binds to that loop
    return fn()

with loop_in_thread() as asyncio_loop:
    io_loop = asyncio.run_coroutine_threadsafe(coro=call(IOLoop.current), loop=asyncio_loop).result()
    with Client(..., loop=io_loop) as c:
        with SomeCluster(..., loop=io_loop) as cluster:
            ...
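
The snippet above uses loop_in_thread without defining it. A minimal stdlib-only sketch of such a helper might look like this; the implementation is an assumption made for illustration and may differ from whatever helper the author had in mind.

```python
import asyncio
import contextlib
import threading


@contextlib.contextmanager
def loop_in_thread():
    """Hypothetical helper: run a fresh asyncio loop in a background thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        yield loop
    finally:
        # Ask the loop to stop from outside its thread, then clean up.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


async def running_loop():
    return asyncio.get_running_loop()


with loop_in_thread() as asyncio_loop:
    future = asyncio.run_coroutine_threadsafe(running_loop(), asyncio_loop)
    seen_loop = future.result(timeout=5)
```

Coroutines submitted with asyncio.run_coroutine_threadsafe execute on the background loop, which is why seen_loop refers to the same object as asyncio_loop.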

graingert added a commit to graingert/distributed that referenced this issue Jul 5, 2022
Refs dask#6163
follow up to dask#6642
adjustments for tests added since the first round, and addition
of explicit loop=None for cases where the LoopRunner itself
is being tested
nguyenv added a commit to TileDB-Inc/TileDB-Py that referenced this issue Jul 5, 2022
* `DeprecationWarning` is being thrown by Dask but should be fixed
  by dask/distributed#6163
* Temporarily disabling this test for two weeks as to not trigger
  nightly test failures
@fjetter changed the title from "No longer accept event loops for LoopRunner" to "No longer start stopped loops in LoopRunner to allow the use of asyncio.run" on Aug 4, 2022