From 925b5de3690e617832fcc915d43bba0d28c93b89 Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate Date: Thu, 24 Oct 2024 09:37:16 -0600 Subject: [PATCH 1/4] pass Conn.connection_parameters, not just raw NamedTuple, to pika.BlockingConnection --- python/idsse_common/idsse/common/rabbitmq_utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index d895a3d..88eae3a 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -138,10 +138,10 @@ def _initialize_connection_and_channel( # connection of unsupported type passed raise ValueError( (f'Cannot use or create new RabbitMQ connection using type {type(connection)}. ' - 'Should a Conn (a dict with connection parameters)') + 'Should be type Conn (a dict with connection parameters)') ) - _connection = BlockingConnection(parameters=connection) + _connection = BlockingConnection(connection.connection_parameters) logger.info('Established new RabbitMQ connection to %s on port %i', connection.host, connection.port) @@ -352,7 +352,7 @@ class Consumer(Thread): """ def __init__( self, - conn_params: ConnectionParameters, + conn_params: Conn, rmq_params_and_callbacks: RabbitMqParamsAndCallback | list[RabbitMqParamsAndCallback], num_message_handlers: int, *args, @@ -366,7 +366,7 @@ def __init__( self._rmq_params_and_callbacks = rmq_params_and_callbacks else: self._rmq_params_and_callbacks = [rmq_params_and_callbacks] - self.connection = BlockingConnection(parameters=self._conn_params) + self.connection = BlockingConnection(self._conn_params.connection_parameters) self.channel = self.connection.channel() self._consumer_tags = [] @@ -420,7 +420,7 @@ class Publisher(Thread): """ def __init__( self, - conn_params: ConnectionParameters, + conn_params: Conn, exch_params: Exch, *args, **kwargs, @@ -431,7 +431,7 @@ def __init__( self._exch = exch_params self._queue = None - self.connection = BlockingConnection(conn_params) + self.connection = BlockingConnection(conn_params.connection_parameters) self.channel = self.connection.channel() # if delivery is mandatory there must be a queue attach to the exchange From e3150b1486335c6fc1ae534e32ff8a4ea2d15950 Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate Date: Thu, 24 Oct 2024 09:56:39 -0600 Subject: [PATCH 2/4] silence linter warnings about Publisher args for now --- python/idsse_common/idsse/common/rabbitmq_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 88eae3a..8ca1280 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -506,6 +506,7 @@ def stop(self): self.channel.close, self.connection.close) + # pylint: disable=too-many-arguments,too-many-positional-arguments,unused-argument def _publish( self, message: bytes, From c07818d6aa2610e469721581dc668f2dc6ce69a9 Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate Date: Thu, 24 Oct 2024 09:58:38 -0600 Subject: [PATCH 3/4] remove pylint disable command that is too new for our pipeline --- python/idsse_common/idsse/common/rabbitmq_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 8ca1280..ae7c190 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -506,7 +506,7 @@ def stop(self): self.channel.close, self.connection.close) - # pylint: disable=too-many-arguments,too-many-positional-arguments,unused-argument + # pylint: disable=too-many-arguments,unused-argument def _publish( self, message: bytes, From dfeb46b3eb401ad1849934b8f3306e78f3d5deea Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate Date: Thu, 24 Oct 2024 10:00:35 -0600 Subject: [PATCH 4/4] remove print() call from Publisher --- python/idsse_common/idsse/common/rabbitmq_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index ae7c190..2abc434 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -536,7 +536,6 @@ def _publish( mandatory=self._exch.mandatory) if success_flag: success_flag[0] = True - print('\n message published\n') if self._queue and self._queue.name.startswith('_'): try: self.channel.queue_purge(queue=self._queue.name)