Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous (asyncio) contexts in actors #536

Closed
wants to merge 2 commits into from

Conversation

caspervdw
Copy link
Contributor

Hi @Bogdanp

Thanks for the great work on this package!

I ported some code from another project to support asyncio with dramatiq. Reading #238 I got the feeling there might be people wanting to use this. We ourselves have an async webserver, of which we wanted to use parts from a worker process, and asgiref.sync_to_async didn't play nice with sqlalchemy, so we made this.

The bottomline here is to have a single event loop thread per worker process. This EventLoopThread is spinned up from the AsyncMiddleware. If AsyncMiddleware is inplace, you can decorate coroutines with @async_actor. Worker threads then schedule on the event loop and wait for their tasks to finish. In this way, I didn't have to touch the dramatiq core code.

There are some points to discuss here, but first I'd like to know if @Bogdanp is 👍 on proceeding with this PR?

@Bogdanp
Copy link
Owner

Bogdanp commented Mar 10, 2023

I've only had a chance to skim this so far, but it looks great! I'll do a more in-depth review when I have more time in a couple of weeks. Thanks!

@caspervdw
Copy link
Contributor Author

OK great that you are positive on the approach. Two points I'd like to discuss:

  • It would be leaner if we got rid of the async_actor and put an inspect.iscoroutinefunction inside the base Actor class. That would recognize async def functions, check if the asyncmiddleware is in place, and wrap the coroutine.
  • I am not happy with the broker.run_coroutine monkeypatch. But I am not sure what to do then; we need something that is accessible from other middleware (say you want to initialize your database connection pool on the event loop), and from the actor. Maybe you have ideas how to fit this into the architecture?

@Bogdanp
Copy link
Owner

Bogdanp commented Mar 29, 2023

It would be leaner if we got rid of the async_actor and put an inspect.iscoroutinefunction inside the base Actor class. That would recognize async def functions, check if the asyncmiddleware is in place, and wrap the coroutine.

I think this would be fine.

I am not happy with the broker.run_coroutine monkeypatch. But I am not sure what to do then; we need something that is accessible from other middleware (say you want to initialize your database connection pool on the event loop), and from the actor. Maybe you have ideas how to fit this into the architecture?

Would it work to just have a shared event_loop_thread in a module + {get,set}_event_loop_thread functions like we have with brokers? Why is the monkeypatching necessary?

@caspervdw
Copy link
Contributor Author

@Bogdanp I refactored according to your feedback.

@Bogdanp
Copy link
Owner

Bogdanp commented Apr 2, 2023

Thanks! I've squashed your changes and made some of my own on top.

@Bogdanp Bogdanp closed this Apr 2, 2023
@caspervdw
Copy link
Contributor Author

Thanks @Bogdanp for finishing, the result looks great.

@dream2333
Copy link

Is it possible to provide a way to change the default event loop? Using uvloop can further improve asynchronous efficiency, here are some of my benchmarks:

Python 3.9 Python 3.11
gevent 34281.80 requests / second 32258.07 requests / second
asyncio 40144.52 requests / second 51652.89 requests / second
asyncio + uvloop 64102.57 requests / second 66577.90 requests / second

@Bogdanp
Copy link
Owner

Bogdanp commented Apr 3, 2023

@dream2333 yes, the event loop itself can probably be parameterized. Alternatively, it seems like it would be relatively easy for folks to extend EventLoopThread and the AsyncIO middleware in their own project.

@caspervdw
Copy link
Contributor Author

@dream2333 Did you do your benchmarks with dramatiq? As dramatiq needs one WorkerThread per concurrently running actor, the extreme concurrency that asyncio may provide in other situations is not really an option here.

@MrFox131
Copy link

Hi! Are there any estimations when this will come to new release?

@denjas
Copy link

denjas commented Oct 30, 2023

Could you show a working example?
I have 2 files:

# tasks.py
import requests
import dramatiq

@dramatiq.actor
async def count_words(url):
    response = requests.get(url)
    count = len(response.text.split(" "))
    print(f"There are {count} words at {url!r}.")
# main.py
import asyncio
from tasksd import count_words

async def start():
    count_words.send("http://example.com")

if __name__ == '__main__':
    asyncio.run(start())

I launch the worker

dramatiq --processes 1 tasks

I'm adding a task

 python  main.py

I get an error in the worker:

RuntimeError: Global event loop thread not set. Have you added the AsyncIO middleware to your middleware stack?

Please tell me what am I doing wrong? Or provide a link to a working example?

UPD: I figured it out. Maybe someone will speed up their understanding.

# tasks.py
import httpx
from dramatiq.actor import actor
import dramatiq
from dramatiq.middleware.asyncio import AsyncIO
from dramatiq.brokers.redis import RedisBroker


redis_broker = RedisBroker(middleware=[AsyncIO()])
dramatiq.set_broker(redis_broker)


@actor
async def count_words(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code == 200:
            text = response.text
            count = len(text.split(" "))
            print(f"There are {count} words at {url!r}.")
        else:
            print(f"Failed to fetch {url!r}. Status code: {response.status_code}")

@ArtemIsmagilov
Copy link

Your code use sync pika. Does it make sense to use aio-pika https://github.com/mosquito/aio-pika ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants