Skip to content

Commit

Permalink
fix: airtai#1874 support workers for ASGI FastStream
Browse files Browse the repository at this point in the history
  • Loading branch information
sehat1137 committed Nov 24, 2024
1 parent 9bc7a05 commit 26aca1a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 28 deletions.
32 changes: 32 additions & 0 deletions docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ uvicorn main:app

It does nothing but launch the app itself as an **ASGI lifespan**.

!!! note
If you want to run your app using several workers, you need to use something else than `uvicorn`.
```shell
faststream run main:app --workers 4
```
```shell
gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4
```
```shell
granian --interface asgi main:app --workers 4
```
```shell
hypercorn main:app --workers 4
```


### ASGI Routes

It doesn't look very helpful, so let's add some **HTTP** endpoints.
Expand Down Expand Up @@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi(
```shell
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
```
This possibility builded on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI.
We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py).

## Other ASGI Compatibility

Expand Down Expand Up @@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker)
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))
```

!!! tip
You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default

```shell
faststream run main:app --bind unix:/tmp/socket.sock
```
```shell
faststream run main:app --bind fd://2
```
You can use multiple binds if you want
```shell
faststream run main:app --bind 0.0.0.0:8000 '[::]:8000'
```
76 changes: 57 additions & 19 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Sequence,
Tuple,
Union,
List,
)

import anyio
Expand All @@ -20,6 +21,17 @@
from faststream.asgi.websocket import WebSocketClose
from faststream.log.logging import logger

try:
from gunicorn.app.base import BaseApplication
except ImportError:
BaseApplication = None

try:
import uvicorn
except ImportError:
uvicorn = None # type: ignore


if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
from faststream.asyncapi.schema import (
Expand All @@ -43,6 +55,23 @@
)


class ASGIRunner(BaseApplication): # type: ignore
def __init__(self, asgi_app: "ASGIApp", options: Dict[str, Any]):
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)
else:
logger.warn(f"Unknown config variable: {k} with value {v}")

def load(self) -> "ASGIApp":
return self.asgi_app


class AsgiFastStream(Application):
def __init__(
self,
Expand Down Expand Up @@ -146,25 +175,34 @@ async def run(
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
) -> None:
import uvicorn

if not run_extra_options:
run_extra_options = {}
port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type]
workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type]
host = str(run_extra_options.pop("host", "localhost"))
fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type]
config = uvicorn.Config(
self,
host=host,
port=port,
log_level=log_level,
workers=workers,
fd=fd if fd != -1 else None,
**run_extra_options,
)
server = uvicorn.Server(config)
await server.serve()
if not all([uvicorn, BaseApplication]):
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI"
)

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(self, run_extra_options).run()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
5 changes: 4 additions & 1 deletion faststream/cli/utils/parser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import re
from functools import reduce
from typing import TYPE_CHECKING, Dict, List, Tuple

if TYPE_CHECKING:
from faststream.types import SettingField

APP_REGEX = re.compile(r"[a-zA-Z]+:[a-zA-Z]+")


def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
"""Parses command line arguments."""
Expand All @@ -22,7 +25,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
),
"-",
]:
if ":" in item:
if re.match(APP_REGEX, item):
app = item

else:
Expand Down
18 changes: 10 additions & 8 deletions tests/cli/utils/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@
)
ARG6 = ("--some-key",)
ARG7 = ("--k7", "1", "2", "--k7", "3")
ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2")


@pytest.mark.parametrize(
"args",
( # noqa: PT007
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION),
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION),
),
)
def test_custom_argument_parsing(args: Tuple[str]):
Expand All @@ -49,4 +50,5 @@ def test_custom_argument_parsing(args: Tuple[str]):
"k5": ["1", "1"],
"some_key": True,
"k7": ["1", "2", "3"],
"bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"],
}

0 comments on commit 26aca1a

Please sign in to comment.