Skip to content

Commit

Permalink
Added new base class for exceptions, added templates
Browse files Browse the repository at this point in the history
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
  • Loading branch information
chandr-andr committed Dec 22, 2024
1 parent ad114ce commit f4c03bc
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def kick(self, message: BrokerMessage) -> None:
"""
target_task = self.find_task(message.task_name)
if target_task is None:
raise TaskiqError("Unknown task.")
raise TaskiqError(description="Unknown task.")

receiver_cb = self.receiver.callback(message=message.message)
if self.await_inplace:
Expand Down
8 changes: 5 additions & 3 deletions taskiq/brokers/shared_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ async def kick(self, message: BrokerMessage) -> None:
:raises TaskiqError: if called.
"""
raise TaskiqError(
"You cannot use kiq directly on shared task "
"without setting the default_broker.",
description=(
"You cannot use kiq directly on shared task "
"without setting the default_broker."
),
)

async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
Expand All @@ -69,7 +71,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
:raises TaskiqError: if called.
"""
raise TaskiqError("Shared broker cannot listen")
raise TaskiqError(description="Shared broker cannot listen")

def _register_task(
self,
Expand Down
3 changes: 2 additions & 1 deletion taskiq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
class TaskiqError(root.Error):
"""Base exception for all errors."""

__template__ = "Base exception for all errors"
__template__ = "Exception occurred: {description}"
description: str


class TaskiqResultTimeoutError(TaskiqError):
Expand Down
2 changes: 1 addition & 1 deletion taskiq/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def check_task(task: AsyncTaskiqTask[Any]) -> None:

while task_ids:
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError("Timed out", timeout=timeout)
raise TaskiqResultTimeoutError(timeout=timeout)
check_tasks = []
for task in tasks:
check_tasks.append(loop.create_task(check_task(task)))
Expand Down
4 changes: 3 additions & 1 deletion taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ async def prefetcher(
"""
fetched_tasks: int = 0
iterator = self.broker.listen()
current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task(
current_message: asyncio.Task[
Union[bytes, AckableMessage]
] = asyncio.create_task(
iterator.__anext__(), # type: ignore
)

Expand Down
2 changes: 1 addition & 1 deletion taskiq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def wait_result(
while not await self.is_ready():
await asyncio.sleep(check_interval)
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError("Timed out", timeout=timeout)
raise TaskiqResultTimeoutError(timeout=timeout)
return await self.get_result(with_logs=with_logs)

async def get_progress(self) -> "Optional[TaskProgress[Any]]":
Expand Down

0 comments on commit f4c03bc

Please sign in to comment.