diff --git a/mula/logging.json b/mula/logging.json index 2d7a02642d9..6fd1ed1ce2a 100644 --- a/mula/logging.json +++ b/mula/logging.json @@ -9,13 +9,13 @@ "handlers": { "console": { "class": "logging.StreamHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "default", "stream": "ext://sys.stdout" } }, "root": { - "level": "INFO", + "level": "DEBUG", "handlers": [ "console" ] diff --git a/mula/scheduler/server/handlers/health.py b/mula/scheduler/server/handlers/health.py index 1cf8f4b1776..bd2f110a2ad 100644 --- a/mula/scheduler/server/handlers/health.py +++ b/mula/scheduler/server/handlers/health.py @@ -1,5 +1,3 @@ -from typing import Any - import fastapi import structlog from fastapi import status @@ -22,7 +20,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext) -> None: description="Health check endpoint", ) - def health(self, externals: bool = False) -> Any: + def health(self, externals: bool = False) -> models.ServiceHealth: response = models.ServiceHealth(service="scheduler", healthy=True, version=version.__version__) if externals: diff --git a/mula/scheduler/server/handlers/queues.py b/mula/scheduler/server/handlers/queues.py index 6507a15a2cc..494b1cb5f9c 100644 --- a/mula/scheduler/server/handlers/queues.py +++ b/mula/scheduler/server/handlers/queues.py @@ -5,8 +5,8 @@ from fastapi import status from scheduler import context, models, queues, schedulers, storage -from scheduler.server import serializers from scheduler.server.errors import BadRequestError, ConflictError, NotFoundError, TooManyRequestsError +from scheduler.server.models import Queue, Task, TaskPush class QueueAPI: @@ -20,7 +20,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/queues", endpoint=self.list, methods=["GET"], - response_model=list[models.Queue], + response_model=list[Queue], response_model_exclude_unset=True, status_code=status.HTTP_200_OK, description="List all queues", @@ -30,7 +30,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/queues/{queue_id}", endpoint=self.get, methods=["GET"], - response_model=models.Queue, + response_model=Queue, status_code=status.HTTP_200_OK, description="Get a queue", ) @@ -39,7 +39,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/queues/{queue_id}/pop", endpoint=self.pop, methods=["POST"], - response_model=models.Task | None, + response_model=Task | None, status_code=status.HTTP_200_OK, description="Pop an item from a queue", ) @@ -48,22 +48,22 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/queues/{queue_id}/push", endpoint=self.push, methods=["POST"], - response_model=models.Task | None, + response_model=Task | None, status_code=status.HTTP_201_CREATED, description="Push an item to a queue", ) def list(self) -> Any: - return [models.Queue(**s.queue.dict(include_pq=False)) for s in self.schedulers.copy().values()] + return [Queue(**s.queue.dict(include_pq=False)) for s in self.schedulers.copy().values()] - def get(self, queue_id: str) -> Any: + def get(self, queue_id: str) -> Queue: s = self.schedulers.get(queue_id) if s is None: raise NotFoundError(f"queue not found, by queue_id: {queue_id}") - return models.Queue(**s.queue.dict()) + return Queue(**s.queue.dict()) - def pop(self, queue_id: str, filters: storage.filters.FilterRequest | None = None) -> Any: + def pop(self, queue_id: str, filters: storage.filters.FilterRequest | None = None) -> Task | None: s = self.schedulers.get(queue_id) if s is None: raise NotFoundError(f"queue not found, by queue_id: {queue_id}") @@ -76,15 +76,15 @@ def pop(self, queue_id: str, filters: storage.filters.FilterRequest | None = Non if item is None: raise NotFoundError("could not pop item from queue, check your filters") - return models.Task(**item.model_dump()) + return Task(**item.model_dump()) - def push(self, queue_id: str, item_in: serializers.Task) -> Any: + def push(self, queue_id: str, item: TaskPush) -> Task | None: s = self.schedulers.get(queue_id) if s is None: raise NotFoundError(f"queue not found, by queue_id: {queue_id}") # Load default values - new_item = models.Task(**item_in.model_dump(exclude_unset=True)) + new_item = models.Task(**item.model_dump(exclude_unset=True)) # Set values if new_item.scheduler_id is None: @@ -99,4 +99,4 @@ def push(self, queue_id: str, item_in: serializers.Task) -> Any: except queues.errors.NotAllowedError: raise ConflictError("queue is not allowed to push items") - return pushed_item + return Task(**pushed_item.model_dump()) diff --git a/mula/scheduler/server/handlers/schedulers.py b/mula/scheduler/server/handlers/schedulers.py index 9358dcec45a..42a79f20340 100644 --- a/mula/scheduler/server/handlers/schedulers.py +++ b/mula/scheduler/server/handlers/schedulers.py @@ -1,11 +1,10 @@ -from typing import Any - import fastapi import structlog from fastapi import status from scheduler import context, models, schedulers from scheduler.server.errors import BadRequestError, NotFoundError +from scheduler.server.models import Scheduler class SchedulerAPI: @@ -19,7 +18,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/schedulers", endpoint=self.list, methods=["GET"], - response_model=list[models.Scheduler], + response_model=list[Scheduler], status_code=status.HTTP_200_OK, description="List all schedulers", ) @@ -28,7 +27,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/schedulers/{scheduler_id}", endpoint=self.get, methods=["GET"], - response_model=models.Scheduler, + response_model=Scheduler, status_code=status.HTTP_200_OK, description="Get a scheduler", ) @@ -37,22 +36,22 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s path="/schedulers/{scheduler_id}", endpoint=self.patch, methods=["PATCH"], - response_model=models.Scheduler, + response_model=Scheduler, status_code=status.HTTP_200_OK, description="Update a scheduler", ) - def list(self) -> Any: - return [models.Scheduler(**s.dict()) for s in self.schedulers.values()] + def list(self) -> list[Scheduler]: + return [Scheduler(**s.dict()) for s in self.schedulers.values()] - def get(self, scheduler_id: str) -> Any: + def get(self, scheduler_id: str) -> Scheduler: s = self.schedulers.get(scheduler_id) if s is None: raise NotFoundError(f"Scheduler {scheduler_id} not found") - return models.Scheduler(**s.dict()) + return Scheduler(**s.dict()) - def patch(self, scheduler_id: str, item: models.Scheduler) -> Any: + def patch(self, scheduler_id: str, item: models.Scheduler) -> Scheduler: s = self.schedulers.get(scheduler_id) if s is None: raise NotFoundError(f"Scheduler {scheduler_id} not found") @@ -75,4 +74,4 @@ def patch(self, scheduler_id: str, item: models.Scheduler) -> Any: elif not updated_scheduler.enabled: s.disable() - return updated_scheduler + return Scheduler(**updated_scheduler.dict()) diff --git a/mula/scheduler/server/handlers/schedules.py b/mula/scheduler/server/handlers/schedules.py index 3f4a9070466..5ea809a6317 100644 --- a/mula/scheduler/server/handlers/schedules.py +++ b/mula/scheduler/server/handlers/schedules.py @@ -1,14 +1,14 @@ import datetime import uuid -from typing import Any import fastapi import structlog from fastapi import Body from scheduler import context, models, schedulers, storage -from scheduler.server import serializers, utils +from scheduler.server import utils from scheduler.server.errors import BadRequestError, ConflictError, NotFoundError, ValidationError +from scheduler.server.models import Schedule, ScheduleCreate, ScheduleUpdate class ScheduleAPI: @@ -33,7 +33,7 @@ def __init__( path="/schedules", endpoint=self.create, methods=["POST"], - response_model=models.Schedule, + response_model=Schedule, status_code=201, description="Create a schedule", ) @@ -42,7 +42,7 @@ def __init__( path="/schedules/{schedule_id}", endpoint=self.get, methods=["GET"], - response_model=models.Schedule, + response_model=Schedule, status_code=200, description="Get a schedule", ) @@ -51,7 +51,7 @@ def __init__( path="/schedules/{schedule_id}", endpoint=self.patch, methods=["PATCH"], - response_model=models.Schedule, + response_model=Schedule, response_model_exclude_unset=True, status_code=200, description="Update a schedule", @@ -86,7 +86,7 @@ def list( max_deadline_at: datetime.datetime | None = None, min_created_at: datetime.datetime | None = None, max_created_at: datetime.datetime | None = None, - ) -> Any: + ) -> utils.PaginatedResponse: if (min_created_at is not None and max_created_at is not None) and min_created_at > max_created_at: raise BadRequestError("min_created_at must be less than max_created_at") @@ -104,10 +104,11 @@ def list( offset=offset, limit=limit, ) + results = [Schedule(**s.model_dump()) for s in results] return utils.paginate(request, results, count, offset, limit) - def create(self, schedule: serializers.ScheduleCreate) -> Any: + def create(self, schedule: ScheduleCreate) -> Schedule: try: new_schedule = models.Schedule(**schedule.model_dump()) except ValueError: @@ -131,17 +132,17 @@ def create(self, schedule: serializers.ScheduleCreate) -> Any: if schedule is not None: raise ConflictError(f"schedule with the same hash already exists: {new_schedule.hash}") - self.ctx.datastores.schedule_store.create_schedule(new_schedule) - return new_schedule + created_schedule = self.ctx.datastores.schedule_store.create_schedule(new_schedule) + return Schedule(**created_schedule.model_dump()) - def get(self, schedule_id: uuid.UUID) -> Any: + def get(self, schedule_id: uuid.UUID) -> Schedule: schedule = self.ctx.datastores.schedule_store.get_schedule(schedule_id) if schedule is None: raise NotFoundError(f"schedule not found, by schedule_id: {schedule_id}") - return schedule + return Schedule(**schedule.model_dump()) - def patch(self, schedule_id: uuid.UUID, schedule: serializers.SchedulePatch) -> Any: + def patch(self, schedule_id: uuid.UUID, schedule: ScheduleUpdate) -> Schedule: schedule_db = self.ctx.datastores.schedule_store.get_schedule(schedule_id) if schedule_db is None: raise NotFoundError(f"schedule not found, by schedule_id: {schedule_id}") @@ -162,7 +163,7 @@ def patch(self, schedule_id: uuid.UUID, schedule: serializers.SchedulePatch) -> # Update schedule in database self.ctx.datastores.schedule_store.update_schedule(updated_schedule) - return updated_schedule + return Schedule(**updated_schedule.model_dump()) def search( self, @@ -180,6 +181,7 @@ def search( results, count = self.ctx.datastores.schedule_store.get_schedules( offset=offset, limit=limit, filters=filters ) + results = [Schedule(**s.model_dump()) for s in results] except storage.filters.errors.FilterError as exc: raise fastapi.HTTPException( status_code=fastapi.status.HTTP_400_BAD_REQUEST, detail=f"invalid filter(s) [exception: {exc}]" diff --git a/mula/scheduler/server/handlers/tasks.py b/mula/scheduler/server/handlers/tasks.py index 46b6cc7469a..0f9cd4948af 100644 --- a/mula/scheduler/server/handlers/tasks.py +++ b/mula/scheduler/server/handlers/tasks.py @@ -1,14 +1,14 @@ import datetime import uuid -from typing import Any import fastapi import structlog from fastapi import status -from scheduler import context, models, storage -from scheduler.server import serializers, utils +from scheduler import context, storage +from scheduler.server import utils from scheduler.server.errors import BadRequestError, NotFoundError +from scheduler.server.models import Task, TaskUpdate class TaskAPI: @@ -46,7 +46,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext) -> None: path="/tasks/{task_id}", endpoint=self.get, methods=["GET"], - response_model=models.Task, + response_model=Task, status_code=status.HTTP_200_OK, description="Get a task", ) @@ -55,7 +55,7 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext) -> None: path="/tasks/{task_id}", endpoint=self.patch, methods=["PATCH"], - response_model=models.Task, + response_model=TaskUpdate, response_model_exclude_unset=True, status_code=status.HTTP_200_OK, description="Update a task", @@ -74,7 +74,7 @@ def list( input_ooi: str | None = None, # FIXME: deprecated plugin_id: str | None = None, # FIXME: deprecated filters: storage.filters.FilterRequest | None = None, - ) -> Any: + ) -> utils.PaginatedResponse: if (min_created_at is not None and max_created_at is not None) and min_created_at > max_created_at: raise BadRequestError("min_created_at must be less than max_created_at") @@ -137,16 +137,16 @@ def list( max_created_at=max_created_at, filters=f_req, ) - + results = [Task(**t.model_dump()) for t in results] return utils.paginate(request, results, count, offset, limit) - def get(self, task_id: uuid.UUID) -> Any: + def get(self, task_id: uuid.UUID) -> Task: task = self.ctx.datastores.task_store.get_task(task_id) if task is None: raise NotFoundError(f"task not found, by task_id: {task_id}") - return task + return Task(**task.model_dump()) - def patch(self, task_id: uuid.UUID, item: serializers.Task) -> Any: + def patch(self, task_id: uuid.UUID, item: TaskUpdate) -> TaskUpdate: task_db = self.ctx.datastores.task_store.get_task(task_id) if task_db is None: @@ -161,7 +161,7 @@ def patch(self, task_id: uuid.UUID, item: serializers.Task) -> Any: self.ctx.datastores.task_store.update_task(updated_task) - return updated_task + return TaskUpdate(**updated_task.model_dump()) def stats(self, scheduler_id: str | None = None) -> dict[str, dict[str, int]] | None: return self.ctx.datastores.task_store.get_status_count_per_hour(scheduler_id) diff --git a/mula/scheduler/server/models/__init__.py b/mula/scheduler/server/models/__init__.py new file mode 100644 index 00000000000..e97001d67c3 --- /dev/null +++ b/mula/scheduler/server/models/__init__.py @@ -0,0 +1,4 @@ +from .queue import Queue, TaskPush +from .schedule import Schedule, ScheduleCreate, ScheduleUpdate +from .scheduler import Scheduler +from .task import Task, TaskStatus, TaskUpdate diff --git a/mula/scheduler/server/models/queue.py b/mula/scheduler/server/models/queue.py new file mode 100644 index 00000000000..6194f26a25f --- /dev/null +++ b/mula/scheduler/server/models/queue.py @@ -0,0 +1,30 @@ +import uuid +from datetime import datetime + +from pydantic import BaseModel + +from scheduler.models import Task, TaskStatus + + +class Queue(BaseModel): + id: str + size: int + maxsize: int + item_type: str + allow_replace: bool + allow_updates: bool + allow_priority_updates: bool + pq: list[Task] | None = None + + +class TaskPush(BaseModel): + id: uuid.UUID | None = None + scheduler_id: str | None = None + schedule_id: uuid.UUID | None = None + priority: int | None = None + status: TaskStatus | None = None + type: str | None = None + hash: str | None = None + data: dict | None = None + created_at: datetime | None = None + modified_at: datetime | None = None diff --git a/mula/scheduler/server/serializers/schedule.py b/mula/scheduler/server/models/schedule.py similarity index 52% rename from mula/scheduler/server/serializers/schedule.py rename to mula/scheduler/server/models/schedule.py index acbd1d7ab05..45c5b3b4660 100644 --- a/mula/scheduler/server/serializers/schedule.py +++ b/mula/scheduler/server/models/schedule.py @@ -1,30 +1,31 @@ +import uuid from datetime import datetime -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, Field -class ScheduleCreate(BaseModel): - model_config = ConfigDict(from_attributes=True) - +class Schedule(BaseModel): + id: uuid.UUID scheduler_id: str + hash: str + data: dict | None + enabled: bool + schedule: str | None + deadline_at: datetime | None + created_at: datetime + modified_at: datetime - data: dict +class ScheduleCreate(BaseModel): + scheduler_id: str + data: dict schedule: str - deadline_at: datetime | None = None -# NOTE: model added for support of partial updates -class SchedulePatch(BaseModel): - model_config = ConfigDict(from_attributes=True) - +class ScheduleUpdate(BaseModel): hash: str | None = Field(None, max_length=32) - data: dict | None = None - enabled: bool | None = None - schedule: str | None = None - deadline_at: datetime | None = None diff --git a/mula/scheduler/server/models/scheduler.py b/mula/scheduler/server/models/scheduler.py new file mode 100644 index 00000000000..873e37d1aba --- /dev/null +++ b/mula/scheduler/server/models/scheduler.py @@ -0,0 +1,11 @@ +from datetime import datetime +from typing import Any + +from pydantic import BaseModel + + +class Scheduler(BaseModel): + id: str + enabled: bool | None = None + priority_queue: dict[str, Any] | None = None + last_activity: datetime | None = None diff --git a/mula/scheduler/server/models/task.py b/mula/scheduler/server/models/task.py new file mode 100644 index 00000000000..6f47745db2d --- /dev/null +++ b/mula/scheduler/server/models/task.py @@ -0,0 +1,32 @@ +import uuid +from datetime import datetime + +from pydantic import BaseModel + +from scheduler.models import TaskStatus + + +class Task(BaseModel): + id: uuid.UUID + scheduler_id: str + schedule_id: uuid.UUID | None + priority: int | None + status: TaskStatus | None + type: str + hash: str | None + data: dict | None + created_at: datetime + modified_at: datetime + + +class TaskUpdate(BaseModel): + id: uuid.UUID | None = None + scheduler_id: str | None = None + schedule_id: uuid.UUID | None = None + priority: int | None = None + status: TaskStatus | None = None + type: str | None = None + hash: str | None = None + data: dict | None = None + created_at: datetime | None = None + modified_at: datetime | None = None diff --git a/mula/scheduler/server/serializers/__init__.py b/mula/scheduler/server/serializers/__init__.py deleted file mode 100644 index a4d3c0b20c4..00000000000 --- a/mula/scheduler/server/serializers/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .schedule import ScheduleCreate, SchedulePatch -from .task import Task, TaskStatus diff --git a/mula/scheduler/server/serializers/task.py b/mula/scheduler/server/serializers/task.py deleted file mode 100644 index 3a4e6fc3846..00000000000 --- a/mula/scheduler/server/serializers/task.py +++ /dev/null @@ -1,54 +0,0 @@ -import enum -import uuid -from datetime import datetime - -from pydantic import BaseModel, ConfigDict - - -class TaskStatus(str, enum.Enum): - # Task has been created but not yet queued - PENDING = "pending" - - # Task has been pushed onto queue and is ready to be picked up - QUEUED = "queued" - - # Task has been picked up by a worker - DISPATCHED = "dispatched" - - # Task has been picked up by a worker, and the worker indicates that it is - # running. - RUNNING = "running" - - # Task has been completed - COMPLETED = "completed" - - # Task has failed - FAILED = "failed" - - # Task has been cancelled - CANCELLED = "cancelled" - - -# NOTE: model added for support of partial updates -class Task(BaseModel): - model_config = ConfigDict(from_attributes=True, use_enum_values=True) - - id: uuid.UUID | None = None - - scheduler_id: str | None = None - - schedule_id: uuid.UUID | None = None - - priority: int | None = None - - status: TaskStatus | None = None - - type: str | None = None - - hash: str | None = None - - data: dict | None = None - - created_at: datetime | None = None - - modified_at: datetime | None = None diff --git a/mula/tests/integration/test_api.py b/mula/tests/integration/test_api.py index fa689eb1aee..bdcfdd888f2 100644 --- a/mula/tests/integration/test_api.py +++ b/mula/tests/integration/test_api.py @@ -8,7 +8,7 @@ from fastapi.testclient import TestClient from scheduler import config, models, server, storage, utils -from scheduler.server import serializers +from scheduler.server.models import Task from tests.factories import OrganisationFactory from tests.mocks import queue as mock_queue @@ -257,7 +257,7 @@ def test_push_updates_not_allowed(self): self.assertEqual(1, self.scheduler.queue.qsize()) # Update the item - updated_item = serializers.Task(**response.json()) + updated_item = Task(**response.json()) updated_item.data["name"] = "updated-name" # Try to update the item through the api @@ -279,7 +279,7 @@ def test_push_updates_allowed(self): self.assertEqual(1, self.scheduler.queue.qsize()) # Update the item - updated_item = serializers.Task(**response.json()) + updated_item = Task(**response.json()) updated_item.data["name"] = "updated-name" # Try to update the item through the api @@ -306,7 +306,7 @@ def test_push_priority_updates_not_allowed(self): self.assertEqual(1, self.scheduler.queue.qsize()) # Update the item - updated_item = serializers.Task(**response.json()) + updated_item = Task(**response.json()) updated_item.priority = 2 # Try to update the item through the api @@ -332,7 +332,7 @@ def test_update_priority_higher(self): self.assertEqual(response.status_code, 201) # Update priority of the item - updated_item = serializers.Task(**response.json()) + updated_item = Task(**response.json()) updated_item.priority = 1 # Try to update the item through the api @@ -360,7 +360,7 @@ def test_update_priority_lower(self): self.assertEqual(response.status_code, 201) # Update priority of the item - updated_item = serializers.Task(**response.json()) + updated_item = Task(**response.json()) updated_item.priority = 2 # Try to update the item through the api diff --git a/mula/tests/unit/test_utils.py b/mula/tests/unit/test_utils.py index b10c049b495..f0f9f9d4600 100644 --- a/mula/tests/unit/test_utils.py +++ b/mula/tests/unit/test_utils.py @@ -32,3 +32,9 @@ def test_toggle_expire(self): with self.assertRaises(utils.ExpiredError): ed.get("a") + + def test_key_error(self): + ed = utils.ExpiringDict() + + with self.assertRaises(KeyError): + ed["a"]