Support asynchronous tasks #5151

Merged
merged 5 commits into from
Aug 3, 2021

mrocklin
Member

@mrocklin mrocklin commented Aug 2, 2021

This allows for user-defined async def functions to be submitted as
normal tasks and be run within the event loop.
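As a rough illustration of the idea (a stdlib-only sketch, not the actual Worker code), a task runner can await coroutine functions directly on the running loop and send ordinary functions to a thread pool. The name `execute_task` and the helper functions are hypothetical.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


async def execute_task(function, *args, executor=None, **kwargs):
    """Hypothetical stand-in for a worker's task runner."""
    if inspect.iscoroutinefunction(function):
        # async def task: await it directly on the running event loop
        return await function(*args, **kwargs)
    # Regular task: run it in a thread so it does not block the loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, lambda: function(*args, **kwargs))


async def async_inc(x):
    await asyncio.sleep(0.01)
    return x + 1


def sync_inc(x):
    return x + 1


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        a = await execute_task(async_inc, 1, executor=pool)
        b = await execute_task(sync_inc, 1, executor=pool)
    return a, b


results = asyncio.run(main())
```

Both kinds of task return through the same `await`, so the caller does not need to know which path was taken.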

cc @cicdw @jcrist

mrocklin and others added 2 commits August 1, 2021 21:10
@jrbourbeau
Member

Merged main to fix the gpuCI build

@fjetter
Member

fjetter commented Aug 2, 2021

Is there a reason why users should use our event loop? Instead, they could use their own, and we would not risk the stability of our server, e.g.

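    import asyncio

    def coro_wrapper(coro, *args, **kwargs):
        # Drive the coroutine to completion on the event loop
        # obtained for the thread that runs this task.
        loop = asyncio.get_event_loop()
        fut = coro(*args, **kwargs)
        return loop.run_until_complete(fut)

    async def my_coro(arg):
        await asyncio.sleep(1)
        return arg

    # `client` is assumed to be an existing distributed Client
    fut = client.submit(coro_wrapper, my_coro, 42)
    fut.result()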

I know, we'd still be limited by the threadpool but I guess we can create an "executor" which spawns one thread and calls the above code?
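One way to picture the single-thread "executor" idea is a helper that owns one thread with a private event loop and runs submitted coroutines there. This is only a stdlib sketch under that assumption; `LoopThread` is a hypothetical name and not part of distributed.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: one daemon thread running its own event loop."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro_func, *args, **kwargs):
        # Schedule the coroutine on the private loop and block the
        # calling thread until it finishes.
        future = asyncio.run_coroutine_threadsafe(
            coro_func(*args, **kwargs), self.loop
        )
        return future.result()

    def close(self):
        # Stop the loop from inside its own thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def my_coro(arg):
    await asyncio.sleep(0.01)
    return arg


runner = LoopThread()
value = runner.run(my_coro, 42)
runner.close()
```

Because every coroutine shares the one private loop, many of them can be in flight at once without occupying a thread-pool slot each, and a misbehaving coroutine stalls only that loop.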

@mrocklin
Member Author

mrocklin commented Aug 2, 2021

I'm inclined to trust users until they break that trust. @cicdw and @jcrist might be able to say more from a Prefect perspective (Prefect is where this request most recently came from).

@jrbourbeau jrbourbeau mentioned this pull request Aug 2, 2021
@jrbourbeau
Member

I'd also be curious to learn more about the use case here. If we end up running user-submitted coroutines on the worker's main event loop, we should mention that in the Client.submit / Client.map docstrings (just pushed a commit to do this).

@pentschev pentschev mentioned this pull request Aug 2, 2021
@mrocklin
Member Author

mrocklin commented Aug 2, 2021

I spoke with @jcrist and he agreed that putting this in a separate thread might be good. His concern is that more novice Prefect users might not do the right thing here. If it's ok I'd like to still start with this and see what happens. Jim also mentioned a future situation where maybe we use the executors API so that this could be switched out in the future. Jim mentioned that he was comfortable with this going in as-is and iterating as we learn more, which is also my preference.

@jrbourbeau
Member

jrbourbeau commented Aug 2, 2021

> If it's ok I'd like to still start with this and see what happens

Sure, this is separate enough from how tasks are currently run, I don't think we need to worry about impacting any existing user code. Happy to give this a try and iterate as needed.

I should note that the changes here are causing distributed/tests/test_client.py::test_unhashable_function to genuinely fail on Python 3.7. I've not tracked down the root cause yet, but it's related to our use of functools.lru_cache here:

    @functools.lru_cache(None)
    def iscoroutinefunction(f):
        return inspect.iscoroutinefunction(f) or gen.is_coroutine_function(f)

If I use a different version of Python or comment out the lru_cache, then the test passes.

> we use the executors API so that this could be switched out in the future

I don't quite follow what you mean here. Could you elaborate a bit?

@mrocklin
Member Author

mrocklin commented Aug 3, 2021

> I don't quite follow what you mean here. Could you elaborate a bit?

    worker.executors["async"] = SomeAsyncExecutor

Then it could be configured differently with a plugin. This isn't a big deal right now though.

    @@ -2910,7 +2910,14 @@ async def execute(self, key):
                try:
                    e = self.executors[executor]
                    ts.start_time = time()
                    if "ThreadPoolExecutor" in str(type(e)):
                    if iscoroutinefunction(function):

Adding this check revealed a problem: because our iscoroutinefunction utility is wrapped in functools.lru_cache for performance reasons, it raised an error when it encountered an unhashable function (which happened in one of our tests). I pushed a commit that adds fallback logic so that iscoroutinefunction can handle unhashable inputs.

Comment on lines +5599 to +5600
func = _UnhashableCallable()
result = await c.submit(func, 1)

This change was made because dict.get became hashable starting with Python 3.8, which is why test_unhashable_function was failing only in our Python 3.7 CI builds.


3 participants