From 80e74129450b762ff61299597b76042ff70a7049 Mon Sep 17 00:00:00 2001 From: sehat1137 <edox1j2n@duck.com> Date: Mon, 25 Nov 2024 01:45:57 +0300 Subject: [PATCH 1/5] fix: #1874 support workers for ASGI FastStream - Add support workers via gunicorn - Add support fd & unix socket - Add new external params `--bind` - Separate `--bind` params from app --- docs/docs/en/getting-started/asgi.md | 32 ++++++++++++ faststream/asgi/app.py | 74 +++++++++++++++++++++------- faststream/cli/utils/parser.py | 13 ++++- tests/cli/utils/test_parser.py | 18 ++++--- 4 files changed, 110 insertions(+), 27 deletions(-) diff --git a/docs/docs/en/getting-started/asgi.md b/docs/docs/en/getting-started/asgi.md index 008c56741b..90de5980df 100644 --- a/docs/docs/en/getting-started/asgi.md +++ b/docs/docs/en/getting-started/asgi.md @@ -38,6 +38,22 @@ uvicorn main:app It does nothing but launch the app itself as an **ASGI lifespan**. +!!! note + If you want to run your app using several workers, you need to use something else than `uvicorn`. + ```shell + faststream run main:app --workers 4 + ``` + ```shell + gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4 + ``` + ```shell + granian --interface asgi main:app --workers 4 + ``` + ```shell + hypercorn main:app --workers 4 + ``` + + ### ASGI Routes It doesn't look very helpful, so let's add some **HTTP** endpoints. @@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi( ```shell faststream run main:app --host 0.0.0.0 --port 8000 --workers 4 ``` + This possibility builded on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI. + We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py). ## Other ASGI Compatibility @@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker) app.mount("/health", make_ping_asgi(broker, timeout=5.0)) app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker))) ``` + +!!! tip + You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default + + ```shell + faststream run main:app --bind unix:/tmp/socket.sock + ``` + ```shell + faststream run main:app --bind fd://2 + ``` + You can use multiple binds if you want + ```shell + faststream run main:app --bind 0.0.0.0:8000 '[::]:8000' + ``` diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index a031021ad0..fec75fceeb 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -6,6 +6,7 @@ Any, AsyncIterator, Dict, + List, Optional, Sequence, Tuple, @@ -146,25 +147,62 @@ async def run( run_extra_options: Optional[Dict[str, "SettingField"]] = None, sleep_time: float = 0.1, ) -> None: - import uvicorn - - if not run_extra_options: - run_extra_options = {} - port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type] - workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type] - host = str(run_extra_options.pop("host", "localhost")) - fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type] - config = uvicorn.Config( - self, - host=host, - port=port, - log_level=log_level, - workers=workers, - fd=fd if fd != -1 else None, - **run_extra_options, + try: + import uvicorn # noqa: F401 + from gunicorn.app.base import BaseApplication + except ImportError as e: + raise RuntimeError( + "You need uvicorn and gunicorn to run FastStream ASGI App via CLI" + ) from e + + def load_config(_self: BaseApplication) -> None: + for k, v in _self.options.items(): + if k in _self.cfg.settings and v is not None: + _self.cfg.set(k.lower(), v) + else: + logger.warning(f"Unknown config variable: {k} with value {v}") + + ASGIRunner = type( # noqa: N806 + "ASGIRunner", + (BaseApplication,), + { + "load_config": load_config, + "load": lambda _self: _self.asgi_app, + }, ) - server = uvicorn.Server(config) - await server.serve() + + def init( + _self: ASGIRunner, asgi_app: "ASGIApp", options: Dict[str, Any] + ) -> None: # type: ignore[valid-type] + _self.options = options # type: ignore[attr-defined] + _self.asgi_app = asgi_app # type: ignore[attr-defined] + super(ASGIRunner, _self).__init__() # type: ignore[arg-type] + + ASGIRunner.__init__ = init # type: ignore[misc] + + run_extra_options = run_extra_options or {} + + bindings: List[str] = [] + host = run_extra_options.pop("host", None) + port = run_extra_options.pop("port", None) + if host is not None and port is not None: + bindings.append(f"{host}:{port}") + elif host is not None: + bindings.append(f"{host}:8000") + elif port is not None: + bindings.append(f"127.0.0.1:{port}") + + bind = run_extra_options.get("bind") + if isinstance(bind, list): + bindings.extend(bind) # type: ignore + elif isinstance(bind, str): + bindings.append(bind) + + run_extra_options["bind"] = bindings or "127.0.0.1:8000" + # We use gunicorn with uvicorn workers because uvicorn don't support multiple workers + run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker" + + ASGIRunner(self, run_extra_options).run() @asynccontextmanager async def start_lifespan_context(self) -> AsyncIterator[None]: diff --git a/faststream/cli/utils/parser.py b/faststream/cli/utils/parser.py index 00c904d774..f36c54935c 100644 --- a/faststream/cli/utils/parser.py +++ b/faststream/cli/utils/parser.py @@ -1,3 +1,4 @@ +import re from functools import reduce from typing import TYPE_CHECKING, Dict, List, Tuple @@ -5,6 +6,16 @@ from faststream.types import SettingField +def is_bind_arg(arg: str) -> bool: + """Determine whether the received argument refers to --bind. + + bind arguments are like: 0.0.0.0:8000, [::]:8000, fd://2, /tmp/socket.sock + + """ + bind_regex = re.compile(r":\d+$|:/+\d|:/[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+") + return bool(bind_regex.search(arg)) + + def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]: """Parses command line arguments.""" extra_kwargs: Dict[str, SettingField] = {} @@ -22,7 +33,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]: ), "-", ]: - if ":" in item: + if ":" in item and not is_bind_arg(item): app = item else: diff --git a/tests/cli/utils/test_parser.py b/tests/cli/utils/test_parser.py index 91ace3770e..11c935468a 100644 --- a/tests/cli/utils/test_parser.py +++ b/tests/cli/utils/test_parser.py @@ -23,19 +23,20 @@ ) ARG6 = ("--some-key",) ARG7 = ("--k7", "1", "2", "--k7", "3") +ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2") @pytest.mark.parametrize( "args", ( # noqa: PT007 - (APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION), + (APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION), ), ) def test_custom_argument_parsing(args: Tuple[str]): @@ -49,4 +50,5 @@ def test_custom_argument_parsing(args: Tuple[str]): "k5": ["1", "1"], "some_key": True, "k7": ["1", "2", "3"], + "bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"], } From 01960d8e9e443d4d48f0685c1e09b5b386510155 Mon Sep 17 00:00:00 2001 From: sehat1137 <edox1j2n@duck.com> Date: Mon, 25 Nov 2024 02:00:01 +0300 Subject: [PATCH 2/5] test: extend test for CLI --- docs/docs/en/getting-started/asgi.md | 6 +++--- faststream/asgi/app.py | 6 ++++-- tests/cli/utils/test_parser.py | 16 +++++++++++++++- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/docs/docs/en/getting-started/asgi.md b/docs/docs/en/getting-started/asgi.md index 90de5980df..18cac04dd3 100644 --- a/docs/docs/en/getting-started/asgi.md +++ b/docs/docs/en/getting-started/asgi.md @@ -44,7 +44,7 @@ It does nothing but launch the app itself as an **ASGI lifespan**. faststream run main:app --workers 4 ``` ```shell - gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4 + gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4 ``` ```shell granian --interface asgi main:app --workers 4 @@ -186,8 +186,8 @@ app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker))) ``` !!! tip - You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default - + You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default + ```shell faststream run main:app --bind unix:/tmp/socket.sock ``` diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index fec75fceeb..f7f288927b 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -172,8 +172,10 @@ def load_config(_self: BaseApplication) -> None: ) def init( - _self: ASGIRunner, asgi_app: "ASGIApp", options: Dict[str, Any] - ) -> None: # type: ignore[valid-type] + _self: ASGIRunner, # type: ignore[valid-type] + asgi_app: "ASGIApp", + options: Dict[str, Any], + ) -> None: _self.options = options # type: ignore[attr-defined] _self.asgi_app = asgi_app # type: ignore[attr-defined] super(ASGIRunner, _self).__init__() # type: ignore[arg-type] diff --git a/tests/cli/utils/test_parser.py b/tests/cli/utils/test_parser.py index 11c935468a..c6bc939c01 100644 --- a/tests/cli/utils/test_parser.py +++ b/tests/cli/utils/test_parser.py @@ -2,7 +2,7 @@ import pytest -from faststream.cli.utils.parser import parse_cli_args +from faststream.cli.utils.parser import is_bind_arg, parse_cli_args APPLICATION = "module:app" @@ -52,3 +52,17 @@ def test_custom_argument_parsing(args: Tuple[str]): "k7": ["1", "2", "3"], "bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"], } + + +@pytest.mark.parametrize( + "args", ["0.0.0.0:8000", "[::]:8000", "fd://2", "unix:/tmp/socket.sock"] +) +def test_bind_arg(args: str): + assert is_bind_arg(args) is True + + +@pytest.mark.parametrize( + "args", ["main:app", "src.main:app", "examples.nats.e01_basic:app2"] +) +def test_not_bind_arg(args: str): + assert is_bind_arg(args) is False From 90c212879978ba994a2a8bf37563ebde9114e466 Mon Sep 17 00:00:00 2001 From: Sehat1137 <Sehat1137@users.noreply.github.com> Date: Sun, 24 Nov 2024 23:04:02 +0000 Subject: [PATCH 3/5] docs: generate API References --- docs/docs/SUMMARY.md | 1 + .../en/api/faststream/cli/utils/parser/is_bind_arg.md | 11 +++++++++++ 2 files changed, 12 insertions(+) create mode 100644 docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 596d8e0091..dd3f9c90ca 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -470,6 +470,7 @@ search: - [get_log_level](api/faststream/cli/utils/logs/get_log_level.md) - [set_log_level](api/faststream/cli/utils/logs/set_log_level.md) - parser + - [is_bind_arg](api/faststream/cli/utils/parser/is_bind_arg.md) - [parse_cli_args](api/faststream/cli/utils/parser/parse_cli_args.md) - [remove_prefix](api/faststream/cli/utils/parser/remove_prefix.md) - confluent diff --git a/docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md b/docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md new file mode 100644 index 0000000000..133a1d5675 --- /dev/null +++ b/docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.utils.parser.is_bind_arg From 3ef5227d7e43ac6c9779fd552b68d381a4a33551 Mon Sep 17 00:00:00 2001 From: sehat1137 <edox1j2n@duck.com> Date: Thu, 28 Nov 2024 23:33:48 +0300 Subject: [PATCH 4/5] fix: change creation of ASGIRunner --- faststream/asgi/app.py | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index f7f288927b..db9eeee27a 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -152,35 +152,22 @@ async def run( from gunicorn.app.base import BaseApplication except ImportError as e: raise RuntimeError( - "You need uvicorn and gunicorn to run FastStream ASGI App via CLI" + "You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn" ) from e - def load_config(_self: BaseApplication) -> None: - for k, v in _self.options.items(): - if k in _self.cfg.settings and v is not None: - _self.cfg.set(k.lower(), v) - else: - logger.warning(f"Unknown config variable: {k} with value {v}") + class ASGIRunner(BaseApplication): # type: ignore[misc] + def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None: + self.options = options + self.asgi_app = asgi_app + super().__init__() - ASGIRunner = type( # noqa: N806 - "ASGIRunner", - (BaseApplication,), - { - "load_config": load_config, - "load": lambda _self: _self.asgi_app, - }, - ) - - def init( - _self: ASGIRunner, # type: ignore[valid-type] - asgi_app: "ASGIApp", - options: Dict[str, Any], - ) -> None: - _self.options = options # type: ignore[attr-defined] - _self.asgi_app = asgi_app # type: ignore[attr-defined] - super(ASGIRunner, _self).__init__() # type: ignore[arg-type] + def load_config(self) -> None: + for k, v in self.options.items(): + if k in self.cfg.settings and v is not None: + self.cfg.set(k.lower(), v) - ASGIRunner.__init__ = init # type: ignore[misc] + def load(self) -> "ASGIApp": + return self.asgi_app run_extra_options = run_extra_options or {} @@ -204,7 +191,7 @@ def init( # We use gunicorn with uvicorn workers because uvicorn don't support multiple workers run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker" - ASGIRunner(self, run_extra_options).run() + ASGIRunner(run_extra_options, self).run() @asynccontextmanager async def start_lifespan_context(self) -> AsyncIterator[None]: From f90a13c0f974375cbe5a0d5214117b65b3659666 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov <diementros@yandex.ru> Date: Fri, 29 Nov 2024 07:30:16 +0300 Subject: [PATCH 5/5] lint: fix CI --- docs/docs/en/getting-started/asgi.md | 2 +- faststream/broker/fastapi/get_dependant.py | 4 ++-- faststream/rabbit/broker/broker.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/docs/en/getting-started/asgi.md b/docs/docs/en/getting-started/asgi.md index 18cac04dd3..2ece28bd13 100644 --- a/docs/docs/en/getting-started/asgi.md +++ b/docs/docs/en/getting-started/asgi.md @@ -153,7 +153,7 @@ app = FastStream(broker).as_asgi( ```shell faststream run main:app --host 0.0.0.0 --port 8000 --workers 4 ``` - This possibility builded on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI. + This possibility built on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI. We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py). ## Other ASGI Compatibility diff --git a/faststream/broker/fastapi/get_dependant.py b/faststream/broker/fastapi/get_dependant.py index 663812bda1..1f43659e1a 100644 --- a/faststream/broker/fastapi/get_dependant.py +++ b/faststream/broker/fastapi/get_dependant.py @@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant": lambda x: isinstance(x, FieldInfo), p.field_info.metadata or (), ), - Field(**field_data), + Field(**field_data), # type: ignore[pydantic-field,unused-ignore] ) else: @@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant": "le": info.field_info.le, } ) - f = Field(**field_data) + f = Field(**field_data) # type: ignore[pydantic-field,unused-ignore] params_unique[p.name] = ( info.annotation, diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 01ae9d6b92..38daf8336d 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -244,10 +244,10 @@ def __init__( asyncapi_url = str(amqp_url) # respect ascynapi_url argument scheme - builded_asyncapi_url = urlparse(asyncapi_url) - self.virtual_host = builded_asyncapi_url.path + built_asyncapi_url = urlparse(asyncapi_url) + self.virtual_host = built_asyncapi_url.path if protocol is None: - protocol = builded_asyncapi_url.scheme + protocol = built_asyncapi_url.scheme super().__init__( url=str(amqp_url), @@ -268,7 +268,7 @@ def __init__( # AsyncAPI args description=description, asyncapi_url=asyncapi_url, - protocol=protocol or builded_asyncapi_url.scheme, + protocol=protocol or built_asyncapi_url.scheme, protocol_version=protocol_version, security=security, tags=tags,