Skip to content

Commit

Permalink
Add DB queue store
Browse files Browse the repository at this point in the history
  • Loading branch information
RealOrangeOne committed May 31, 2024
1 parent 08342d6 commit b9b577d
Show file tree
Hide file tree
Showing 17 changed files with 402 additions and 18 deletions.
3 changes: 3 additions & 0 deletions django_tasks/backends/database/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .backend import DatabaseBackend

__all__ = ["DatabaseBackend"]
6 changes: 6 additions & 0 deletions django_tasks/backends/database/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TasksAppConfig(AppConfig):
name = "django_tasks.backends.database"
label = "django_tasks_database"
89 changes: 89 additions & 0 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, TypeVar

from django.core.exceptions import ValidationError
from typing_extensions import ParamSpec

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.task import Task
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import json_normalize

if TYPE_CHECKING:
from .models import DBTaskResult

T = TypeVar("T")
P = ParamSpec("P")


@dataclass
class TaskResult(BaseTaskResult[T]):
db_result: "DBTaskResult"

def refresh(self) -> None:
self.db_result.refresh_from_db()
for attr, value in asdict(self.db_result.get_task_result()).items():
setattr(self, attr, value)

async def arefresh(self) -> None:
await self.db_result.arefresh_from_db()
for attr, value in asdict(self.db_result.get_task_result()).items():
setattr(self, attr, value)


class DatabaseBackend(BaseTaskBackend):
supports_async_task = True
supports_get_result = True

def _task_to_db_task(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> "DBTaskResult":
from .models import DBTaskResult

return DBTaskResult(
args_kwargs=json_normalize({"args": args, "kwargs": kwargs}),
priority=task.priority,
task_path=task.module_path,
queue_name=task.queue_name,
run_after=task.run_after,
backend_name=self.alias,
)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
self.validate_task(task)

db_result = self._task_to_db_task(task, args, kwargs)

db_result.save()

return db_result.get_task_result()

async def aenqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
self.validate_task(task)

db_result = self._task_to_db_task(task, args, kwargs)

await db_result.asave()

return db_result.get_task_result()

def get_result(self, result_id: str) -> TaskResult:
from .models import DBTaskResult

try:
return DBTaskResult.objects.get(id=result_id).get_task_result()
except (DBTaskResult.DoesNotExist, ValidationError) as e:
raise ResultDoesNotExist(result_id) from e

async def aget_result(self, result_id: str) -> TaskResult:
from .models import DBTaskResult

try:
return (await DBTaskResult.objects.aget(id=result_id)).get_task_result()
except (DBTaskResult.DoesNotExist, ValidationError) as e:
raise ResultDoesNotExist(result_id) from e
48 changes: 48 additions & 0 deletions django_tasks/backends/database/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Generated by Django 4.2.13 on 2024-05-24 10:46

import uuid

from django.db import migrations, models


class Migration(migrations.Migration):
initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="DBTaskResult",
fields=[
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"status",
models.CharField(
choices=[
("NEW", "New"),
("RUNNING", "Running"),
("FAILED", "Failed"),
("COMPLETE", "Complete"),
],
default="NEW",
max_length=8,
),
),
("args_kwargs", models.JSONField()),
("priority", models.PositiveSmallIntegerField(null=True)),
("task_path", models.TextField()),
("queue_name", models.TextField()),
("backend_name", models.TextField()),
("run_after", models.DateTimeField(null=True)),
("result", models.JSONField(default=None, null=True)),
],
),
]
Empty file.
64 changes: 64 additions & 0 deletions django_tasks/backends/database/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import uuid
from typing import TYPE_CHECKING, Any

from django.db import models
from django.utils.functional import cached_property
from django.utils.module_loading import import_string

from django_tasks.task import ResultStatus, Task

if TYPE_CHECKING:
from .backend import TaskResult


class DBTaskResult(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

status = models.CharField(
choices=ResultStatus.choices,
default=ResultStatus.NEW,
max_length=max(len(value) for value in ResultStatus.values),
)

args_kwargs = models.JSONField()

priority = models.PositiveSmallIntegerField(null=True)

task_path = models.TextField()

queue_name = models.TextField()
backend_name = models.TextField()

run_after = models.DateTimeField(null=True)

result = models.JSONField(default=None, null=True)

@cached_property
def task(self) -> Task:
task = import_string(self.task_path)

assert isinstance(task, Task)

return task.using(
priority=self.priority,
queue_name=self.queue_name,
run_after=self.run_after,
backend=self.backend_name,
)

def get_task_result(self) -> "TaskResult":
from .backend import TaskResult

result = TaskResult[Any](
db_result=self,
task=self.task,
id=str(self.id),
status=ResultStatus[self.status],
args=self.args_kwargs["args"],
kwargs=self.args_kwargs["kwargs"],
backend=self.backend_name,
)

result._result = self.result

return result
5 changes: 3 additions & 2 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import json_normalize

from .base import BaseTaskBackend

Expand All @@ -32,8 +33,8 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
args=args,
kwargs=kwargs,
args=json_normalize(args),
kwargs=json_normalize(kwargs),
backend=self.alias,
)

Expand Down
5 changes: 3 additions & 2 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import ParamSpec

from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import json_normalize

from .base import BaseTaskBackend

Expand Down Expand Up @@ -36,8 +37,8 @@ def enqueue(
task=task,
id=str(uuid4()),
status=status,
args=args,
kwargs=kwargs,
args=json_normalize(args),
kwargs=json_normalize(kwargs),
backend=self.alias,
)

Expand Down
12 changes: 8 additions & 4 deletions django_tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@


class ResultStatus(TextChoices):
NEW = "NEW"
RUNNING = "RUNNING"
FAILED = "FAILED"
COMPLETE = "COMPLETE"
NEW = ("NEW", "New")
RUNNING = ("RUNNING", "Running")
FAILED = ("FAILED", "Failed")
COMPLETE = ("COMPLETE", "Complete")


T = TypeVar("T")
Expand Down Expand Up @@ -149,6 +149,10 @@ def get_backend(self) -> "BaseTaskBackend":

return tasks[self.backend]

@property
def module_path(self) -> str:
return f"{self.func.__module__}.{self.func.__qualname__}"


def task(
priority: Optional[int] = None,
Expand Down
7 changes: 7 additions & 0 deletions django_tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ def is_json_serializable(obj: Any) -> bool:
return True
except (TypeError, OverflowError):
return False


def json_normalize(obj: Any) -> Any:
"""
Round-trip encode object as JSON to normalize types.
"""
return json.loads(json.dumps(obj))
Empty file modified manage.py
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

INSTALLED_APPS = [
"django_tasks",
"django_tasks.backends.database",
"tests",
]

Expand Down
Loading

0 comments on commit b9b577d

Please sign in to comment.