diff --git a/fastapi/fastapi_dispatcher.py b/fastapi/fastapi_dispatcher.py index 1a8eb3532..3f2390d4d 100644 --- a/fastapi/fastapi_dispatcher.py +++ b/fastapi/fastapi_dispatcher.py @@ -8,6 +8,7 @@ from .context import odoo_env_ctx from .error_handlers import convert_exception_to_status_body +from .pools import fastapi_app_pool class FastApiDispatcher(Dispatcher): @@ -24,18 +25,17 @@ def dispatch(self, endpoint, args): root_path = "/" + environ["PATH_INFO"].split("/")[1] # TODO store the env into contextvar to be used by the odoo_env # depends method - fastapi_endpoint = self.request.env["fastapi.endpoint"].sudo() - app = fastapi_endpoint.get_app(root_path) - uid = fastapi_endpoint.get_uid(root_path) - data = BytesIO() - with self._manage_odoo_env(uid): - for r in app(environ, self._make_response): - data.write(r) - if self.inner_exception: - raise self.inner_exception - return self.request.make_response( - data.getvalue(), headers=self.headers, status=self.status - ) + with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app: + uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path) + data = BytesIO() + with self._manage_odoo_env(uid): + for r in app(environ, self._make_response): + data.write(r) + if self.inner_exception: + raise self.inner_exception + return self.request.make_response( + data.getvalue(), headers=self.headers, status=self.status + ) def handle_error(self, exc): headers = getattr(exc, "headers", None) diff --git a/fastapi/middleware.py b/fastapi/middleware.py new file mode 100644 index 000000000..e09b22309 --- /dev/null +++ b/fastapi/middleware.py @@ -0,0 +1,26 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). +""" +ASGI middleware for FastAPI. + +This module provides an ASGI middleware for FastAPI applications. The middleware +is designed to ensure managed the lifecycle of the threads used to as event loop +for the ASGI application. + +""" + +from collections.abc import Iterable + +import a2wsgi +from a2wsgi.asgi import ASGIResponder +from a2wsgi.wsgi_typing import Environ, StartResponse + +from .pools import event_loop_pool + + +class ASGIMiddleware(a2wsgi.ASGIMiddleware): + def __call__( + self, environ: Environ, start_response: StartResponse + ) -> Iterable[bytes]: + with event_loop_pool.get_event_loop() as loop: + return ASGIResponder(self.app, loop)(environ, start_response) diff --git a/fastapi/models/fastapi_endpoint.py b/fastapi/models/fastapi_endpoint.py index b77bf5141..a884cee8a 100644 --- a/fastapi/models/fastapi_endpoint.py +++ b/fastapi/models/fastapi_endpoint.py @@ -7,7 +7,6 @@ from itertools import chain from typing import Any -from a2wsgi import ASGIMiddleware from starlette.middleware import Middleware from starlette.routing import Mount @@ -16,6 +15,7 @@ from fastapi import APIRouter, Depends, FastAPI from .. import dependencies +from ..middleware import ASGIMiddleware _logger = logging.getLogger(__name__) @@ -121,10 +121,10 @@ def _registered_endpoint_rule_keys(self): return tuple(res) @api.model - def _routing_impacting_fields(self) -> tuple[str]: + def _routing_impacting_fields(self) -> tuple[str, ...]: """The list of fields requiring to refresh the mount point of the pp into odoo if modified""" - return ("root_path",) + return ("root_path", "save_http_session") # # end of endpoint.route.sync.mixin methods implementation @@ -199,14 +199,19 @@ def _endpoint_registry_route_unique_key(self, routing: dict[str, Any]): return f"{self._name}:{self.id}:{path}" def _reset_app(self): - self.env.registry.clear_cache() + self._reset_app_cache_marker.clear_cache(self) + + @tools.ormcache() + def _reset_app_cache_marker(self): + """This methos is used to get a way to mark the orm cache as dirty + when the app is reset. By marking the cache as dirty, the system + will signal to others instances that the cache is not up to date + and that they should invalidate their cache as well. This is required + to ensure that any change requiring a reset of the app is propagated + to all the running instances. + """ @api.model - @tools.ormcache("root_path") - # TODO cache on thread local by db to enable to get 1 middelware by - # thread when odoo runs in multi threads mode and to allows invalidate - # specific entries in place og the overall cache as we have to do into - # the _rest_app method def get_app(self, root_path): record = self.search([("root_path", "=", root_path)]) if not record: diff --git a/fastapi/pools/__init__.py b/fastapi/pools/__init__.py new file mode 100644 index 000000000..31f1fb388 --- /dev/null +++ b/fastapi/pools/__init__.py @@ -0,0 +1,11 @@ +from .event_loop import EventLoopPool +from .fastapi_app import FastApiAppPool +from odoo.service.server import CommonServer + +event_loop_pool = EventLoopPool() +fastapi_app_pool = FastApiAppPool() + + +CommonServer.on_stop(event_loop_pool.shutdown) + +__all__ = ["event_loop_pool", "fastapi_app_pool"] diff --git a/fastapi/pools/event_loop.py b/fastapi/pools/event_loop.py new file mode 100644 index 000000000..9f3a0160e --- /dev/null +++ b/fastapi/pools/event_loop.py @@ -0,0 +1,61 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). + +import asyncio +import queue +import threading +from collections.abc import Generator +from contextlib import contextmanager + + +class EventLoopPool: + def __init__(self): + self.pool = queue.Queue[tuple[asyncio.AbstractEventLoop, threading.Thread]]() + + def __get_event_loop_and_thread( + self, + ) -> tuple[asyncio.AbstractEventLoop, threading.Thread]: + """ + Get an event loop from the pool. If no event loop is available, + create a new one. + """ + try: + return self.pool.get_nowait() + except queue.Empty: + loop = asyncio.new_event_loop() + thread = threading.Thread(target=loop.run_forever, daemon=True) + thread.start() + return loop, thread + + def __return_event_loop( + self, loop: asyncio.AbstractEventLoop, thread: threading.Thread + ) -> None: + """ + Return an event loop to the pool for reuse. + """ + self.pool.put((loop, thread)) + + def shutdown(self): + """ + Shutdown all event loop threads in the pool. + """ + while not self.pool.empty(): + loop, thread = self.pool.get_nowait() + loop.call_soon_threadsafe(loop.stop) + thread.join() + loop.close() + + @contextmanager + def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]: + """ + Get an event loop from the pool. If no event loop is available, + create a new one. + + After the context manager exits, the event loop is returned to + the pool for reuse. + """ + loop, thread = self.__get_event_loop_and_thread() + try: + yield loop + finally: + self.__return_event_loop(loop, thread) diff --git a/fastapi/pools/fastapi_app.py b/fastapi/pools/fastapi_app.py new file mode 100644 index 000000000..67bfafb75 --- /dev/null +++ b/fastapi/pools/fastapi_app.py @@ -0,0 +1,136 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). +import logging +import queue +import threading +from collections import defaultdict +from collections.abc import Generator +from contextlib import contextmanager + +from odoo.api import Environment + +from fastapi import FastAPI + +_logger = logging.getLogger(__name__) + + +class FastApiAppPool: + """Pool of FastAPI apps. + + This class manages a pool of FastAPI apps. The pool is organized by database name + and root path. Each pool is a queue of FastAPI apps. + + The pool is used to reuse FastAPI apps across multiple requests. This is useful + to avoid the overhead of creating a new FastAPI app for each request. The pool + ensures that only one request at a time uses an app. + + The proper way to use the pool is to use the get_app method as a context manager. + This ensures that the app is returned to the pool after the context manager exits. + The get_app method is designed to ensure that the app made available to the + caller is unique and not used by another caller at the same time. + + .. code-block:: python + + with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app: + # use the app + + The pool is invalidated when the cache registry is updated. This ensures that + the pool is always up-to-date with the latest app configuration. It also + ensures that the invalidation is done even in the case of a modification occurring + in a different worker process or thread or server instance. This mechanism + works because every time an attribute of the fastapi.endpoint model is modified + and this attribute is part of the list returned by the `_fastapi_app_fields`, + or `_routing_impacting_fields` methods, we reset the cache of a marker method + `_reset_app_cache_marker`. As side effect, the cache registry is marked to be + updated by the increment of the `cache_sequence` SQL sequence. This cache sequence + on the registry is reloaded from the DB on each request made to a specific database. + When an app is retrieved from the pool, we always compare the cache sequence of + the pool with the cache sequence of the registry. If the two sequences are + different, we invalidate the pool and save the new cache sequence on the pool. + + The cache is based on a defaultdict of defaultdict of queue.Queue. We are cautious + that the use of defaultdict is not thread-safe for operations that modify the + dictionary. However the only operation that modifies the dictionary is the + first access to a new key. If two threads access the same key at the same time, + the two threads will create two different queues. This is not a problem since + at the time of returning an app to the pool, we are sure that a queue exists + for the key into the cache and all the created apps are returned to the same + valid queue. And the end, the lack of thread-safety for the defaultdict could + only lead to a negligible overhead of creating a new queue that will never be + used. This is why we consider that the use of defaultdict is safe in this context. + """ + + def __init__(self): + self._queue_by_db_by_root_path: dict[str, dict[str, queue.Queue[FastAPI]]] = ( + defaultdict(lambda: defaultdict(queue.Queue)) + ) + self.__cache_sequences = {} + self._lock = threading.Lock() + + def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]: + db_name = env.cr.dbname + return self._queue_by_db_by_root_path[db_name][root_path] + + def __get_app(self, env: Environment, root_path: str) -> FastAPI: + pool = self.__get_pool(env, root_path) + try: + return pool.get_nowait() + except queue.Empty: + env["fastapi.endpoint"].sudo() + return env["fastapi.endpoint"].sudo().get_app(root_path) + + def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None: + pool = self.__get_pool(env, root_path) + pool.put(app) + + @contextmanager + def get_app( + self, env: Environment, root_path: str + ) -> Generator[FastAPI, None, None]: + """Return a FastAPI app to be used in a context manager. + + The app is retrieved from the pool if available, otherwise a new one is created. + The app is returned to the pool after the context manager exits. + + When used into the FastApiDispatcher class this ensures that the app is reused + across multiple requests but only one request at a time uses an app. + """ + self._check_cache(env) + app = self.__get_app(env, root_path) + try: + yield app + finally: + self.__return_app(env, app, root_path) + + def get_cache_sequence(self, key: str) -> int: + with self._lock: + return self.__cache_sequences.get(key, 0) + + def set_cache_sequence(self, key: str, value: int) -> None: + with self._lock: + if ( + key not in self.__cache_sequences + or value != self.__cache_sequences[key] + ): + self.__cache_sequences[key] = value + + def _check_cache(self, env: Environment) -> None: + cache_sequences = env.registry.cache_sequences + for key, value in cache_sequences.items(): + if ( + value != self.get_cache_sequence(key) + and self.get_cache_sequence(key) != 0 + ): + _logger.info( + "Cache registry updated, reset fastapi_app pool for the current " + "database" + ) + self.invalidate(env) + self.set_cache_sequence(key, value) + + def invalidate(self, env: Environment, root_path: str | None = None) -> None: + db_name = env.cr.dbname + if root_path: + self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() + elif db_name in self._queue_by_db_by_root_path: + del self._queue_by_db_by_root_path[db_name]