diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index c9697135..66074b66 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -22,13 +22,12 @@ from threading import Thread, Event from typing import Optional, Dict, NamedTuple, Union, Callable, cast -from pika import SelectConnection, ConnectionParameters, PlainCredentials, BasicProperties +from pika import SelectConnection, BasicProperties from pika.channel import Channel from pika.frame import Method from pika.spec import Basic from idsse.common.rabbitmq_utils import Conn, Exch, Queue -from idsse.common.log_util import set_corr_id_context_var logger = logging.getLogger(__name__) @@ -65,13 +64,17 @@ class PublishConfirm: socket timeouts. """ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): - """Setup the example publisher object, passing in the URL we will use - to connect to RabbitMQ. - :param Conn conn: The RabbitMQ connection detail object - :param Exch exchange: The RabbitMQ exchange details - :param Queue queue: The RabbitMQ queue details + """Setup the example publisher object, passing in the RabbitMqUtils we will use to + connect to RabbitMQ. + + Args: + conn (Conn): The RabbitMQ connection detail object + exchange (Exch): The RabbitMQ exchange details. + queue (Queue): The RabbitMQ queue details. If name starts with '_', will be setup as + a "private queue", i.e. not intended for consumers, and all published messages + will have a 10-second TTL. """ - self._thread = Thread(name=f'{__name__}-{randint(0,9)}', + self._thread = Thread(name=f'PublishConfirm-{randint(0,9)}', daemon=True, target=self._run) @@ -84,8 +87,6 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): self._records = PublishConfirmRecords() # data class to track message activity self._on_ready_callback: Optional[Callable[[], None]] = None - set_corr_id_context_var('PublishConfirm') - def publish_message(self, message: Dict, routing_key = '', @@ -114,6 +115,9 @@ def publish_message(self, properties = BasicProperties(content_type='application/json', content_encoding='utf-8', correlation_id=corr_id) + + logger.info('Publishing message to queue %s, message length: %d', + self._rmq_params.queue.name, len(json.dumps(message))) self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, json.dumps(message, ensure_ascii=True), properties) @@ -131,23 +135,16 @@ def publish_message(self, def start(self): """Start thread to connect to RabbitMQ queue and prepare to publish messages, invoking callback when setup complete. - """ - logger.debug('Starting thread') - self._start() - def _start(self, callback: Optional[Callable[[], None]] = None): + Raises: + RuntimeError: if PublishConfirm thread is already running """ - Start a thread to handle PublishConfirm operations + logger.debug('Starting thread') - Args: - callback (Optional[Callable[[], None]]): callback function to be invoked - once instance is ready to publish messages (all RabbitMQ connection and channel - are set up, delivery confirmation is enabled, etc.). Default to None. - """ - logger.debug('Starting thread with callback') - if callback is not None: - self._on_ready_callback = callback # to be invoked after all pika setup is done - self._thread.start() + # not possible to start Thread when it's already running + if self._thread.is_alive() or (self._connection is not None and self._connection.is_open): + raise RuntimeError('PublishConfirm thread already running, cannot be started') + self._start() def stop(self): """Stop the example by closing the channel and connection. We @@ -159,7 +156,6 @@ def stop(self): """ logger.info('Stopping') self._stopping = True - self._close_channel() self._close_connection() self._stopping = False # done stopping @@ -170,12 +166,26 @@ def _run(self): time.sleep(0.2) while not self._stopping: - time.sleep(5) + time.sleep(.1) if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() + def _start(self, callback: Optional[Callable[[], None]] = None): + """ + Start a thread to handle PublishConfirm operations + + Args: + callback (Optional[Callable[[], None]]): callback function to be invoked + once instance is ready to publish messages (all RabbitMQ connection and channel + are set up, delivery confirmation is enabled, etc.). Default to None. + """ + logger.debug('Starting thread with callback') + if callback is not None: + self._on_ready_callback = callback # to be invoked after all pika setup is done + self._thread.start() + def _create_connection(self): """This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method @@ -185,12 +195,7 @@ def _create_connection(self): conn = self._rmq_params.conn logger.info('Connecting to RabbitMQ: %s', conn) return SelectConnection( - parameters=ConnectionParameters( - host=conn.host, - virtual_host=conn.v_host, - port=conn.port, - credentials=PlainCredentials(conn.username, conn.password) - ), + parameters=conn.connection_parameters, on_open_callback=self._on_connection_open, on_open_error_callback=self._on_connection_open_error, on_close_callback=self._on_connection_closed) @@ -211,171 +216,146 @@ def _wait_for_channel_to_be_ready(self) -> None: logger.debug('Connection and channel setup complete, ready to publish message') - def _on_connection_open(self, _unused_connection): - """This method is called by pika once the connection to RabbitMQ has - been established. It passes the handle to the connection object in - case we need it, but in this case, we'll just mark it unused. - :param pika.SelectConnection _unused_connection: The connection + def _on_connection_open(self, connection: SelectConnection): + """This method is called by pika once the connection to RabbitMQ has been established. + + Args: + connection (SelectConnection): The connection """ - logger.debug('Connection opened') - self._open_channel() - - def _on_connection_open_error(self, _unused_connection, err): - """This method is called by pika if the connection to RabbitMQ - can't be established. - :param pika.SelectConnection _unused_connection: The connection - :param Exception err: The error + logger.debug('Connection opened. Creating a new channel') + + # Create a new channel. + # When RabbitMQ confirms the channel is open by sending the Channel.OpenOK RPC reply, + # the on_channel_open method will be invoked. + connection.channel(on_open_callback=self._on_channel_open) + + def _on_connection_open_error(self, connection: SelectConnection, err: Exception): + """This method is called by pika if the connection to RabbitMQ can't be established. + + Args: + connection (SelectConnection): The connection + err (Exception): The error """ logger.error('Connection open failed, reopening in 5 seconds: %s', err) - self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + connection.ioloop.call_later(5, connection.ioloop.stop) - def _on_connection_closed(self, _unused_connection, reason): + def _on_connection_closed(self, connection: SelectConnection, reason: Exception): """This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects. - :param pika.connection.Connection _unused_connection: The closed connection obj - :param Exception reason: exception representing reason for loss of - connection. + + Args: + connection (SelectConnection): The closed connection obj + reason (Exception): exception representing reason for loss of connection. """ self._channel = None + if self._stopping: - self._connection.ioloop.stop() + connection.ioloop.stop() else: - logger.warning('Connection closed, reopening in 5 seconds: %s', - reason) - self._connection.ioloop.call_later(5, self._connection.ioloop.stop) - - def _open_channel(self): - """This method will open a new channel with RabbitMQ by issuing the - Channel.Open RPC command. When RabbitMQ confirms the channel is open - by sending the Channel.OpenOK RPC reply, the on_channel_open method - will be invoked. - """ - logger.debug('Creating a new channel') - self._connection.channel(on_open_callback=self._on_channel_open) + logger.warning('Connection closed, reopening in 5 seconds: %s', reason) + connection.ioloop.call_later(5, connection.ioloop.stop) def _on_channel_open(self, channel: Channel): """This method is invoked by pika when the channel has been opened. - The channel object is passed in so we can make use of it. - Since the channel is now open, we'll declare the exchange to use. - :param pika.channel.Channel channel: The channel object + The channel object is passed in so we can make use of it (declare the exchange to use). + + Args: + channel (Channel): The channel object """ logger.debug('Channel opened') self._channel = channel - self._add_on_channel_close_callback() - self._setup_exchange(self._rmq_params.exchange) - def _add_on_channel_close_callback(self): - """This method tells pika to call the on_channel_closed method if - RabbitMQ unexpectedly closes the channel. - """ logger.debug('Adding channel close callback') self._channel.add_on_close_callback(self._on_channel_closed) - def _on_channel_closed(self, channel, reason): + # Decleare exchange on our new channel + exch_name, exch_type = self._rmq_params.exchange + logger.debug('Declaring exchange %s', exch_name) + + # Note: using functools.partial is not required, it is demonstrating + # how arbitrary data can be passed to the callback when it is called + cb = functools.partial(self._on_exchange_declareok, userdata=exch_name) + self._channel.exchange_declare(exchange=exch_name, exchange_type=exch_type, callback=cb) + + def _on_channel_closed(self, channel: Channel, reason: Exception): """Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that violates the protocol, such as re-declare an exchange or queue with different parameters. In this case, we'll close the connection to shutdown the object. - :param pika.channel.Channel channel: The closed channel - :param Exception reason: why the channel was closed + + Args: + channel (Channel): The closed channel + reason (Exception): why the channel was closed """ logger.warning('Channel %i was closed: %s', channel, reason) self._channel = None if not self._stopping: self._close_connection() - def _setup_exchange(self, exchange: Exch): - """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC - command. When it is complete, the on_exchange_declareok method will - be invoked by pika. - :param str|unicode exchange_name: The name of the exchange to declare - """ - logger.debug('Declaring exchange %s', exchange.name) - # Note: using functools.partial is not required, it is demonstrating - # how arbitrary data can be passed to the callback when it is called - cb = functools.partial(self._on_exchange_declareok, - userdata=exchange.name) - self._channel.exchange_declare(exchange=exchange.name, - exchange_type=exchange.type, - callback=cb) - - def _on_exchange_declareok(self, _unused_frame, userdata): - """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC - command. - :param pika.Frame.Method _unused_frame: Exchange.DeclareOk response frame - :param str|unicode userdata: Extra user data (exchange name) + def _on_exchange_declareok(self, _unused_frame: Method, userdata: Union[str, bytes]): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC command. + + Args: + _unused_frame (Frame.Method): Exchange.DeclareOk response frame + userdata (Union[str, bytes]): Extra user data (exchange name) """ logger.debug('Exchange declared: %s', userdata) - self._setup_queue(self._rmq_params.queue) - def _setup_queue(self, queue: Queue): - """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC - command. When it is complete, the on_queue_declareok method will - be invoked by pika. - :param str|unicode queue_name: The name of the queue to declare. - """ + # Setup the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is + # complete, the on_queue_declareok method will be invoked by pika. + queue = self._rmq_params.queue logger.debug('Declaring queue %s', queue.name) - args = {} # If we have a 'private' queue, i.e. one that is not consumed but used to support message publishing - if queue.name.startswith('_'): - # Set message time-to-live (TTL) to 10 seconds - args = {'x-message-ttl': 10000} - self._channel.queue_declare(queue=queue.name, - durable=queue.durable, - arguments=args, - exclusive=queue.exclusive, - auto_delete=queue.auto_delete, - callback=self._on_queue_declareok) - - def _on_queue_declareok(self, _unused_frame): + + # If we have a 'private' queue, i.e. one used to support message publishing, not consumed + # Set message time-to-live (TTL) to 10 seconds + args = {'x-message-ttl': 10 * 1000} if queue.name.startswith('_') else None + + if self._channel is not None: + self._channel.queue_declare(queue=queue.name, + durable=queue.durable, + arguments=args, + exclusive=queue.exclusive, + auto_delete=queue.auto_delete, + callback=self._on_queue_declareok) + + def _on_queue_declareok(self, _unused_frame: Method): """Method invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue and exchange together with the routing key by issuing the Queue.Bind RPC command. When this command is complete, the on_bindok method will be invoked by pika. - :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame + + Args: + _unused_frame (Frame): The Queue.DeclareOk frame """ - logger.debug('Binding %s to %s with #', - self._rmq_params.exchange.name, - self._rmq_params.queue.name) - self._channel.queue_bind(self._rmq_params.queue.name, - self._rmq_params.exchange.name, - routing_key='#', # Default wildcard key to consume everything - callback=self._on_bindok) - - def _on_bindok(self, _unused_frame): + _, exchange, queue = self._rmq_params + logger.debug('Binding %s to %s with #', exchange.name, queue.name) + + if self._channel is not None: + self._channel.queue_bind(queue.name, + exchange.name, + routing_key=queue.route_key, + callback=self._on_bindok) + + def _on_bindok(self, _unused_frame: Method): """This method is invoked by pika when it receives the Queue.BindOk response from RabbitMQ. Since we know we're now setup and bound, it's time to start publishing.""" logger.debug('Queue bound') - self._start_publishing() - def _start_publishing(self): - """This method will enable delivery confirmations and schedule the - first message to be sent to RabbitMQ - """ - logger.debug('Issuing consumer related RPC commands') - self._enable_delivery_confirmations() + # enable delivery confirmations and schedule the first message to be sent to RabbitMQ + logger.debug('Issuing Confirm.Select RPC command') + if self._channel is not None: + self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message + self._channel.confirm_delivery(self._on_delivery_confirmation) # notify up that channel can now be published to if self._on_ready_callback: self._on_ready_callback() - # self.schedule_next_message() - def _enable_delivery_confirmations(self): - """Send the Confirm.Select RPC method to RabbitMQ to enable delivery - confirmations on the channel. The only way to turn this off is to close - the channel and create a new one. - When the message is confirmed from RabbitMQ, the - on_delivery_confirmation method will be invoked passing in a Basic.Ack - or Basic.Nack method from RabbitMQ that will indicate which messages it - is confirming or rejecting. - """ - logger.debug('Issuing Confirm.Select RPC command') - if self._channel is not None: - self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message - self._channel.confirm_delivery(self._on_delivery_confirmation) + # self.schedule_next_message() def _on_delivery_confirmation(self, method_frame: Method): """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC @@ -386,7 +366,9 @@ def _on_delivery_confirmation(self, method_frame: Method): to keep track of stats and remove message numbers that we expect a delivery confirmation of from the list used to keep track of messages that are pending confirmation. - :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame + + Args: + method_frame (Method): Basic.Ack or Basic.Nack frame """ # tell python type checker that method will be an Ack or Nack (per pika docs) method = cast(Union[Basic.Ack, Basic.Nack], method_frame.method) @@ -411,23 +393,20 @@ def _on_delivery_confirmation(self, method_frame: Method): self._records.acked += 1 del self._records.deliveries[tmp_tag] - # NOTE: at some point you would check self._deliveries for stale entries - # and decide to attempt re-delivery + # NOTE: at some point we'd check self._deliveries for stale entries and attempt re-delivery logger.debug( - 'Published %i messages, %i have yet to be confirmed, ' - '%i were acked and %i were nacked', self._records.message_number, - len(self._records.deliveries), self._records.acked, self._records.nacked) + 'Published %i messages, %i have yet to be confirmed, %i were acked and %i were nacked', + self._records.message_number, + len(self._records.deliveries), + self._records.acked, + self._records.nacked) - def _close_channel(self): - """Invoke this command to close the channel with RabbitMQ by sending - the Channel.Close RPC command. - """ + def _close_connection(self): + """Close the connection to RabbitMQ (after closing any open channel on it).""" if self._channel is not None: logger.debug('Closing the channel') self._channel.close() - def _close_connection(self): - """This method closes the connection to RabbitMQ.""" if self._connection is not None: logger.debug('Closing connection') self._connection.close() diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index cce9db01..468d8f01 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -35,13 +35,23 @@ class Conn(NamedTuple): password: str def to_connection(self) -> BlockingConnection: - """Establish a new RabbitMQ connection using attributes in Conn data class""" - return BlockingConnection(ConnectionParameters( + """Establish a new RabbitMQ connection using attributes in Conn data class + + Returns: + BlockingConnection: newly established instance of pika.BlockingConnection + """ + return BlockingConnection(parameters=self.connection_parameters) + + @property + def connection_parameters(self) -> ConnectionParameters: + """Convert Conn data object into pika.ConnectionParameters, ready to be passed + to pika connection constructors such as BlockingConnection() or SelectConnection()""" + return ConnectionParameters( host=self.host, virtual_host=self.v_host, port=self.port, credentials=PlainCredentials(self.username, self.password) - )) + ) class Exch(NamedTuple): diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py index b7dbe6ca..7dbc9ea6 100644 --- a/python/idsse_common/test/test_publish_confirm.py +++ b/python/idsse_common/test/test_publish_confirm.py @@ -13,10 +13,10 @@ # pylint: disable=too-few-public-methods,unused-argument from time import sleep -from typing import Callable, Union, Any, NamedTuple +from typing import Callable, Union, Any, NamedTuple, Self from unittest.mock import Mock -from pytest import fixture, MonkeyPatch +from pytest import fixture, raises, MonkeyPatch from pika.spec import Basic from idsse.common.publish_confirm import PublishConfirm @@ -90,15 +90,21 @@ def close(self): class IOLoop: """mock of pika.SelectConnection.ioloop""" - def __init__(self, on_open: Callable[[Any], None], on_close: Callable[[Any, str], None]): + def __init__( + self, + connection, + on_open: Callable[[Any], Any], + on_close: Callable[[Any, str], Any] + ): + self._connection = connection self.on_open = on_open self.on_close = on_close def start(self): - self.on_open(None) + self.on_open(self._connection) def stop(self): - self.on_close(None, 'some_reason') + self.on_close(self._connection, 'some_reason') def call_later(self): pass @@ -108,14 +114,14 @@ class SelectConnection: """mock of pika.SelectConnection""" def __init__(self, parameters, - on_open_callback: Callable[[Any], None], + on_open_callback: Callable[[Any], Self], on_open_error_callback, - on_close_callback: Callable[[Any, str], None]): + on_close_callback: Callable[[Any, str], Self]): self.is_open = True self.is_closed = False self._context = MockPika() - self.ioloop = self._context.IOLoop(on_open_callback, on_close_callback) + self.ioloop = self._context.IOLoop(self, on_open_callback, on_close_callback) def channel(self, on_open_callback: Callable[[Any], None]): """ @@ -241,7 +247,7 @@ def mock_sleep_function(secs: float): # mock sleep someimtes captures a call from PublishConfirm.run(), due to a race condition # between this test's thread and the PublishConfirm thread. Both results are acceptable sleep_call_args = [call.args for call in mock_sleep.call_args_list] - assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (5,)])] + assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (0.1,)])] def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: MockPika): @@ -251,3 +257,12 @@ def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: assert pub_conf._channel is None pub_conf._wait_for_channel_to_be_ready() assert pub_conf._channel is not None and pub_conf._channel.is_open + +def test_calling_start_twice_raises_error(monkeypatch: MonkeyPatch, context: MockPika): + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) + pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) + + pub_conf.start() + with raises(RuntimeError) as exc: + pub_conf.start() + assert exc is not None