diff --git a/pixl_core/src/core/patient_queue/subscriber.py b/pixl_core/src/core/patient_queue/subscriber.py index bc868ee6b..03805ef8c 100644 --- a/pixl_core/src/core/patient_queue/subscriber.py +++ b/pixl_core/src/core/patient_queue/subscriber.py @@ -26,7 +26,7 @@ from ._base import PixlBlockingInterface, PixlQueueInterface -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) class PixlConsumer(PixlQueueInterface): @@ -61,25 +61,24 @@ async def run(self, callback: Callable[[Message], Awaitable[None]]) -> None: be processed. Must take a dictionary and return None. """ async with self._queue.iterator() as queue_iter: + # Pre-annotate the message type + message: aio_pika.abc.AbstractIncomingMessage async for message in queue_iter: if not self.token_bucket.has_token: await asyncio.sleep(0.01) await message.reject(requeue=True) continue - # Messages need to be acknowledged before a callback otherwise - # RabbitMQ can close the connection. As all messages that raise - # exceptions aren't returned to the queue we're safe to do this - await message.ack() - try: await asyncio.sleep(0.01) # Avoid very fast callbacks await callback(deserialise(message.body)) except Exception: - LOGGER.exception( + logger.exception( "Failed to process %s" "Not re-queuing message", message.body.decode(), ) + finally: + await message.ack() async def __aexit__(self, *args: object, **kwargs: Any) -> None: """Requirement for the asynchronous context manager""" @@ -104,16 +103,17 @@ def consume_all(self, file_path: Path, timeout_in_seconds: int = 5) -> int: ) def callback(method: Any, properties: Any, body: Any) -> None: # noqa: ARG001 + """Consume to file.""" try: with Path.open(file_path, "a") as csv_file: print(str(body.decode()), file=csv_file) except: # noqa: E722 - LOGGER.debug("Failed to consume") + logger.exception("Failed to consume") counter = 0 for args in generator: if all(arg is None for arg in args): - LOGGER.info("Stopping") + logger.info("Stopping") break callback(*args) counter += 1