Skip to content

Commit

Permalink
Merge pull request #46 from stinovlas/add-cluster-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Nov 9, 2023
2 parents 204caa5 + ae672c5 commit 7386f46
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 15 deletions.
14 changes: 2 additions & 12 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,14 @@ jobs:
- name: Run lint check
run: poetry run pre-commit run -a ${{ matrix.cmd }}
pytest:
services:
redis:
image: bitnami/redis:6.2.5
env:
ALLOW_EMPTY_PASSWORD: "yes"
options: >-
--health-cmd="redis-cli ping"
--health-interval=5s
--health-timeout=5s
--health-retries=30
ports:
- 6379:6379
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v4
- name: Set up Redis instance and Redis cluster
run: docker-compose up -d
- name: Set up Python
uses: actions/setup-python@v2
with:
Expand Down
60 changes: 60 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: '3.2'

services:
redis:
image: bitnami/redis:6.2.5
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s
ports:
- 7000:6379
redis-node-0:
image: docker.io/bitnami/redis-cluster:7.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

redis-node-1:
image: docker.io/bitnami/redis-cluster:7.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

redis-node-2:
image: docker.io/bitnami/redis-cluster:7.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

redis-node-3:
image: docker.io/bitnami/redis-cluster:7.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

redis-node-4:
image: docker.io/bitnami/redis-cluster:7.2
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"

redis-node-5:
image: docker.io/bitnami/redis-cluster:7.2
depends_on:
- redis-node-0
- redis-node-1
- redis-node-2
- redis-node-3
- redis-node-4
environment:
ALLOW_EMPTY_PASSWORD: "yes"
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
REDIS_CLUSTER_REPLICAS: 1
REDIS_CLUSTER_CREATOR: "yes"
ports:
- 7001:6379
6 changes: 5 additions & 1 deletion taskiq_redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Package for redis integration."""
from taskiq_redis.redis_backend import RedisAsyncResultBackend
from taskiq_redis.redis_backend import (
RedisAsyncClusterResultBackend,
RedisAsyncResultBackend,
)
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
from taskiq_redis.schedule_source import RedisScheduleSource

__all__ = [
"RedisAsyncClusterResultBackend",
"RedisAsyncResultBackend",
"ListQueueBroker",
"PubSubBroker",
Expand Down
120 changes: 120 additions & 0 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, Optional, TypeVar, Union

from redis.asyncio import ConnectionPool, Redis
from redis.asyncio.cluster import RedisCluster
from taskiq import AsyncResultBackend
from taskiq.abc.result_backend import TaskiqResult

Expand Down Expand Up @@ -134,3 +135,122 @@ async def get_result(
taskiq_result.log = None

return taskiq_result


class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
"""Async result backend based on redis cluster."""

def __init__(
self,
redis_url: str,
keep_results: bool = True,
result_ex_time: Optional[int] = None,
result_px_time: Optional[int] = None,
) -> None:
"""
Constructs a new result backend.
:param redis_url: url to redis cluster.
:param keep_results: flag to not remove results from Redis after reading.
:param result_ex_time: expire time in seconds for result.
:param result_px_time: expire time in milliseconds for result.
:raises DuplicateExpireTimeSelectedError: if result_ex_time
and result_px_time are selected.
:raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
and result_px_time are equal zero.
"""
self.redis: RedisCluster[bytes] = RedisCluster.from_url(redis_url)
self.keep_results = keep_results
self.result_ex_time = result_ex_time
self.result_px_time = result_px_time

unavailable_conditions = any(
(
self.result_ex_time is not None and self.result_ex_time <= 0,
self.result_px_time is not None and self.result_px_time <= 0,
),
)
if unavailable_conditions:
raise ExpireTimeMustBeMoreThanZeroError(
"You must select one expire time param and it must be more than zero.",
)

if self.result_ex_time and self.result_px_time:
raise DuplicateExpireTimeSelectedError(
"Choose either result_ex_time or result_px_time.",
)

async def shutdown(self) -> None:
"""Closes redis connection."""
await self.redis.aclose() # type: ignore[attr-defined]
await super().shutdown()

async def set_result(
self,
task_id: str,
result: TaskiqResult[_ReturnType],
) -> None:
"""
Sets task result in redis.
Dumps TaskiqResult instance into the bytes and writes
it to redis.
:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
redis_set_params: Dict[str, Union[str, bytes, int]] = {
"name": task_id,
"value": pickle.dumps(result),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

await self.redis.set(**redis_set_params) # type: ignore

async def is_result_ready(self, task_id: str) -> bool:
"""
Returns whether the result is ready.
:param task_id: ID of the task.
:returns: True if the result is ready else False.
"""
return bool(await self.redis.exists(task_id)) # type: ignore[attr-defined]

async def get_result(
self,
task_id: str,
with_logs: bool = False,
) -> TaskiqResult[_ReturnType]:
"""
Gets result from the task.
:param task_id: task's id.
:param with_logs: if True it will download task's logs.
:raises ResultIsMissingError: if there is no result when trying to get it.
:return: task's return value.
"""
if self.keep_results:
result_value = await self.redis.get( # type: ignore[attr-defined]
name=task_id,
)
else:
result_value = await self.redis.getdel( # type: ignore[attr-defined]
name=task_id,
)

if result_value is None:
raise ResultIsMissingError

taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301
result_value,
)

if not with_logs:
taskiq_result.log = None

return taskiq_result
16 changes: 15 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,18 @@ def redis_url() -> str:
:return: URL string.
"""
return os.environ.get("TEST_REDIS_URL", "redis://localhost")
return os.environ.get("TEST_REDIS_URL", "redis://localhost:7000")


@pytest.fixture
def redis_cluster_url() -> str:
"""
URL to connect to redis cluster.
It tries to get it from environ,
and return default one if the variable is
not set.
:return: URL string.
"""
return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001")
127 changes: 126 additions & 1 deletion tests/test_result_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest
from taskiq import TaskiqResult

from taskiq_redis import RedisAsyncResultBackend
from taskiq_redis import RedisAsyncClusterResultBackend, RedisAsyncResultBackend
from taskiq_redis.exceptions import ResultIsMissingError


Expand Down Expand Up @@ -130,3 +130,128 @@ async def test_keep_results_after_reading(redis_url: str) -> None:
res2 = await result_backend.get_result(task_id=task_id)
assert res1 == res2
await result_backend.shutdown()


@pytest.mark.anyio
async def test_set_result_success_cluster(redis_cluster_url: str) -> None:
"""
Tests that results can be set without errors in cluster mode.
:param redis_url: redis URL.
"""
result_backend = RedisAsyncClusterResultBackend( # type: ignore
redis_url=redis_cluster_url,
)
task_id = uuid.uuid4().hex
result: "TaskiqResult[int]" = TaskiqResult(
is_err=True,
log="My Log",
return_value=11,
execution_time=112.2,
)
await result_backend.set_result(
task_id=task_id,
result=result,
)

fetched_result = await result_backend.get_result(
task_id=task_id,
with_logs=True,
)
assert fetched_result.log == "My Log"
assert fetched_result.return_value == 11
assert fetched_result.execution_time == 112.2
assert fetched_result.is_err
await result_backend.shutdown()


@pytest.mark.anyio
async def test_fetch_without_logs_cluster(redis_cluster_url: str) -> None:
"""
Check if fetching value without logs works fine.
:param redis_url: redis URL.
"""
result_backend = RedisAsyncClusterResultBackend( # type: ignore
redis_url=redis_cluster_url,
)
task_id = uuid.uuid4().hex
result: "TaskiqResult[int]" = TaskiqResult(
is_err=True,
log="My Log",
return_value=11,
execution_time=112.2,
)
await result_backend.set_result(
task_id=task_id,
result=result,
)

fetched_result = await result_backend.get_result(
task_id=task_id,
with_logs=False,
)
assert fetched_result.log is None
assert fetched_result.return_value == 11
assert fetched_result.execution_time == 112.2
assert fetched_result.is_err
await result_backend.shutdown()


@pytest.mark.anyio
async def test_remove_results_after_reading_cluster(redis_cluster_url: str) -> None:
"""
Check if removing results after reading works fine.
:param redis_url: redis URL.
"""
result_backend = RedisAsyncClusterResultBackend( # type: ignore
redis_url=redis_cluster_url,
keep_results=False,
)
task_id = uuid.uuid4().hex
result: "TaskiqResult[int]" = TaskiqResult(
is_err=True,
log="My Log",
return_value=11,
execution_time=112.2,
)
await result_backend.set_result(
task_id=task_id,
result=result,
)

await result_backend.get_result(task_id=task_id)
with pytest.raises(ResultIsMissingError):
await result_backend.get_result(task_id=task_id)

await result_backend.shutdown()


@pytest.mark.anyio
async def test_keep_results_after_reading_cluster(redis_cluster_url: str) -> None:
"""
Check if keeping results after reading works fine.
:param redis_url: redis URL.
"""
result_backend = RedisAsyncClusterResultBackend( # type: ignore
redis_url=redis_cluster_url,
keep_results=True,
)
task_id = uuid.uuid4().hex
result: "TaskiqResult[int]" = TaskiqResult(
is_err=True,
log="My Log",
return_value=11,
execution_time=112.2,
)
await result_backend.set_result(
task_id=task_id,
result=result,
)

res1 = await result_backend.get_result(task_id=task_id)
res2 = await result_backend.get_result(task_id=task_id)
assert res1 == res2
await result_backend.shutdown()

0 comments on commit 7386f46

Please sign in to comment.