From 699f3df9ce6bbb445c9d3feec77ac17e425e8d68 Mon Sep 17 00:00:00 2001 From: Nikita Demidovich Date: Tue, 24 Oct 2023 14:07:54 +0300 Subject: [PATCH] fix: Redis connections leak --- app/caching/callback_redis_repo.py | 58 +++++++++++++----------------- app/caching/exception_handlers.py | 9 +++++ app/main.py | 14 ++++++++ app/settings.py | 1 + setup.cfg | 1 + 5 files changed, 49 insertions(+), 34 deletions(-) create mode 100644 app/caching/exception_handlers.py diff --git a/app/caching/callback_redis_repo.py b/app/caching/callback_redis_repo.py index b272bf2..2871a2d 100644 --- a/app/caching/callback_redis_repo.py +++ b/app/caching/callback_redis_repo.py @@ -2,34 +2,35 @@ import asyncio import pickle # noqa: S403 -from typing import Dict, Optional +from typing import Any, Dict, Optional from uuid import UUID from pybotx import CallbackNotReceivedError, CallbackRepoProto from pybotx.bot.exceptions import BotShuttingDownError, BotXMethodCallbackNotFoundError from pybotx.models.method_callbacks import BotXMethodCallback -from redis import asyncio as aioredis +from redis.asyncio.client import Redis class CallbackRedisRepo(CallbackRepoProto): def __init__( self, - redis: aioredis.Redis, + redis: Redis, prefix: Optional[str] = None, ): self._redis = redis self._prefix = prefix or "" - self._pubsubs: Dict[UUID, aioredis.client.PubSub] = {} - self._futures: Dict[UUID, asyncio.Future] = {} + self._futures: Dict[UUID, asyncio.Future[BotXMethodCallback]] = {} + + self.pubsub = redis.pubsub() async def create_botx_method_callback( self, sync_id: UUID, ) -> None: - pubsub = self._redis.pubsub() - await pubsub.subscribe(f"{self._prefix}:{sync_id}") self._futures[sync_id] = asyncio.Future() - self._pubsubs[sync_id] = pubsub + await self.pubsub.subscribe( + **{f"{self._prefix}:{sync_id}": self._message_handler} + ) async def set_botx_method_callback_result( self, @@ -47,19 +48,12 @@ async def wait_botx_method_callback( sync_id: UUID, timeout: float, ) -> BotXMethodCallback: - channel = self._get_pubsub(sync_id) try: - callback = await asyncio.wait_for( - self._get_callback(channel), timeout=timeout - ) + callback = await asyncio.wait_for(self._futures[sync_id], timeout=timeout) except asyncio.TimeoutError: raise CallbackNotReceivedError(sync_id) from None - - future = self._futures[sync_id] - if future.done(): - future.result() - else: - future.set_result(callback) + finally: + await self.pop_botx_method_callback(sync_id) return callback @@ -67,11 +61,11 @@ async def pop_botx_method_callback( self, sync_id: UUID, ) -> "asyncio.Future[BotXMethodCallback]": - return self._futures[sync_id] + await self.pubsub.unsubscribe(f"{self._prefix}:{sync_id}") + return self._futures.pop(sync_id) async def stop_callbacks_waiting(self) -> None: - for pubsub in self._pubsubs.values(): - await pubsub.unsubscribe() + await self.pubsub.unsubscribe() for sync_id, future in self._futures.items(): if not future.done(): @@ -81,17 +75,13 @@ async def stop_callbacks_waiting(self) -> None: ), ) - def _get_pubsub(self, sync_id: UUID) -> aioredis.client.PubSub: - try: - return self._pubsubs[sync_id] - except KeyError: - raise BotXMethodCallbackNotFoundError(sync_id) from None + async def _message_handler(self, message: Any) -> None: + if message["type"] == "message": + callback: BotXMethodCallback = pickle.loads(message["data"]) # noqa: S301 - @classmethod - async def _get_callback( # type: ignore - cls, - channel: aioredis.client.PubSub, - ) -> BotXMethodCallback: - async for message in channel.listen(): - if message["type"] == "message": - return pickle.loads(message["data"]) # noqa: S301 + future = self._futures[callback.sync_id] + + if future.done(): + future.result() + else: + future.set_result(callback) diff --git a/app/caching/exception_handlers.py b/app/caching/exception_handlers.py new file mode 100644 index 0000000..ea83b80 --- /dev/null +++ b/app/caching/exception_handlers.py @@ -0,0 +1,9 @@ +"""Redis custom exception handlers.""" + +from redis.asyncio.client import PubSub + +from app.logger import logger + + +async def pubsub_exception_handler(exc: BaseException, pubsub: PubSub) -> None: + logger.exception("Something went wrong in PubSub") diff --git a/app/main.py b/app/main.py index 8ddce35..7dcc79c 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ """Application with configuration for events, routers and middleware.""" +import asyncio from functools import partial from fastapi import FastAPI @@ -9,6 +10,7 @@ from app.api.routers import router from app.bot.bot import get_bot from app.caching.callback_redis_repo import CallbackRedisRepo +from app.caching.exception_handlers import pubsub_exception_handler from app.caching.redis_repo import RedisRepo from app.db.sqlalchemy import build_db_session_factory, close_db_connections from app.resources import strings @@ -21,10 +23,18 @@ async def startup(application: FastAPI, raise_bot_exceptions: bool) -> None: # -- Redis -- redis_client = aioredis.from_url(settings.REDIS_DSN) + pool = aioredis.BlockingConnectionPool( + max_connections=settings.CONNECTION_POOL_SIZE, + **redis_client.connection_pool.connection_kwargs, + ) + redis_client.connection_pool = pool redis_repo = RedisRepo(redis=redis_client, prefix=strings.BOT_PROJECT_NAME) # -- Bot -- callback_repo = CallbackRedisRepo(redis_client) + process_callbacks_task = asyncio.create_task( + callback_repo.pubsub.run(exception_handler=pubsub_exception_handler) + ) bot = get_bot(callback_repo, raise_exceptions=raise_bot_exceptions) await bot.startup() @@ -34,12 +44,16 @@ async def startup(application: FastAPI, raise_bot_exceptions: bool) -> None: application.state.bot = bot application.state.redis = redis_client + application.state.process_callbacks_task = process_callbacks_task async def shutdown(application: FastAPI) -> None: # -- Bot -- bot: Bot = application.state.bot await bot.shutdown() + process_callbacks_task: asyncio.Task = application.state.process_callbacks_task + process_callbacks_task.cancel() + await asyncio.gather(process_callbacks_task, return_exceptions=True) # -- Redis -- redis_client: aioredis.Redis = application.state.redis diff --git a/app/settings.py b/app/settings.py index 786050c..ad2f598 100644 --- a/app/settings.py +++ b/app/settings.py @@ -62,6 +62,7 @@ def _build_credentials_from_string( # redis REDIS_DSN: str + CONNECTION_POOL_SIZE: int = 10 # healthcheck WORKER_TIMEOUT_SEC: float = 4 diff --git a/setup.cfg b/setup.cfg index 5add41e..33db506 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,7 @@ per-file-ignores = # too many imports app/bot/commands/*.py:WPS201,D104 app/services/botx_user_search.py:WPS232 + app/main.py:WPS201 # line too long app/resources/strings.py:E501 tests/*:D100,WPS110,WPS116,WPS118,WPS201,WPS204,WPS235,WPS430,WPS442,WPS432