diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index 23892f3..360b9b1 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -156,7 +156,7 @@ async def kick(self, message: BrokerMessage) -> None: """ target_task = self.find_task(message.task_name) if target_task is None: - raise TaskiqError("Unknown task.") + raise TaskiqError(description="Unknown task.") receiver_cb = self.receiver.callback(message=message.message) if self.await_inplace: diff --git a/taskiq/brokers/shared_broker.py b/taskiq/brokers/shared_broker.py index 6dc4213..d3e96d2 100644 --- a/taskiq/brokers/shared_broker.py +++ b/taskiq/brokers/shared_broker.py @@ -57,8 +57,10 @@ async def kick(self, message: BrokerMessage) -> None: :raises TaskiqError: if called. """ raise TaskiqError( - "You cannot use kiq directly on shared task " - "without setting the default_broker.", + description=( + "You cannot use kiq directly on shared task " + "without setting the default_broker." + ), ) async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore @@ -69,7 +71,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore :raises TaskiqError: if called. """ - raise TaskiqError("Shared broker cannot listen") + raise TaskiqError(description="Shared broker cannot listen") def _register_task( self, diff --git a/taskiq/exceptions.py b/taskiq/exceptions.py index e9b205a..fd925a7 100644 --- a/taskiq/exceptions.py +++ b/taskiq/exceptions.py @@ -4,7 +4,8 @@ class TaskiqError(root.Error): """Base exception for all errors.""" - __template__ = "Base exception for all errors" + __template__ = "Exception occurred: {description}" + description: str class TaskiqResultTimeoutError(TaskiqError): diff --git a/taskiq/funcs.py b/taskiq/funcs.py index 346e231..4e49e81 100644 --- a/taskiq/funcs.py +++ b/taskiq/funcs.py @@ -47,7 +47,7 @@ async def check_task(task: AsyncTaskiqTask[Any]) -> None: while task_ids: if 0 < timeout < time() - start_time: - raise TaskiqResultTimeoutError("Timed out", timeout=timeout) + raise TaskiqResultTimeoutError(timeout=timeout) check_tasks = [] for task in tasks: check_tasks.append(loop.create_task(check_task(task))) diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index afa5d4c..41e687a 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -353,7 +353,9 @@ async def prefetcher( """ fetched_tasks: int = 0 iterator = self.broker.listen() - current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task( + current_message: asyncio.Task[ + Union[bytes, AckableMessage] + ] = asyncio.create_task( iterator.__anext__(), # type: ignore ) diff --git a/taskiq/task.py b/taskiq/task.py index a3f0eab..a66b2d8 100644 --- a/taskiq/task.py +++ b/taskiq/task.py @@ -151,7 +151,7 @@ async def wait_result( while not await self.is_ready(): await asyncio.sleep(check_interval) if 0 < timeout < time() - start_time: - raise TaskiqResultTimeoutError("Timed out", timeout=timeout) + raise TaskiqResultTimeoutError(timeout=timeout) return await self.get_result(with_logs=with_logs) async def get_progress(self) -> "Optional[TaskProgress[Any]]":