Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recommended way to execute async functions inside dramatiq actors #528

Closed
Star3Lord opened this issue Feb 19, 2023 · 5 comments
Closed

recommended way to execute async functions inside dramatiq actors #528

Star3Lord opened this issue Feb 19, 2023 · 5 comments

Comments

@Star3Lord
Copy link

I have some library code that I want to use that only uses async functions.

Right now I am running a async to sync wrapper to execute those.

import asyncio
from typing import Coroutine

def run_async(coroutine: Coroutine) -> None:
    """
    Run a coroutine in an asyncio loop.
    """

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if str(e).startswith('There is no current event loop in thread'):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    try:
        loop.run_until_complete(coroutine)
    except Exception as e:
        print(e)
    finally:
        # loop.close()
        pass

def run_and_return_async(coroutine: Coroutine):
    """
    Run a coroutine and return the result.
    """

    async_response = []

    async def run_and_capture_result():
        r = await coroutine
        async_response.append(r)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if str(e).startswith('There is no current event loop in thread'):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    try:
        loop.run_until_complete(run_and_capture_result())
        return async_response[0]
    except Exception as e:
        print(e)
    finally:
        # loop.close()
        pass

    return None

It works few times then I start getting error

Task <Task pending name='Task-4' coro=<run_and_return_async.<locals>.run_and_capture_result() running at /app/./utils/sync.py:33> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

I am using the following decorator to make async functions as dramatiq actor

from utils.sync import run_and_return_async


def async_task(async_func):
    def decorator(*args, **kwargs):
        return run_and_return_async(async_func(*args, **kwargs))

    decorator.__name__ = async_func.__name__
    decorator.__doc__ = async_func.__doc__
    decorator.__module__ = async_func.__module__
    return decorator

Any suggestions to run async functions inside actors?

@wwarne
Copy link

wwarne commented Feb 27, 2023

I suggest using https://github.com/django/asgiref
You can check their async_to_sync function to see how many edge cases they have covered already.

I use it with such decorator:

import dramatiq
from functools import wraps
from asgiref.sync import async_to_sync

def async_task(func):
    @wraps(func)
    def inner(*args, **kwargs):
        return async_to_sync(func)(*args, **kwargs)
    return inner

@dramatiq.actor(store_results=True)
@async_task
async def my_task(x, y):
    await asyncio.sleep(1)
    return x + y

I have no problems with it but my use cases are quite simple.
Maybe there are more correct ways of doing so.
I would be glad to read other suggestions.

@Bogdanp
Copy link
Owner

Bogdanp commented Apr 28, 2024

There is a way to do this as of #536

@Bogdanp Bogdanp closed this as completed Apr 28, 2024
@z0z0r4
Copy link
Contributor

z0z0r4 commented Apr 30, 2024

It seems that maybe you can directly use async func in dramatiq.

learning form: #536 (comment)

@dramatiq.actor(store_results=True)
async def my_task(x, y):
    await asyncio.sleep(1)
    return x + y

@z0z0r4
Copy link
Contributor

z0z0r4 commented Apr 30, 2024

@Bogdanp I have a question that it seems that dramatiq runs tasks through threading? So in fact the async function will only reduce efficiency? Just a sweet?

@Bogdanp
Copy link
Owner

Bogdanp commented Apr 30, 2024

Yes, async support is currently limited to being able to run async actors, but there are no concurrency benefits to doing so at the moment as the workers block on the async actor's execution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants