Skip to content

Commit

Permalink
Merge pull request #55 from DABND19/feature/redis-sentinel
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Jun 11, 2024
2 parents bb42e32 + 9900a0d commit d30f200
Show file tree
Hide file tree
Showing 9 changed files with 835 additions and 8 deletions.
21 changes: 21 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,24 @@ services:
REDIS_CLUSTER_CREATOR: "yes"
ports:
- 7001:6379

  # Standalone Redis primary used as the Sentinel-monitored master in tests.
  redis-master:
    image: bitnami/redis:6.2.5
    environment:
      # Test-only setup: run without authentication.
      ALLOW_EMPTY_PASSWORD: "yes"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 3
      start_period: 10s

  # Sentinel watching redis-master; host port 7002 maps to 26379,
  # the default Sentinel port inside the container.
  redis-sentinel:
    image: bitnami/redis-sentinel:latest
    depends_on:
      - redis-master
    environment:
      ALLOW_EMPTY_PASSWORD: "yes"
      REDIS_MASTER_HOST: "redis-master"
    ports:
      - 7002:26379
10 changes: 10 additions & 0 deletions taskiq_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@
from taskiq_redis.redis_backend import (
RedisAsyncClusterResultBackend,
RedisAsyncResultBackend,
RedisAsyncSentinelResultBackend,
)
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker
from taskiq_redis.redis_sentinel_broker import (
ListQueueSentinelBroker,
PubSubSentinelBroker,
)
from taskiq_redis.schedule_source import (
RedisClusterScheduleSource,
RedisScheduleSource,
RedisSentinelScheduleSource,
)

# Public API of taskiq_redis: result backends, brokers and schedule
# sources for standalone Redis, Redis Cluster and Redis Sentinel.
__all__ = [
    "RedisAsyncClusterResultBackend",
    "RedisAsyncResultBackend",
    "RedisAsyncSentinelResultBackend",
    "ListQueueBroker",
    "PubSubBroker",
    "ListQueueClusterBroker",
    "ListQueueSentinelBroker",
    "PubSubSentinelBroker",
    "RedisScheduleSource",
    "RedisClusterScheduleSource",
    "RedisSentinelScheduleSource",
]
167 changes: 165 additions & 2 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
import pickle
from typing import Any, Dict, Optional, TypeVar, Union
import sys
from contextlib import asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Dict,
List,
Optional,
Tuple,
TypeVar,
Union,
)

from redis.asyncio import BlockingConnectionPool, Redis
from redis.asyncio import BlockingConnectionPool, Redis, Sentinel
from redis.asyncio.cluster import RedisCluster
from taskiq import AsyncResultBackend
from taskiq.abc.result_backend import TaskiqResult
from taskiq.abc.serializer import TaskiqSerializer

from taskiq_redis.exceptions import (
DuplicateExpireTimeSelectedError,
ExpireTimeMustBeMoreThanZeroError,
ResultIsMissingError,
)
from taskiq_redis.serializer import PickleSerializer

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

if TYPE_CHECKING:
_Redis: TypeAlias = Redis[bytes]
else:
_Redis: TypeAlias = Redis

_ReturnType = TypeVar("_ReturnType")

Expand Down Expand Up @@ -267,3 +291,142 @@ async def get_result(
taskiq_result.log = None

return taskiq_result


class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
    """Async result backend based on Redis Sentinel."""

    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        serializer: Optional[TaskiqSerializer] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Constructs a new result backend.

        :param sentinels: list of sentinel host and port pairs.
        :param master_name: sentinel master name.
        :param keep_results: flag to not remove results from Redis after reading.
        :param result_ex_time: expire time in seconds for result.
        :param result_px_time: expire time in milliseconds for result.
        :param min_other_sentinels: minimum number of other sentinels
            that must agree about a master's state.
        :param sentinel_kwargs: additional arguments for the Sentinel client.
        :param serializer: serializer for task results,
            defaults to PickleSerializer.
        :param connection_kwargs: additional arguments for redis connections.
        :raises DuplicateExpireTimeSelectedError: if both result_ex_time
            and result_px_time are selected.
        :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
            or result_px_time is not greater than zero.
        """
        self.sentinel = Sentinel(
            sentinels=sentinels,
            min_other_sentinels=min_other_sentinels,
            sentinel_kwargs=sentinel_kwargs,
            **connection_kwargs,
        )
        self.master_name = master_name
        if serializer is None:
            serializer = PickleSerializer()
        self.serializer = serializer
        self.keep_results = keep_results
        self.result_ex_time = result_ex_time
        self.result_px_time = result_px_time

        # Validate expiration settings eagerly so misconfiguration fails
        # at construction time instead of on the first set_result call.
        unavailable_conditions = any(
            (
                self.result_ex_time is not None and self.result_ex_time <= 0,
                self.result_px_time is not None and self.result_px_time <= 0,
            ),
        )
        if unavailable_conditions:
            raise ExpireTimeMustBeMoreThanZeroError(
                "You must select one expire time param and it must be more than zero.",
            )

        if self.result_ex_time and self.result_px_time:
            raise DuplicateExpireTimeSelectedError(
                "Choose either result_ex_time or result_px_time.",
            )

    @asynccontextmanager
    async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
        """Acquire a connection to the current master discovered via Sentinel."""
        async with self.sentinel.master_for(self.master_name) as redis_conn:
            yield redis_conn

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType],
    ) -> None:
        """
        Sets task result in redis.

        Dumps TaskiqResult instance into bytes and writes
        it to redis under the task_id key.

        :param task_id: ID of the task.
        :param result: TaskiqResult instance.
        """
        redis_set_params: Dict[str, Union[str, bytes, int]] = {
            "name": task_id,
            "value": self.serializer.dumpb(result),
        }
        # ex and px are mutually exclusive (enforced in __init__).
        if self.result_ex_time:
            redis_set_params["ex"] = self.result_ex_time
        elif self.result_px_time:
            redis_set_params["px"] = self.result_px_time

        async with self._acquire_master_conn() as redis:
            await redis.set(**redis_set_params)  # type: ignore

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Returns whether the result is ready.

        :param task_id: ID of the task.

        :returns: True if the result is ready else False.
        """
        async with self._acquire_master_conn() as redis:
            return bool(await redis.exists(task_id))

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False,
    ) -> TaskiqResult[_ReturnType]:
        """
        Gets result from the task.

        :param task_id: task's id.
        :param with_logs: if True it will download task's logs.
        :raises ResultIsMissingError: if there is no result when trying to get it.
        :return: task's return value.
        """
        async with self._acquire_master_conn() as redis:
            if self.keep_results:
                result_value = await redis.get(
                    name=task_id,
                )
            else:
                result_value = await redis.getdel(
                    name=task_id,
                )

        if result_value is None:
            raise ResultIsMissingError

        # Use the configured serializer to mirror set_result's dumpb call;
        # a bare pickle.loads here would break custom serializers.
        taskiq_result: TaskiqResult[_ReturnType] = self.serializer.loadb(
            result_value,
        )

        if not with_logs:
            taskiq_result.log = None

        return taskiq_result
132 changes: 132 additions & 0 deletions taskiq_redis/redis_sentinel_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import sys
from contextlib import asynccontextmanager
from logging import getLogger
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
AsyncIterator,
Callable,
List,
Optional,
Tuple,
TypeVar,
)

from redis.asyncio import Redis, Sentinel
from taskiq import AsyncResultBackend, BrokerMessage
from taskiq.abc.broker import AsyncBroker

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

if TYPE_CHECKING:
_Redis: TypeAlias = Redis[bytes]
else:
_Redis: TypeAlias = Redis

_T = TypeVar("_T")

logger = getLogger("taskiq.redis_sentinel_broker")


class BaseSentinelBroker(AsyncBroker):
    """Common functionality for brokers backed by Redis Sentinel."""

    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
        task_id_generator: Optional[Callable[[], str]] = None,
        queue_name: str = "taskiq",
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        **connection_kwargs: Any,
    ) -> None:
        super().__init__(
            result_backend=result_backend,
            task_id_generator=task_id_generator,
        )

        self.master_name = master_name
        self.queue_name = queue_name
        self.sentinel = Sentinel(
            sentinels=sentinels,
            min_other_sentinels=min_other_sentinels,
            sentinel_kwargs=sentinel_kwargs,
            **connection_kwargs,
        )

    @asynccontextmanager
    async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
        """Yield a connection to the master currently elected by Sentinel."""
        master = self.sentinel.master_for(self.master_name)
        async with master as conn:
            yield conn


class PubSubSentinelBroker(BaseSentinelBroker):
    """Sentinel-backed broker that broadcasts each task to every worker."""

    async def kick(self, message: BrokerMessage) -> None:
        """
        Publish message over PUBSUB channel.

        :param message: message to send.
        """
        channel = message.labels.get("queue_name") or self.queue_name
        async with self._acquire_master_conn() as conn:
            await conn.publish(channel, message.message)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Listen redis queue for new messages.

        Subscribes to the pubsub channel and yields the payload
        of every proper "message" event, skipping everything else.

        :yields: broker messages.
        """
        async with self._acquire_master_conn() as conn:
            pubsub = conn.pubsub()
            await pubsub.subscribe(self.queue_name)
            async for raw in pubsub.listen():
                if not raw:
                    continue
                if raw["type"] != "message":
                    logger.debug("Received non-message from redis: %s", raw)
                    continue
                yield raw["data"]


class ListQueueSentinelBroker(BaseSentinelBroker):
    """Sentinel-backed broker that distributes tasks between workers."""

    async def kick(self, message: BrokerMessage) -> None:
        """
        Put a message in a list.

        This method appends a message to the list of all messages.

        :param message: message to append.
        """
        queue = message.labels.get("queue_name") or self.queue_name
        async with self._acquire_master_conn() as conn:
            await conn.lpush(queue, message.message)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Listen redis queue for new messages.

        Blocks on brpop and yields only the payload half of the
        (queue_name, payload) pair that brpop returns.

        :yields: broker messages.
        """
        payload_index = 1
        async with self._acquire_master_conn() as conn:
            while True:
                popped = await conn.brpop(self.queue_name)
                yield popped[payload_index]
Loading

0 comments on commit d30f200

Please sign in to comment.