Skip to content

Commit

Permalink
fix: Redis connections leak
Browse files Browse the repository at this point in the history
  • Loading branch information
nidemidovich committed Oct 26, 2023
1 parent cebf3e7 commit 699f3df
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 34 deletions.
58 changes: 24 additions & 34 deletions app/caching/callback_redis_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,35 @@

import asyncio
import pickle # noqa: S403
from typing import Dict, Optional
from typing import Any, Dict, Optional
from uuid import UUID

from pybotx import CallbackNotReceivedError, CallbackRepoProto
from pybotx.bot.exceptions import BotShuttingDownError, BotXMethodCallbackNotFoundError
from pybotx.models.method_callbacks import BotXMethodCallback
from redis import asyncio as aioredis
from redis.asyncio.client import Redis


class CallbackRedisRepo(CallbackRepoProto):
def __init__(
self,
redis: aioredis.Redis,
redis: Redis,
prefix: Optional[str] = None,
):
self._redis = redis
self._prefix = prefix or ""
self._pubsubs: Dict[UUID, aioredis.client.PubSub] = {}
self._futures: Dict[UUID, asyncio.Future] = {}
self._futures: Dict[UUID, asyncio.Future[BotXMethodCallback]] = {}

self.pubsub = redis.pubsub()

async def create_botx_method_callback(
self,
sync_id: UUID,
) -> None:
pubsub = self._redis.pubsub()
await pubsub.subscribe(f"{self._prefix}:{sync_id}")
self._futures[sync_id] = asyncio.Future()
self._pubsubs[sync_id] = pubsub
await self.pubsub.subscribe(
**{f"{self._prefix}:{sync_id}": self._message_handler}
)

async def set_botx_method_callback_result(
self,
Expand All @@ -47,31 +48,24 @@ async def wait_botx_method_callback(
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
channel = self._get_pubsub(sync_id)
try:
callback = await asyncio.wait_for(
self._get_callback(channel), timeout=timeout
)
callback = await asyncio.wait_for(self._futures[sync_id], timeout=timeout)
except asyncio.TimeoutError:
raise CallbackNotReceivedError(sync_id) from None

future = self._futures[sync_id]
if future.done():
future.result()
else:
future.set_result(callback)
finally:
await self.pop_botx_method_callback(sync_id)

return callback

async def pop_botx_method_callback(
self,
sync_id: UUID,
) -> "asyncio.Future[BotXMethodCallback]":
return self._futures[sync_id]
await self.pubsub.unsubscribe(f"{self._prefix}:{sync_id}")
return self._futures.pop(sync_id)

async def stop_callbacks_waiting(self) -> None:
for pubsub in self._pubsubs.values():
await pubsub.unsubscribe()
await self.pubsub.unsubscribe()

for sync_id, future in self._futures.items():
if not future.done():
Expand All @@ -81,17 +75,13 @@ async def stop_callbacks_waiting(self) -> None:
),
)

def _get_pubsub(self, sync_id: UUID) -> aioredis.client.PubSub:
try:
return self._pubsubs[sync_id]
except KeyError:
raise BotXMethodCallbackNotFoundError(sync_id) from None
async def _message_handler(self, message: Any) -> None:
if message["type"] == "message":
callback: BotXMethodCallback = pickle.loads(message["data"]) # noqa: S301

@classmethod
async def _get_callback( # type: ignore
cls,
channel: aioredis.client.PubSub,
) -> BotXMethodCallback:
async for message in channel.listen():
if message["type"] == "message":
return pickle.loads(message["data"]) # noqa: S301
future = self._futures[callback.sync_id]

if future.done():
future.result()
else:
future.set_result(callback)
9 changes: 9 additions & 0 deletions app/caching/exception_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Redis custom exception handlers."""

from redis.asyncio.client import PubSub

from app.logger import logger


async def pubsub_exception_handler(exc: BaseException, pubsub: PubSub) -> None:
logger.exception("Something went wrong in PubSub")
14 changes: 14 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Application with configuration for events, routers and middleware."""

import asyncio
from functools import partial

from fastapi import FastAPI
Expand All @@ -9,6 +10,7 @@
from app.api.routers import router
from app.bot.bot import get_bot
from app.caching.callback_redis_repo import CallbackRedisRepo
from app.caching.exception_handlers import pubsub_exception_handler
from app.caching.redis_repo import RedisRepo
from app.db.sqlalchemy import build_db_session_factory, close_db_connections
from app.resources import strings
Expand All @@ -21,10 +23,18 @@ async def startup(application: FastAPI, raise_bot_exceptions: bool) -> None:

# -- Redis --
redis_client = aioredis.from_url(settings.REDIS_DSN)
pool = aioredis.BlockingConnectionPool(
max_connections=settings.CONNECTION_POOL_SIZE,
**redis_client.connection_pool.connection_kwargs,
)
redis_client.connection_pool = pool
redis_repo = RedisRepo(redis=redis_client, prefix=strings.BOT_PROJECT_NAME)

# -- Bot --
callback_repo = CallbackRedisRepo(redis_client)
process_callbacks_task = asyncio.create_task(
callback_repo.pubsub.run(exception_handler=pubsub_exception_handler)
)
bot = get_bot(callback_repo, raise_exceptions=raise_bot_exceptions)

await bot.startup()
Expand All @@ -34,12 +44,16 @@ async def startup(application: FastAPI, raise_bot_exceptions: bool) -> None:

application.state.bot = bot
application.state.redis = redis_client
application.state.process_callbacks_task = process_callbacks_task


async def shutdown(application: FastAPI) -> None:
# -- Bot --
bot: Bot = application.state.bot
await bot.shutdown()
process_callbacks_task: asyncio.Task = application.state.process_callbacks_task
process_callbacks_task.cancel()
await asyncio.gather(process_callbacks_task, return_exceptions=True)

# -- Redis --
redis_client: aioredis.Redis = application.state.redis
Expand Down
1 change: 1 addition & 0 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _build_credentials_from_string(

# redis
REDIS_DSN: str
CONNECTION_POOL_SIZE: int = 10

# healthcheck
WORKER_TIMEOUT_SEC: float = 4
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ per-file-ignores =
# too many imports
app/bot/commands/*.py:WPS201,D104
app/services/botx_user_search.py:WPS232
app/main.py:WPS201
# line too long
app/resources/strings.py:E501
tests/*:D100,WPS110,WPS116,WPS118,WPS201,WPS204,WPS235,WPS430,WPS442,WPS432
Expand Down

0 comments on commit 699f3df

Please sign in to comment.