diff --git a/python/grib_to_zarr/GribToZarr.py b/python/grib_to_zarr/GribToZarr.py deleted file mode 100644 index 36be4179..00000000 --- a/python/grib_to_zarr/GribToZarr.py +++ /dev/null @@ -1,80 +0,0 @@ - - - - -import xarray -import cfgrib -import glob -import os - - -#where the NBM grib files are -datadir='/Volumes/SSD/Users/Travis/Programs/Air/python3/grib_to_zarr/NBM' -#name of the NBM grib files to convert -files='*grib2' -#output directory for zarr files -outdir='/Volumes/SSD/Users/Travis/Programs/Air/python3/grib_to_zarr/NBM_zarr' -#concatenated grib file; no need to change -master_grib='out.grib' - -def main(): - concat() - grib_to_zarr() - - -def concat(): - if not os.path.isfile(datadir+'/'+master_grib): - print("Concatenating grib files") - with open(datadir+'/'+master_grib,'wb') as outfile: - for filename in glob.glob(datadir+'/'+files): - with open(filename, "rb") as infile: - outfile.write(infile.read()) - - - -def grib_to_zarr(): - - print("Converting gribs to zarr") - #if output directory doesn't exist, create it - if not os.path.exists(outdir): - os.makedirs(outdir) - - - alldata = cfgrib.open_datasets(datadir+'/'+master_grib,backend_kwargs={'read_keys': ['gridType']}) - - for data in alldata: - print("") - - for varname, da in data.data_vars.items(): - gribname = data[varname].attrs['long_name'] #long_name #GRIB_shortName - gribname = gribname.replace(" ", ".") - step = data[varname].attrs['GRIB_stepType'] - try: - level = data[varname].attrs['GRIB_typeOfLevel'] - except: - level = 'unknown' - - - outfile = outdir + '/' + gribname+'_'+level+'_'+step - - print (outfile) - d2 = data[varname].to_dataset() - - #without the load, rechunking on grib takes forever - d2.load() - - #now chunk - #don't put chunking before load, otherwise it loads the whole dataset, takes forever - if 'x' in data[varname].dims and 'y' in data[varname].dims and 'step' in data[varname].dims: - d2 = d2.chunk({"x":150, "y":150, "step": -1}) - else: - d2 = d2.chunk({"x":150, "y":150}) - - d2.to_zarr(outfile+'.zarr',mode='w') - d2.to_netcdf(outfile+'.nc',mode='w') - - - -if __name__ == "__main__": - main() - diff --git a/python/idsse_common/idsse/common/log_util.py b/python/idsse_common/idsse/common/log_util.py index 2f0e223c..c1c4edd7 100644 --- a/python/idsse_common/idsse/common/log_util.py +++ b/python/idsse_common/idsse/common/log_util.py @@ -156,7 +156,7 @@ def get_default_log_config(level: str, 'loggers': { '': { 'level': level, - 'handlers': ['default'], # , 'rabbit'] + 'handlers': ['default'], # , 'rabbit'] }, } } diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py deleted file mode 100644 index fc28dd10..00000000 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ /dev/null @@ -1,491 +0,0 @@ -"""Module for PublishConfirm threaded RabbitMQ publisher""" -# ---------------------------------------------------------------------------------- -# Created on Fri Jun 23 2023. -# -# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) -# Copyright (c) 2023 Colorado State University. All rights reserved. (2) -# -# Contributors: -# Geary Layne (1) -# Paul Hamer (2) -# Mackenzie Grimes (2) -# ---------------------------------------------------------------------------------- -# pylint: disable=C0111,C0103,R0205 - -import functools -import logging -import logging.config -import json -import time -from concurrent.futures import Future -from dataclasses import dataclass, field -from itertools import count -from threading import Thread -from typing import NamedTuple, cast - -from pika import SelectConnection, BasicProperties -from pika.channel import Channel -from pika.exceptions import AMQPConnectionError, AMQPChannelError -from pika.frame import Method -from pika.spec import Basic - -from idsse.common.rabbitmq_utils import Conn, Exch, Queue - -logger = logging.getLogger(__name__) - - -@dataclass -class PublishConfirmRecords: - """Data class to track RabbitMQ activity metadata - - Args: - deliveries (dict[int, str]): mapping of delivered message IDs to message content - acked (int): Count of acknowledged RabbitMQ messages - nacked (int): Count of unacknowledged RabbitMQ messages - message_number (int): The ID which will be assigned to the next published message - """ - deliveries: dict[int, str] = field(default_factory=dict) - acked: int = 0 - nacked: int = 0 - message_number: int = 0 - - -class PublishConfirmParams(NamedTuple): - """Data class to hold RabbitMQ configurations for PublishConfirm""" - conn: Conn - exchange: Exch - queue: Queue - - -class PublishConfirm: - """This is a publisher that will handle unexpected interactions - with RabbitMQ such as channel and connection closures for any process. - If RabbitMQ closes the connection, it will reopen it. You should - look at the output, as there are limited reasons why the connection may - be closed, which usually are tied to permission related issues or - socket timeouts. - """ - # pylint: disable=too-many-instance-attributes - - def __init__(self, conn: Conn, exchange: Exch, queue: Queue): - """Setup the example publisher object, passing in the RabbitMqUtils we will use to - connect to RabbitMQ. - - Args: - conn (Conn): The RabbitMQ connection detail object - exchange (Exch): The RabbitMQ exchange details. - queue (Queue): The RabbitMQ queue details. If name starts with '_', will be setup as - a "private queue", i.e. not intended for consumers, and all published messages - will have a 10-second TTL. - """ - self._thread: None | Thread = None # not initialized yet - self._next_thread_id = count(0).__next__ # start at 0, iterate each time Thread created - - self._connection: SelectConnection | None = None - self._channel: Channel | None = None - - self._stopping = False - self._rmq_params = PublishConfirmParams(conn, exchange, queue) - - self._records = PublishConfirmRecords() # data class to track message activity - self._is_ready_future: Future | None = None - - self._create_thread() - - def publish_message(self, - message: dict, - routing_key='', - corr_id: str | None = None) -> bool: - """If the class is not stopping, publish a message to RabbitMQ, - appending a list of deliveries with the message number that was sent. - This list will be used to check for delivery confirmations in the - on_delivery_confirmations method. - - Args: - message (dict): message to publish (should be valid json) - routing_key (str): routing_key to route the message to correct consumer. - Default is empty str - corr_id (str | None): optional correlation_id to include in message - - Returns: - bool: True if message successfully published to queue (channel was open and - publish did not throw exception) - Raises: - RuntimeError: if channel is uninitialized (start() not completed yet) or is closed - """ - properties = BasicProperties(content_type='application/json', - content_encoding='utf-8', - correlation_id=corr_id) - logger.info('Publishing message to queue %s, message length: %d', - self._rmq_params.queue.name, len(json.dumps(message))) - - is_channel_ready = self._wait_for_channel_to_be_ready() - if not is_channel_ready: - logger.error('RabbitMQ channel not established for some reason. Cannot publish') - return False - - logger.debug('DEBUG: channel is ready to publish message') - try: - self._channel.basic_publish(self._rmq_params.exchange.name, - routing_key, - json.dumps(message, ensure_ascii=True), - properties) - except (AMQPChannelError, AMQPConnectionError) as exc: - # something wrong with RabbitMQ connection; destroy and recreate the daemon Thread - logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s', - type(exc), str(exc)) - - # create new Thread, abandoning old one (it will shut itself down) - self._create_thread() - if not self._wait_for_channel_to_be_ready(): - logger.warning('Second attempt to connect to RabbitMQ failed. Cannnot publish') - return False - - try: - self._channel.basic_publish(self._rmq_params.exchange.name, - routing_key, - json.dumps(message, ensure_ascii=True), - properties) - except (AMQPChannelError, AMQPConnectionError) as retry_exc: - logger.error('Second attempt to publish message failed: (%s) %s', - type(retry_exc), str(retry_exc)) - return False - - # publish worked on the first (or second) try - self._records.message_number += 1 - self._records.deliveries[self._records.message_number] = message - logger.info('Published message # %i to exchange %s, queue %s, routing_key %s', - self._records.message_number, self._rmq_params.exchange.name, - self._rmq_params.queue.name, routing_key) - - return True - - def start(self, is_ready: Future | None = None): - """ - Start thread to handle PublicConfirm operations, such as connect to RabbitMQ, declare - a queue and enable delivery confirmation. If not None, ```is_ready``` will be resolved - to True when setup is complete and messages can be published. - - Args: - is_ready (Future | None): optional Python Future that will be resolved once - PublishConfirm connection & channel are ready to publish messages, or raise an - exception if some issue is encountered on setup. Default is None - Raises: - RuntimeError: if PublishConfirm thread is already running - """ - logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s', - is_ready, self._is_running(), self._channel) - - if is_ready is not None: - self._is_ready_future = is_ready # to be invoked after all pika setup is done - - # not possible to start Thread when it's already running - if self._is_running() or self._is_connected(): - raise RuntimeError('PublishConfirm thread already running, cannot be started') - - self._thread.start() - - def stop(self): - """Stop the example by closing the channel and connection. We - set a flag here so that we stop scheduling new messages to be - published. The IOLoop is started because this method is - invoked by the Try/Catch below when KeyboardInterrupt is caught. - Starting the IOLoop again will allow the publisher to cleanly - disconnect from RabbitMQ. - """ - logger.info('Stopping Thread %s', self._thread and self._thread.name) - self._stopping = True - self._close_connection() - self._stopping = False # done stopping - - def _is_connected(self) -> bool: - """True if RabbitMQ Connection and Channel exist and are open""" - return (self._connection is not None and self._connection.is_open - and self._channel is not None and self._channel.is_open) - - def _is_running(self) -> bool: - """True if PublishConfirm Thread exists and is running""" - return self._thread and self._thread.is_alive() - - def _run(self): - """Run a new thread: get a new RMQ connection, and start looping until stop() is called""" - logger.debug('Creating connection') - self._connection = self._create_connection() - self._connection.ioloop.start() - time.sleep(0.2) - - while not self._stopping: - time.sleep(.1) - - if self._connection is not None and not self._connection.is_closed: - # Finish closing - self._connection.ioloop.start() - logger.info('Thread %s finished stopping', self._thread.name) - - def _create_thread(self): - """ - Create new python Thread for this PublishConfirm, allowing any pre-existing Thread to stop - """ - if self._thread is not None: - self.stop() # halt previously existing Thread to be sure it exits eventually - self._records = PublishConfirmRecords() # reset delivery_tag and such - - self._thread = Thread(name=f'PublishConfirm-{self._next_thread_id()}', - daemon=True, - target=self._run) - - def _create_connection(self): - """This method connects to RabbitMQ, returning the connection handle. - When the connection is established, the on_connection_open method - will be invoked by pika. - :rtype: pika.SelectConnection - """ - conn = self._rmq_params.conn - logger.debug('Connecting to RabbitMQ: %s', conn) - return SelectConnection( - parameters=conn.connection_parameters, - on_open_callback=self._on_connection_open, - on_open_error_callback=self._on_connection_open_error, - on_close_callback=self._on_connection_closed) - - def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool: - """If connection or channel are not open, start the PublishConfirm to do needed - RabbitMQ setup. This method will not return until channel is confirmed ready for use, - or timeout is exceeded. - - Args: - timeout (optional, float): Duration of time, in seconds, to wait for RabbitMQ - connection, channel, exchange and queue to be setup and ready to send messages. - If timeout is None, thread will wait indefinitely. Default is 6 seconds. - - Returns: - bool: True if channel is ready. False if timed out waiting for RabbitMQ to connect - """ - - # validate that PublishConfirm thread has been set up and connected to RabbitMQ - logger.debug('DEBUG _wait_for_channel_to_be_ready state') - logger.debug(self._connection) - logger.debug(self._channel) - logger.debug('----------------------') - - if self._is_connected(): - return True # connection and channel already open, no setup needed - - logger.info('Channel is not ready to publish, calling start() now') - # pass callback to flip is_ready flag, and block until flag changes - is_ready_future = Future() - - logger.debug('calling start() with callback') - self.start(is_ready=is_ready_future) - - logger.info('waiting for is_ready flag to be set') - try: - is_ready_future.result(timeout=timeout) - logger.info('Connection and channel setup complete, ready to publish message') - return True - except TimeoutError: - logger.error('Timed out waiting for RabbitMQ connection, channel, or exchange') - return False - except Exception as exc: # pylint: disable=broad-exception-caught - logger.error('RabbitMQ rejected connection for some reason: %s', str(exc)) - return False - - def _on_connection_open(self, connection: SelectConnection): - """This method is called by pika once the connection to RabbitMQ has been established. - - Args: - connection (SelectConnection): The connection - """ - logger.debug('Connection opened. Creating a new channel') - - # Create a new channel. - # When RabbitMQ confirms the channel is open by sending the Channel.OpenOK RPC reply, - # the on_channel_open method will be invoked. - connection.channel(on_open_callback=self._on_channel_open) - - def _on_connection_open_error(self, connection: SelectConnection, err: Exception): - """This method is called by pika if the connection to RabbitMQ can't be established. - - Args: - connection (SelectConnection): The connection - err (Exception): The error - """ - logger.error('Connection open failed, reopening in 5 seconds: %s', err) - connection.ioloop.call_later(5, connection.ioloop.stop) - - def _on_connection_closed(self, connection: SelectConnection, reason: Exception): - """This method is invoked by pika when the connection to RabbitMQ is - closed unexpectedly. Since it is unexpected, we will reconnect to - RabbitMQ if it disconnects. - - Args: - connection (SelectConnection): The closed connection obj - reason (Exception): exception representing reason for loss of connection. - """ - self._channel = None - - if self._stopping: - connection.ioloop.stop() - else: - logger.warning('Connection closed, reopening in 5 seconds: %s', reason) - connection.ioloop.call_later(5, connection.ioloop.stop) - - def _on_channel_open(self, channel: Channel): - """This method is invoked by pika when the channel has been opened. - The channel object is passed in so we can make use of it (declare the exchange to use). - - Args: - channel (Channel): The channel object - """ - logger.debug('Channel opened') - self._channel = channel - - logger.debug('Adding channel close callback') - self._channel.add_on_close_callback(self._on_channel_closed) - - # Declare exchange on our new channel - exch_name, exch_type, _ = self._rmq_params.exchange - logger.debug('Declaring exchange %s', exch_name) - - # Note: using functools.partial is not required, it is demonstrating - # how arbitrary data can be passed to the callback when it is called - cb = functools.partial(self._on_exchange_declareok, userdata=exch_name) - try: - self._channel.exchange_declare(exchange=exch_name, - exchange_type=exch_type, - callback=cb) - except ValueError as exc: - logger.warning('RabbitMQ failed to declare exchange: (%s) %s', type(exc), str(exc)) - if self._is_ready_future: - self._is_ready_future.set_exception(exc) # notify caller that we could not connect - - def _on_channel_closed(self, channel: Channel, reason: Exception): - """Invoked by pika when RabbitMQ unexpectedly closes the channel. - Channels are usually closed if you attempt to do something that - violates the protocol, such as re-declare an exchange or queue with - different parameters. In this case, we'll close the connection - to shutdown the object. - - Args: - channel (Channel): The closed channel - reason (Exception): why the channel was closed - """ - logger.warning('Channel %i was closed: %s', channel, reason) - self._channel = None - if not self._stopping: - self._close_connection() - - def _on_exchange_declareok(self, _unused_frame: Method, userdata: str | bytes): - """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC command. - - Args: - _unused_frame (Frame.Method): Exchange.DeclareOk response frame - userdata (str | bytes): Extra user data (exchange name) - """ - logger.debug('Exchange declared: %s', userdata) - - # Setup the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is - # complete, the on_queue_declareok method will be invoked by pika. - queue = self._rmq_params.queue - logger.debug('Declaring queue %s', queue.name) - - # If we have a 'private' queue, i.e. one used to support message publishing, not consumed - # Set message time-to-live (TTL) to 10 seconds - args = {'x-message-ttl': 10 * 1000} if queue.name.startswith('_') else None - - if self._channel is not None: - self._channel.queue_declare(queue=queue.name, - durable=queue.durable, - arguments=args, - exclusive=queue.exclusive, - auto_delete=queue.auto_delete, - callback=self._on_queue_declareok) - - def _on_queue_declareok(self, _unused_frame: Method): - """Method invoked by pika when the Queue.Declare RPC call made in - setup_queue has completed. In this method we will bind the queue - and exchange together with the routing key by issuing the Queue.Bind - RPC command. When this command is complete, the on_bindok method will - be invoked by pika. - - Args: - _unused_frame (Frame): The Queue.DeclareOk frame - """ - _, exchange, queue = self._rmq_params - logger.debug('Binding %s to %s with #', exchange.name, queue.name) - - if self._channel is not None: - self._channel.queue_bind(queue.name, - exchange.name, - routing_key=queue.route_key, - callback=self._on_bindok) - - def _on_bindok(self, _unused_frame: Method): - """This method is invoked by pika when it receives the Queue.BindOk - response from RabbitMQ. Since we know we're now setup and bound, it's - time to start publishing.""" - logger.debug('Queue bound') - - # enable delivery confirmations and schedule the first message to be sent to RabbitMQ - logger.debug('Issuing Confirm.Select RPC command') - if self._channel is not None: - self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message - self._channel.confirm_delivery(self._on_delivery_confirmation) - - logger.debug('RabbitMQ channel ready, passing control back to caller') - if self._is_ready_future: - self._is_ready_future.set_result(True) # notify that channel can now be published to - - def _on_delivery_confirmation(self, method_frame: Method): - """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC - command, passing in either a Basic.Ack or Basic.Nack frame with - the delivery tag of the message that was published. The delivery tag - is an integer counter indicating the message number that was sent - on the channel via Basic.Publish. Here we're just doing housekeeping - to keep track of stats and remove message numbers that we expect - a delivery confirmation of from the list used to keep track of messages - that are pending confirmation. - - Args: - method_frame (Method): Basic.Ack or Basic.Nack frame - """ - # tell python type checker that method will be an Ack or Nack (per pika docs) - method = cast(Basic.Ack | Basic.Nack, method_frame.method) - - confirmation_type = method.NAME.split('.')[1].lower() - ack_multiple = method.multiple - delivery_tag = method.delivery_tag - - logger.info('Received %s for delivery tag: %i (multiple: %s)', - confirmation_type, delivery_tag, ack_multiple) - - if confirmation_type == 'ack': - self._records.acked += 1 - elif confirmation_type == 'nack': - self._records.nacked += 1 - - del self._records.deliveries[delivery_tag] - - if ack_multiple: - for tmp_tag in list(self._records.deliveries.keys()): - if tmp_tag <= delivery_tag: - self._records.acked += 1 - del self._records.deliveries[tmp_tag] - - # NOTE: at some point we'd check self._deliveries for stale entries and attempt re-delivery - logger.debug( - 'Published %i messages, %i have yet to be confirmed, %i were acked and %i were nacked', - self._records.message_number, - len(self._records.deliveries), - self._records.acked, - self._records.nacked) - - def _close_connection(self): - """Close the connection to RabbitMQ (after closing any open channel on it).""" - if self._channel is not None: - logger.debug('Closing the channel') - self._channel.close() - - if self._connection is not None: - logger.debug('Closing connection') - self._connection.close() diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 7f6cd105..e07aa9ab 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -11,17 +11,19 @@ # # ---------------------------------------------------------------------------------- -import json import logging import logging.config +import uuid +from concurrent.futures import ThreadPoolExecutor from collections.abc import Callable +from functools import partial +from threading import Event, Thread from typing import NamedTuple from pika import BasicProperties, ConnectionParameters, PlainCredentials from pika.adapters import BlockingConnection -from pika.adapters.blocking_connection import BlockingChannel from pika.channel import Channel -from pika.exceptions import AMQPConnectionError +from pika.exceptions import UnroutableError from pika.frame import Method from pika.spec import Basic @@ -39,14 +41,6 @@ class Conn(NamedTuple): username: str password: str - def to_connection(self) -> BlockingConnection: - """Establish a new RabbitMQ connection using attributes in Conn data class - - Returns: - BlockingConnection: newly established instance of pika.BlockingConnection - """ - return BlockingConnection(parameters=self.connection_parameters) - @property def connection_parameters(self) -> ConnectionParameters: """Convert Conn data object into pika.ConnectionParameters, ready to be passed @@ -63,7 +57,10 @@ class Exch(NamedTuple): """An internal data class for holding the RabbitMQ exchange info""" name: str type: str + route_key: str = '' durable: bool = True + delivery_conf: bool | None = None + mandatory: bool | None = None class Queue(NamedTuple): @@ -73,16 +70,26 @@ class Queue(NamedTuple): durable: bool exclusive: bool auto_delete: bool + type: str = 'classic' class RabbitMqParams(NamedTuple): """Data class to hold configurations for RabbitMQ exchange/queue pair""" exchange: Exch - queue: Queue + queue: Queue = None + + +class RabbitMqParamsAndCallback(NamedTuple): + """ + Data class to hold configurations for RabbitMQ exchange/queue pair and the callback + to be used when consuming from the queue + """ + params: RabbitMqParams + callback: Callable def _initialize_exchange_and_queue( - channel: BlockingChannel, + channel: Channel, params: RabbitMqParams ) -> str: """Declare and bind RabbitMQ exchange and queue using the provided channel. @@ -95,7 +102,9 @@ def _initialize_exchange_and_queue( # Do not try to declare the default exchange. It already exists if exch.name != '': - channel.exchange_declare(exchange=exch.name, exchange_type=exch.type, durable=exch.durable) + channel.exchange_declare(exchange=exch.name, + exchange_type=exch.type, + durable=exch.durable) # Do not try to declare or bind built-in queues. They are pseudo-queues that already exist if queue.name.startswith('amq.rabbitmq.'): @@ -120,26 +129,22 @@ def _initialize_exchange_and_queue( def _initialize_connection_and_channel( - connection: Conn | BlockingConnection, + connection: Conn, params: RabbitMqParams, - channel: BlockingChannel | Channel | None = None, + channel: Channel | Channel | None = None, ) -> tuple[BlockingConnection, Channel, str]: - """Establish (or reuse) RabbitMQ connection, and declare exchange and queue on new Channel""" - if isinstance(connection, Conn): - # Use connection as parameters to establish new connection - _connection = connection.to_connection() - logger.info('Established new RabbitMQ connection to %s on port %i', - connection.host, connection.port) - elif isinstance(connection, BlockingConnection): - # Or existing open connection was provided, so use that - _connection = connection - else: + """Establish RabbitMQ connection, and declare exchange and queue on new Channel""" + if not isinstance(connection, Conn): # connection of unsupported type passed raise ValueError( (f'Cannot use or create new RabbitMQ connection using type {type(connection)}. ' - 'Should be one of: [Conn, pika.BlockingConnection]') + 'Should a Conn (a dict with connection parameters)') ) + _connection = BlockingConnection(parameters=connection) + logger.info('Established new RabbitMQ connection to %s on port %i', + connection.host, connection.port) + if channel is None: logger.info('Creating new RabbitMQ channel') _channel = _connection.channel() @@ -155,9 +160,9 @@ def subscribe_to_queue( connection: Conn | BlockingConnection, rmq_params: RabbitMqParams, on_message_callback: Callable[ - [BlockingChannel, Basic.Deliver, BasicProperties, bytes], None], - channel: BlockingChannel | None = None -) -> tuple[BlockingConnection, BlockingChannel]: + [Channel, Basic.Deliver, BasicProperties, bytes], None], + channel: Channel | None = None +) -> tuple[BlockingConnection, Channel]: """ Function that handles setup of consumer of RabbitMQ queue messages, declaring the exchange and queue if needed, and invoking the provided callback when a message is received. @@ -198,8 +203,55 @@ def subscribe_to_queue( return _connection, _channel -def threadsafe_call(connection, channel, *partial_functions): - """This function provides a thread safe way to call pika functions (or functions that call +def _setup_exch_and_queue(channel: Channel, exch: Exch, queue: Queue): + """Setup an exchange and queue and bind them with the queue's route key(s)""" + if queue.type == 'quorum' and queue.auto_delete: + raise ValueError('Quorum queues can not be configured to auto delete') + + _setup_exch(channel, exch) + + result: Method = channel.queue_declare( + queue=queue.name, + exclusive=queue.exclusive, + durable=queue.durable, + auto_delete=queue.auto_delete, + arguments={'x-queue-type': queue.type} + ) + queue_name = result.method.queue + logger.debug('Declared queue: %s', queue_name) + + if isinstance(queue.route_key, list): + for route_key in queue.route_key: + channel.queue_bind( + queue_name, + exchange=exch.name, + routing_key=route_key + ) + logger.debug('Bound queue(%s) to exchange(%s) with route_key(%s)', + queue_name, exch.name, route_key) + else: + channel.queue_bind( + queue_name, + exchange=exch.name, + routing_key=queue.route_key + ) + logger.debug('Bound queue(%s) to exchange(%s) with route_key(%s)', + queue_name, exch.name, queue.route_key) + + +def _setup_exch(channel: Channel, exch: Exch): + """Setup and exchange""" + channel.exchange_declare( + exchange=exch.name, + exchange_type=exch.type, + durable=exch.durable + ) + logger.debug('Declared exchange: %s', exch.name) + + +def threadsafe_call(channel: Channel, *functions: Callable): + """ + This function provides a thread safe way to call pika functions (or functions that call pika functions) from a thread other than the main. The need for this utility is practice of executing function/method and separate thread to avoid blocking the rabbitMQ heartbeat messages send by pika from the main thread. @@ -209,13 +261,13 @@ def threadsafe_call(connection, channel, *partial_functions): Examples: # Simple ack a message - threadsafe_call(self.connection, self.channel, + threadsafe_call(self.channel, partial(self.channel.basic_ack, delivery_tag=delivery_tag)) # RPC response followed and nack without requeueing response = {'Error': 'Invalid request'} - threadsafe_call(self.connection, self.channel, + threadsafe_call(self.channel, partial(self.channel.basic_publish, exchange='', routing_key=response_props.reply_to, @@ -226,99 +278,264 @@ def threadsafe_call(connection, channel, *partial_functions): requeue=False)) # Publishing message via the PublishConfirm utility - threadsafe_call(self.connection, self.pub_conf.channel, + threadsafe_call(self.pub_conf.channel, partial(self.pub_conf.publish_message, message=message)) Args: - connection (BlockingConnection): RabbitMQ connection. channel (BlockingChannel): RabbitMQ channel. - partial_functions (Callable): One or more callable function (typically created via - functools.partial) + functions (Callable): One or more callable function, typically created via + functools.partial or lambda, but can be function without args """ def call_if_channel_is_open(): if channel.is_open: - for func in partial_functions: + for func in functions: func() else: logger.error('Channel closed before callback could be run') raise ConnectionError('RabbitMQ Channel is closed') - connection.add_callback_threadsafe(call_if_channel_is_open) + channel.connection.add_callback_threadsafe(call_if_channel_is_open) + + +def threadsafe_ack( + channel: Channel, + delivery_tag: int, + extra_func: Callable = None, +): + """ + This is just a convenance function that acks a message via threadsafe_call + + Args: + channel (BlockingChannel): RabbitMQ channel. + delivery_tag (int): Delivery tag to be used when nacking. + extra_func (Callable): Any extra function that you would like to be called after the nack. + Typical use case would we to send a log via a lambda + (e.g. extra_func = lambda: logger.debug('Message has been nacked')). + """ + if extra_func: + threadsafe_call(channel, lambda: channel.basic_ack(delivery_tag), extra_func) + else: + threadsafe_call(channel, lambda: channel.basic_ack(delivery_tag)) -class PublisherSync: +def threadsafe_nack( + channel: Channel, + delivery_tag: int, + extra_func: Callable = None, + requeue: bool = False, +): """ - Uses a synchronous, blocking RabbitMQ connection to publish messages (no thread safety - or multithreading support). It's recommended that you gracefully close the connection when - you're done with it using close(). + This is just a convenance function that nacks a message via threadsafe_call Args: - conn_params (Conn): connection parameters to establish a new - RabbitMQ connection - rmq_params (RabbitMqParams): parameters for RabbitMQ exchange and queue on which to publish - messages + channel (BlockingChannel): RabbitMQ channel. + delivery_tag (int): Delivery tag to be used when nacking. + extra_func (Callable): Any extra function that you would like to be called after the nack. + Typical use case would we to send a log via a lambda + (e.g. extra_func = lambda: logger.debug('Message has been nacked')). + requeue (bool, optional): Indication if the message should be re-queued. Defaults to False. + """ + if extra_func: + threadsafe_call(channel, + lambda: channel.basic_nack(delivery_tag, requeue=requeue), + extra_func) + else: + threadsafe_call(channel, lambda: channel.basic_nack(delivery_tag, requeue=requeue)) + + +class Consumer(Thread): + """ + RabbitMQ consumer, runs in own thread to not block heartbeat. A thread pool + is used to not so much to parallelize the execution but rather to manage the + execution of the callbacks, including being able to wait for completion on + shutdown. The start() and stop() methods should be called from the same + thread as the one used to create the instance. """ def __init__( self, - conn_params: Conn, - rmq_params: RabbitMqParams, - channel: Channel | None = None, - - ) -> tuple[BlockingConnection, Channel]: - # save params + conn_params: ConnectionParameters, + rmq_params_and_callbacks: RabbitMqParamsAndCallback | list[RabbitMqParamsAndCallback], + num_message_handlers: int, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.daemon = True + self._tpx = ThreadPoolExecutor(max_workers=num_message_handlers) self._conn_params = conn_params - self._rmq_params = rmq_params + if isinstance(rmq_params_and_callbacks, list): + self._rmq_params_and_callbacks = rmq_params_and_callbacks + else: + self._rmq_params_and_callbacks = [rmq_params_and_callbacks] + self.connection = BlockingConnection(parameters=self._conn_params) + self.channel = self.connection.channel() - # establish BlockingConnection and declare exchange and queue on Channel - self._connection, self._channel, self._queue_name = _initialize_connection_and_channel( - conn_params, rmq_params, channel, - ) + self._consumer_tags = [] + for (exch, queue), func in self._rmq_params_and_callbacks: + _setup_exch_and_queue(self.channel, exch, queue) + self._consumer_tags.append( + self.channel.basic_consume(queue.name, + partial(self._on_message, func=func)) + ) + + self.channel.basic_qos(prefetch_count=1) - self._channel.confirm_delivery() # enable delivery confirmations from RabbitMQ broker + def run(self): + logger.info('Start Consuming... (to stop press CTRL+C)') + self.channel.start_consuming() def stop(self): - """Cleanly close ("stop") any open RabbitMQ connection and channel. This has the same - functionality as close(), it's just to match the interface of ```PublishConfirm```""" - self.close() + """Cleanly end the running of a thread, free up resources""" + logger.info('Stopping consumption of messages...') + logger.debug('Waiting for any currently running workers (this could take some time)') + self._tpx.shutdown(wait=True, cancel_futures=True) + # it would be nice to stop consuming before shutting down the thread pool, but when done in + # in the other order completed tasks can't be (n)ack-ed, this does mean that messages can be + # consumed from the queue and the shutdown starts that will not be processed, nor (n)ack-ed + + if self.connection and self.connection.is_open: + # there should be one consumer tag for each channel being consumed from + if self._consumer_tags: + threadsafe_call(self.channel, + *[partial(self.channel.stop_consuming, consumer_tag) + for consumer_tag in self._consumer_tags], + lambda: logger.info('Stopped Consuming')) + + threadsafe_call(self.channel, + self.channel.close, + self.connection.close) + + # pylint: disable=too-many-arguments + def _on_message(self, channel, method, properties, body, func): + """This is the callback wrapper, the core callback is passed as func""" + try: + self._tpx.submit(func, channel, method, properties, body) + except RuntimeError as exe: + logger.error('Unable to submit it to thread pool, Cause: %s', exe) - def close(self): - """Cleanly close any open RabbitMQ connection and channel""" - def _close_connection(): - if self._channel: - self._channel.close() - self._connection.close() - self._connection.add_callback_threadsafe(_close_connection) +class Publisher(Thread): + """ + RabbitMQ publisher, runs in own thread to not block heartbeat. The start() and stop() + methods should be called from the same thread as the one used to create the instance. + """ + def __init__( + self, + conn_params: ConnectionParameters, + exch_params: Exch, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.daemon = True + self._is_running = True + self._exch = exch_params + self._queue = None + + self.connection = BlockingConnection(conn_params) + self.channel = self.connection.channel() + + # if delivery is mandatory there must be a queue attach to the exchange + if self._exch.mandatory: + self._queue = Queue(name=f'_{self._exch.name}_{uuid.uuid4()}', + route_key=self._exch.route_key, + durable=False, + exclusive=True, + auto_delete=False) + + _setup_exch_and_queue(self.channel, self._exch, self._queue) + else: + _setup_exch(self.channel, self._exch) - def publish_message(self, message: dict, routing_key='', corr_id: str | None = None) -> bool: - """Publish a message to the RabbitMQ queue. Non-blocking, and no delivery confirmation. - Returns False if message is invalid or could not be sent, but otherwise no validation. + if self._exch.delivery_conf: + self.channel.confirm_delivery() + + def run(self): + logger.info('Starting publisher') + while self._is_running: + self.connection.process_data_events(time_limit=1) + + def publish(self, message: bytes, properties: BasicProperties = None): + """ + Publish a message to this pre configured exchange. The actual publication + is asynchronous and this method only schedules it to be done. Args: - message (dict): message to publish. Must be valid JSON dictionary. - routing_key (str, optional): routing_key to route the message to correct consumer. - Defaults to ''. - corr_id (str | None, optional): correlation_id to tag message. Defaults to None. + message (bytes): The message to be published + properties (BasicProperties): The props to be attached to message when published + """ + threadsafe_call(self.channel, lambda: self._publish(message, properties, [False])) + + def blocking_publish(self, message: bytes, properties: BasicProperties = None) -> bool: + """ + Blocking publish. Works by waiting for the completion of an asynchronous + publication. + + Args: + message (bytes): The message to be published + properties (BasicProperties): The props to be attached to message when published Returns: - bool: True if message was published to the queue + bool: Returns True if no errors ocurred during publication. If this + publisher is configured to confirm delivery will return False if + failed to confirm. """ + success_flag = [False] + done_event = Event() + threadsafe_call(self.channel, lambda: self._publish(message, + properties, + success_flag, + done_event)) + done_event.wait() + return success_flag[0] - properties = BasicProperties(content_type='application/json', - content_encoding='utf-8', - correlation_id=corr_id) + def stop(self): + """Cleanly end the running of a thread, free up resources""" + logger.info("Stopping publisher") + self._is_running = False + # Wait until all the data events have been processed + self.connection.process_data_events(time_limit=1) + if self.connection.is_open: + threadsafe_call(self.channel, + self.channel.close, + self.connection.close) + + def _publish( + self, + message: bytes, + properties: BasicProperties, + success_flag: list[bool], + done_event: Event = None + ): + """ + Core publish method. Success flag is passed by reference, and done event, if not None + can be used to block until message is actually publish, vs being scheduled to be. + + success_flag (list[bool]): This is effectively passing a boolean by reference. This + will change the value of the first element it this list + to indicate if the core publishing was successful. + done_event (Event): A Thread.Event that can be used to indicate when publishing is + complete in a different thread. This can be used to wait for the + completion via 'done_event.wait()' following calling this function. + """ + success_flag[0] = False try: - self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, - json.dumps(message, ensure_ascii=True), properties) - - return True - except AMQPConnectionError: # pylint: disable=broad-exception-caught - try: - self._connection, self._channel, self._queue_name = \ - _initialize_connection_and_channel(self._conn_params, self._rmq_params) - self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, - json.dumps(message, ensure_ascii=True), properties) - - return True - except AMQPConnectionError as exc: # pylint: disable=broad-exception-caught - logger.error('Publish message problem: [%s] %s', type(exc), str(exc)) - return False + self.channel.basic_publish(self._exch.name, + self._exch.route_key, + body=message, + properties=properties, + mandatory=self._exch.mandatory) + success_flag[0] = True + print('\n message published\n') + if self._queue and self._queue.name.startswith('_'): + try: + self.channel.queue_purge(queue=self._queue.name) + except ValueError as exe: + logger.warning('Exception when removing message from private queue: %s', exe) + except UnroutableError: + logger.warning('Message was not delivered') + except Exception as exe: + logger.warning('Message not published, cause: %s', exe) + raise exe + finally: + if done_event: + done_event.set() diff --git a/python/idsse_common/idsse/common/sci/bit_pack.py b/python/idsse_common/idsse/common/sci/bit_pack.py index 54d7f9ce..67ca8842 100644 --- a/python/idsse_common/idsse/common/sci/bit_pack.py +++ b/python/idsse_common/idsse/common/sci/bit_pack.py @@ -15,6 +15,7 @@ from typing import NamedTuple import numpy + class PackType(IntEnum): """Enumerated type used to indicate if data is packed into a byte or short (16 bit)""" BYTE = 8 @@ -35,7 +36,8 @@ class PackData(NamedTuple): offset: float data: list | numpy.ndarray -def get_min_max(data: list | numpy.ndarray) -> (float, float): + +def get_min_max(data: list | numpy.ndarray) -> tuple[float, float]: """Get the minimum and maximum from the numpy array or list. Testing showed (on a mac) that numpy was faster... @@ -45,7 +47,7 @@ def get_min_max(data: list | numpy.ndarray) -> (float, float): Raises: TypeError: If the arguments for the call to the function are invalid Returns: - (float, float): The minimum, maximum from the supplied argument + tuple[float, float]: The minimum, maximum from the supplied argument """ if isinstance(data, list): arr = numpy.array(data).ravel(order='K') @@ -53,6 +55,7 @@ def get_min_max(data: list | numpy.ndarray) -> (float, float): data.ravel(order='K') return numpy.min(data), numpy.max(data) + def get_pack_info( min_value: float, max_value: float, @@ -170,6 +173,7 @@ def pack_numpy_to_numpy( data = data-offset data /= scale + data += 0.5 # for non-negative values adding .5 before truncation is equivalent to round return PackData(pack_type, scale, offset, numpy.trunc(data, out=data)) @@ -270,6 +274,7 @@ def _pack_list_to_list_in_place( data = _pack_np_array_to_list(np_data, scale, offset) return data + # core packing code specific to packing a list(s) with forced copying def _pack_list_to_list_copy( data: list, @@ -289,12 +294,14 @@ def _pack_np_array_to_list( np_data = numpy.copy(data) np_data -= offset np_data /= scale + # for non-negative values, adding 0.5 before truncation is equivalent to rounding + np_data += 0.5 # Return the truncated array and a int list. return (numpy.trunc(np_data).astype(int)).tolist() # core packing code using diplib package (sometimes slower than the original so not used but here # for an option. -#def _diplib_pack(data: numpy.array, +# def _diplib_pack(data: numpy.array, # scale: float, # offset: float) -> numpy.array: # dip_data = dip.Image(data) diff --git a/python/idsse_common/idsse/common/sci/netcdf_io.py b/python/idsse_common/idsse/common/sci/netcdf_io.py index ffcf763f..df2436ea 100644 --- a/python/idsse_common/idsse/common/sci/netcdf_io.py +++ b/python/idsse_common/idsse/common/sci/netcdf_io.py @@ -56,7 +56,7 @@ def read_netcdf_global_attrs(filepath: str) -> dict: return _read_attrs(Dataset(filepath)) -def read_netcdf(filepath: str, use_h5_lib = False) -> tuple[dict, np.ndarray]: +def read_netcdf(filepath: str, use_h5_lib: bool = False) -> tuple[dict, np.ndarray]: """Reads DAS Netcdf file. Args: @@ -81,7 +81,10 @@ def read_netcdf(filepath: str, use_h5_lib = False) -> tuple[dict, np.ndarray]: return global_attrs, grid -def write_netcdf(attrs: dict, grid: np.ndarray, filepath: str, use_h5_lib = False) -> str: +def write_netcdf(attrs: dict, + grid: np.ndarray, + filepath: str, + use_h5_lib: bool = False) -> str: """Store data and attributes to a Netcdf4 file Args: diff --git a/python/idsse_common/test/sci/test_bit_pack.py b/python/idsse_common/test/sci/test_bit_pack.py index 4b445b5a..7450c1d4 100644 --- a/python/idsse_common/test/sci/test_bit_pack.py +++ b/python/idsse_common/test/sci/test_bit_pack.py @@ -22,6 +22,8 @@ pack_to_list, PackInfo, PackType) + + def test_get_min_max(): example = [-1.0, 0.0, 1.0, 2.0] expected = (-1.0, 2.0) @@ -50,46 +52,49 @@ def test_get_pack_info_with_pack_type(): with pytest.raises(ValueError): result = get_pack_info(-1, 1, decimals=2, pack_type=PackType.SHORT) + def test_pack_to_list(): data = [[10, 50, 100, 200, 500], [30, 150, 300, 400, 600]] result = pack_to_list(data, in_place=False) - expected = [[0, 4443, 9996, 21104, 54427], [2221, 15550, 32212, 43319, 65535]] + expected = [[0, 4443, 9997, 21104, 54427], [2222, 15551, 32212, 43320, 65535]] numpy.testing.assert_array_equal(result.data, expected) assert data[0][0] != result.data[0][0] data = numpy.array(data, dtype=float) result = pack_to_list(data, in_place=False) - expected = [[0, 4443, 9996, 21104, 54427], [2221, 15550, 32212, 43319, 65535]] + expected = [[0, 4443, 9997, 21104, 54427], [2222, 15551, 32212, 43320, 65535]] numpy.testing.assert_array_equal(result.data, expected) assert data[0][0] != result.data[0][0] with pytest.raises(KeyError): - result = pack_to_list((-1,1)) + result = pack_to_list((-1, 1)) + def test_pack_list_to_list(): data = [[10, 50, 100, 200, 500], [30, 150, 300, 400, 600]] result = pack_to_list(data, in_place=False) - expected = [[0, 4443, 9996, 21104, 54427], [2221, 15550, 32212, 43319, 65535]] + expected = [[0, 4443, 9997, 21104, 54427], [2222, 15551, 32212, 43320, 65535]] numpy.testing.assert_array_equal(result.data, expected) assert data[0][0] != result.data[0][0] result = pack_to_list(data, in_place=True) with pytest.raises(KeyError): - result = pack_to_list((-1,1)) + result = pack_to_list((-1, 1)) + def test_pack_numpy(): data = numpy.array([[-1, -.5, 0, .5, 1], [-1, -.25, 0, .25, 1]]) result = pack_numpy_to_numpy(data, in_place=False) - expected = numpy.array([[0, 16383, 32767, 49151, 65535], - [0, 24575, 32767, 40959, 65535]]) + expected = numpy.array([[0, 16384, 32768, 49151, 65535], + [0, 24576, 32768, 40959, 65535]]) numpy.testing.assert_array_equal(result.data, expected) assert data[0, 0] != result.data[0, 0] with pytest.raises(ValueError): - result = pack_numpy_to_numpy(numpy.array([0,1,2], dtype=int), in_place=True) + result = pack_numpy_to_numpy(numpy.array([0, 1, 2], dtype=int), in_place=True) with pytest.raises(ValueError): - result = pack_numpy_to_numpy((-1,1)) + result = pack_numpy_to_numpy((-1, 1)) def test_pack_numpy_in_place(): diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py deleted file mode 100644 index 02c00f64..00000000 --- a/python/idsse_common/test/test_publish_confirm.py +++ /dev/null @@ -1,312 +0,0 @@ -"""Test suite for publish_confirm.py""" -# ---------------------------------------------------------------------------------- -# Created on Wed Oct 18 2023 -# -# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) -# Copyright (c) 2023 Colorado State University. All rights reserved. (2) -# -# Contributors: -# Mackenzie Grimes (2) -# -# ---------------------------------------------------------------------------------- -# pylint: disable=missing-function-docstring,redefined-outer-name,invalid-name,protected-access -# pylint: disable=too-few-public-methods,unused-argument - -from collections.abc import Callable -from concurrent.futures import Future -from time import sleep -from typing import NamedTuple -from unittest.mock import Mock, PropertyMock - -from pytest import fixture, raises, MonkeyPatch - -from pika import SelectConnection -from pika.exceptions import AMQPConnectionError -from pika.spec import Basic, Channel -from idsse.common.publish_confirm import PublishConfirm -from idsse.common.rabbitmq_utils import Conn, Exch, Queue - -EXAMPLE_CONN = Conn('localhost', '/', 5672, 'guest', 'guest') -EXAMPLE_EXCH = Exch('pub.conf.test', 'topic') -EXAMPLE_QUEUE = Queue('pub.conf', '#', False, False, True) - - -class Method(NamedTuple): - """mock of pika.frame.Method data class""" - method: Basic.Ack | Basic.Nack - - -# fixtures -@fixture -def channel_state() -> dict: - """ - Track the simulated state of our mock Channel, so it can be initialized with - default values, read/written anywhere in a given test, and reset after each test finishes - """ - return { - 'is_open': False, - 'delivery_tag': 0, # track messages that have been mock "sent" over our channel - } - - -@fixture -def mock_channel(channel_state: dict) -> Mock: - # set up complex pytest Mock object to emulate Channel - mock_obj = Mock(name='MockChannel', spec=Channel) - channel = mock_obj.return_value - channel.__int__= Mock(return_value=0) - - channel.is_open = PropertyMock(side_effect=lambda: channel_state['is_open']) - channel.is_closed = PropertyMock(side_effect=lambda: not channel_state['is_open']) - channel.exchange_declare.side_effect = ( - lambda exchange, exchange_type, callback: callback('userdata') - ) - channel.queue_declare.side_effect = ( - lambda queue, durable, arguments, exclusive, auto_delete, callback: callback(None) - ) - channel.queue_bind.side_effect = ( - lambda queue, exchange, routing_key, callback: callback(None) - ) - - def mock_confirm_delivery(callback): - method = Method(Basic.Ack(channel_state['delivery_tag'])) - channel_state['delivery_tag'] += 1 # our Mock needs to track this message ID as "sent" - callback(method) # send new Ack message back to PublishConfirm - channel.confirm_delivery.side_effect = mock_confirm_delivery - - def mock_basic_publish(exchange, key, body, properties): - channel_state['delivery_tag'] += 1 - channel.basic_publish.side_effect = mock_basic_publish - - def mock_close(): - channel_state['is_open'] = False - channel.close.side_effect = mock_close - - def mock_init(on_open_callback): - channel_state['is_open'] = True - channel_state['delivery_tag'] = 0 - on_open_callback(channel) - mock_obj.side_effect = mock_init - - return mock_obj - - -@fixture -def conn_state() -> dict: - """ - Track the simulated state of our mock Connection, so it can be initialized with - default values, read/written anywhere in a given test, and reset after each test finishes - """ - return { - 'is_open': False, - # save Connection's open/close callbacks when Connection is initialized, to be invoked - # when PublishConfirm.start/stop is called. These defaults should never run; - # should be overwritten with real callbacks inside PublishConfirm._create_connection() - 'on_open': lambda: RuntimeError('on_open_callback not passed to SelectConnection()'), - 'on_close': lambda: RuntimeError('on_open_callback not passed to SelectConnection()'), - } - - -@fixture -def mock_connection( - monkeypatch: MonkeyPatch, - mock_channel: Mock, - conn_state: dict, - channel_state: dict -) -> Mock: - mock_obj = Mock(name="MockConnection", spec=SelectConnection) - - connection = mock_obj.return_value - connection.is_open = PropertyMock(side_effect=lambda: conn_state['is_open']) - connection.channel = mock_channel - - # pylint: disable=unnecessary-lambda - connection.ioloop.start.side_effect = lambda: conn_state['on_open']() - connection.ioloop.stop.side_effect = lambda: None - - def mock_close(): - conn_state['is_open'] = False - connection.close.side_effect = mock_close - - def mock_init(parameters, on_open_callback, on_open_error_callback, on_close_callback): - conn_state['is_open'] = True - conn_state['on_open'] = lambda: on_open_callback(connection) - conn_state['on_close'] = lambda: on_close_callback(connection, 'Closed by test') - return connection - - mock_obj.side_effect = mock_init - - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', mock_obj) - return mock_obj - - -@fixture -def publish_confirm(mock_connection: Mock) -> PublishConfirm: - return PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) - - -# tests -def test_publish_confirm_start_and_stop(publish_confirm: PublishConfirm): - publish_confirm.start() - - assert publish_confirm._connection and publish_confirm._connection.is_open - assert publish_confirm._channel and publish_confirm._channel.is_open - assert publish_confirm._records.acked == 1 # channel.confirm_delivery() sent our first message - - publish_confirm.stop() - - assert publish_confirm._connection is None or publish_confirm._connection.is_closed - assert publish_confirm._channel is None or publish_confirm._channel.is_closed - - -def test_delivery_confirmation_handles_nack( - publish_confirm: PublishConfirm, mock_connection: Mock, channel_state: dict -): - def mock_confirm_delivery(callback: Callable[[Method], None]): - method = Method(Basic.Nack(channel_state['delivery_tag'])) - channel_state['delivery_tag'] += 1 - callback(method) - - publish_confirm._records.deliveries[0] = 'Confirm.Select' - mock_connection.return_value.channel.return_value.confirm_delivery = mock_confirm_delivery - - publish_confirm.start() - assert publish_confirm._records.nacked == 1 - assert publish_confirm._records.acked == 0 - - -def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm): - # start() doesn't call its callback in time (at all), so timeout should expire - publish_confirm.start = Mock(side_effect=lambda is_ready: None) - - # run wait_for_channel which should timeout waiting for Future to resolve - channel_is_ready = publish_confirm._wait_for_channel_to_be_ready(timeout=0.3) - assert not channel_is_ready - publish_confirm.start.assert_called_once() - - # teardown by undoing our hacky mock - publish_confirm.start = PublishConfirm.start - - -def test_publish_message_success_without_calling_start(mock_connection: Mock): - pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) - example_message = {'data': [123]} - - assert pub_conf._connection is None and pub_conf._channel is None - success = pub_conf.publish_message(example_message) - - # connection & channel should have been initialized internally, so publish should have worked - assert success - assert pub_conf._channel is not None and pub_conf._channel.is_open - assert pub_conf._records.message_number == 1 - assert pub_conf._records.deliveries[1] == example_message - - -def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, mock_connection: Mock): - message_data = {'data': 123} - mock_connection.return_value.channel.return_value.basic_publish = Mock( - side_effect=AMQPConnectionError('ACCESS_REFUSED') - ) - - success = publish_confirm.publish_message(message_data) - - # publish should have returned failure and not recorded a message delivery - assert not success - assert publish_confirm._records.message_number == 0 - assert len(publish_confirm._records.deliveries) == 0 - - # teardown our ad-hoc mocking of PublishConfirm instance - publish_confirm.start = PublishConfirm.start - publish_confirm.stop() - - -def test_publish_failure_restarts_thread(publish_confirm: PublishConfirm, mock_connection: Mock): - message_data = {'data': 123} - - # fail the first publish, succeed without incident on the second - mock_connection.return_value.channel.return_value.basic_publish = Mock( - side_effect=[AMQPConnectionError('ACCESS_REFUSED'), None] - ) - - initial_thread_name = publish_confirm._thread.name - success = publish_confirm.publish_message(message_data) - assert success - assert mock_connection.return_value.channel.return_value.basic_publish.call_count == 2 - assert publish_confirm._thread.name != initial_thread_name # should have new Thread - - -def test_on_channel_closed(publish_confirm: PublishConfirm, mock_connection: Mock): - publish_confirm.start() - assert publish_confirm._channel.is_open - - channel = mock_connection.return_value.channel.return_value - publish_confirm._on_channel_closed(channel, 'ChannelClosedByClient') - - assert publish_confirm._channel is None - assert publish_confirm._connection.is_closed - - publish_confirm.stop() # teardown - - -def test_start_with_future(publish_confirm: PublishConfirm): - is_channel_ready = Future() - assert publish_confirm._channel is None - - # run test - publish_confirm.start(is_channel_ready) - assert is_channel_ready.result(timeout=2) - - # teardown - publish_confirm.stop() - - -def test_exchange_failure_raises_exception(monkeypatch: MonkeyPatch, mock_connection: Mock): - # set up mock Channel that will fail on RabbitMQ exchange declare step - mock_connection.return_value.channel.return_value.exchange_declare = Mock( - side_effect=ValueError('Precondition failed: exchange did not match') - ) - - pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) - - # run test - is_channel_ready = Future() - pub_conf.start(is_ready=is_channel_ready) - exc = is_channel_ready.exception() - assert isinstance(exc, ValueError) and 'Precondition failed' in str(exc.args[0]) - - -def test_start_without_callback_sleeps(publish_confirm: PublishConfirm, monkeypatch: MonkeyPatch): - def mock_sleep_function(secs: float): - # If this is not the call originating from PublishConfirm.start(), let it really sleep. - # Mocking all sleep() calls seemed to break Thread operations (unit test ran forever) - if secs != 0.2: - sleep(secs) - - mock_sleep = Mock(wraps=mock_sleep_function) - monkeypatch.setattr('idsse.common.publish_confirm.time.sleep', mock_sleep) - - # if no callback passed, start() should sleep internally to ensure RabbitMQ callbacks complete - publish_confirm.start() - - # mock sleep someimtes captures a call from PublishConfirm.run(), due to a race condition - # between this test's thread and the PublishConfirm thread. Both results are acceptable - sleep_call_args = [call.args for call in mock_sleep.call_args_list] - assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (0.1,)])] - - -def test_wait_for_channel_returns_when_ready(publish_confirm: PublishConfirm): - publish_confirm._connection = None - publish_confirm._channel = None - - is_ready = publish_confirm._wait_for_channel_to_be_ready() - assert is_ready - assert publish_confirm._channel is not None and publish_confirm._channel.is_open - - -def test_calling_start_twice_raises_error(mock_connection: Mock): - pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) - - pub_conf.start() - with raises(RuntimeError) as exc: - pub_conf.start() - assert exc is not None diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index e273c80e..867dbabd 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -16,17 +16,16 @@ from unittest.mock import Mock from pytest import fixture, raises, MonkeyPatch -from pika import BlockingConnection from pika.adapters import blocking_connection from idsse.common.rabbitmq_utils import ( - Conn, Exch, Queue, RabbitMqParams, PublisherSync, subscribe_to_queue + Conn, Exch, Queue, Publisher, RabbitMqParams, subscribe_to_queue ) # Example data objects CONN = Conn('localhost', '/', port=5672, username='user', password='password') RMQ_PARAMS = RabbitMqParams( - Exch('ims_data', 'topic', True), + Exch('ims_data', 'topic'), Queue('ims_data', '', True, False, True) ) @@ -85,7 +84,7 @@ def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock # assert correct (mocked) pika calls were made mock_blocking_connection.assert_called_once() - _connection.channel.assert_called_once() + _connection.channel.assert_called_once() # pylint: disable=no-member _channel.basic_qos.assert_called_once() _channel.basic_consume.assert_called_once() @@ -120,7 +119,6 @@ def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock ) - def test_private_queue_sets_ttl(monkeypatch: MonkeyPatch, mock_connection: Mock): mock_blocking_connection = Mock(return_value=mock_connection) monkeypatch.setattr( @@ -149,11 +147,15 @@ def test_private_queue_sets_ttl(monkeypatch: MonkeyPatch, mock_connection: Mock) ) -def test_passing_connection_does_not_create_new(mock_connection): - mock_connection.__class__ = BlockingConnection # set mock type to pika.BlockingConnection +def test_passing_connection_does_not_create_new(mock_connection, monkeypatch): mock_callback_function = Mock(name='on_message_callback') + mock_blocking_connection = Mock(return_value=mock_connection) + monkeypatch.setattr( + 'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection + ) + new_connection, new_channel = subscribe_to_queue( - mock_connection, RMQ_PARAMS, mock_callback_function + CONN, RMQ_PARAMS, mock_callback_function ) mock_connection.assert_not_called() @@ -172,19 +174,6 @@ def test_passing_unsupported_connection_type_fails(): assert exc is not None -def test_passing_channel_does_not_create_new(mock_connection: Mock, mock_channel: Mock): - mock_callback_func = Mock() - mock_connection.__class__ = BlockingConnection # make mock look like real BlockingConnection - - _, new_channel = subscribe_to_queue( - mock_connection, RMQ_PARAMS, mock_callback_func, channel=mock_channel - ) - - mock_connection.channel.assert_not_called() - new_channel.basic_consume.assert_called_once() - assert new_channel == mock_channel - - def test_direct_reply_does_not_declare_queue( monkeypatch: MonkeyPatch, mock_connection: Mock ): @@ -227,23 +216,24 @@ def test_default_exchange_does_not_declare_exchange( def test_simple_publisher(monkeypatch: MonkeyPatch, mock_connection: Mock): - # add mock to get Connnection callback to invoke immediately + # add mock to get Connection callback to invoke immediately mock_connection.add_callback_threadsafe = Mock(side_effect=lambda callback: callback()) mock_blocking_connection = Mock(return_value=mock_connection) monkeypatch.setattr( 'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection ) - publisher = PublisherSync(CONN, RMQ_PARAMS) + mock_threadsafe = Mock() + monkeypatch.setattr('idsse.common.rabbitmq_utils.threadsafe_call', mock_threadsafe) + + publisher = Publisher(CONN, RMQ_PARAMS.exchange) mock_blocking_connection.assert_called_once() _channel = mock_blocking_connection.return_value.channel _channel.assert_called_once() - assert publisher._connection == mock_connection + assert publisher.connection == mock_connection - result = publisher.publish_message({'data': 123}) - assert result - _channel.return_value.basic_publish.assert_called_once() + publisher.publish({'data': 123}) + assert 'Publisher.publish' in str(mock_threadsafe.call_args[0][1]) publisher.stop() - _channel.return_value.close.assert_called_once() - mock_blocking_connection.return_value.close.assert_called_once() + assert 'MockChannel.close' in str(mock_threadsafe.call_args[0][1])