diff --git a/fedora_messaging/exceptions.py b/fedora_messaging/exceptions.py index 708cb073..909637ce 100644 --- a/fedora_messaging/exceptions.py +++ b/fedora_messaging/exceptions.py @@ -5,6 +5,27 @@ class BaseException(Exception): """The base class for all exceptions raised by fedora_messaging.""" +class NoFreeChannels(BaseException): + """Raised when a connection has reached its channel limit""" + + +class BadDeclaration(BaseException): + """ + Raised when declaring an object in AMQP fails. + + Args: + obj_type (str): The type of object being declared. One of "binding", + "queue", or "exchange". + description (dict): The description of the object. + reason (str): The reason the server gave for rejecting the declaration. + """ + + def __init__(self, obj_type, description, reason): + self.obj_type = obj_type + self.description = description + self.reason = reason + + class ConfigurationException(BaseException): """ Raised when there's an invalid configuration setting @@ -69,10 +90,6 @@ class Drop(ConsumeException): """ -class BadConsumer(ConsumeException): - """Raised when the consumer specified does not exist.""" - - class HaltConsumer(ConsumeException): """ Consumer callbacks should raise this exception if they wish the consumer to diff --git a/fedora_messaging/tests/unit/twisted/test_protocol.py b/fedora_messaging/tests/unit/twisted/test_protocol.py index 29401d5e..e182d08e 100644 --- a/fedora_messaging/tests/unit/twisted/test_protocol.py +++ b/fedora_messaging/tests/unit/twisted/test_protocol.py @@ -28,8 +28,8 @@ from fedora_messaging import config from fedora_messaging.message import Message -from fedora_messaging.exceptions import Nack, Drop, HaltConsumer, BadConsumer -from fedora_messaging.twisted.protocol import FedoraMessagingProtocol, _pika_version +from fedora_messaging.exceptions import Nack, Drop, HaltConsumer +from fedora_messaging.twisted.protocol import FedoraMessagingProtocol, _pika_version, Consumer try: @@ -95,28 +95,13 @@ def setUp(self): ) def test_consume_not_running(self): """Assert consume returns a consumer and doesn't register it when not running.""" - expected = {"tag": "tag1", "queue": "my_queue", "callback": lambda x: x} + expected = Consumer("tag1", "my_queue", lambda x: x, self.protocol._channel) def _check(actual): self.assertEqual(actual, expected) self.assertEqual(0, self.protocol._channel.basic_consume.call_count) - d = self.protocol.consume(expected["callback"], "my_queue") - d.addCallback(_check) - - return pytest_twisted.blockon(d) - - @mock.patch( - "fedora_messaging.twisted.protocol.uuid.uuid4", mock.Mock(return_value="tag1") - ) - def test_consume_consumer_copy(self): - """Assert we don't hand out references to the consumer dictionaries.""" - - def _check(actual): - actual["tag"] = "not_my_tag" - self.assertEqual(self.protocol._consumers["my_queue"]["tag"], "tag1") - - d = self.protocol.consume(lambda _: _, "my_queue") + d = self.protocol.consume(expected.callback, "my_queue") d.addCallback(_check) return pytest_twisted.blockon(d) @@ -132,7 +117,7 @@ def cb2(): def _check(_): self.assertEqual(1, len(self.protocol._consumers)) - self.assertEqual(cb2, self.protocol._consumers["my_queue"]["callback"]) + self.assertEqual(cb2, self.protocol._consumers["my_queue"].callback) d = self.protocol.consume(cb1, "my_queue") d.addCallback(lambda _: self.protocol.consume(cb2, "my_queue")) @@ -145,7 +130,7 @@ def _check(_): ) def test_consume_running(self): """Assert when running, consume sets up the AMQP consumer""" - expected = {"tag": "tag1", "queue": "my_queue", 
"callback": lambda x: x} + expected = Consumer("tag1", "my_queue", lambda x: x, self.protocol._channel) self.protocol._running = True def _check_channel(_): @@ -153,7 +138,7 @@ def _check_channel(_): queue="my_queue", consumer_tag="tag1" ) - d = self.protocol.consume(expected["callback"], "my_queue") + d = self.protocol.consume(expected.callback, expected.queue) d.addCallback(lambda actual: self.assertEqual(actual, expected)) d.addCallback(_check_channel) @@ -172,37 +157,12 @@ def _check(_): return pytest_twisted.blockon(d) - @mock.patch( - "fedora_messaging.twisted.protocol.uuid.uuid4", mock.Mock(return_value="tag1") - ) def test_cancel_running(self): - """Assert when running, cancel tries to cancel the AMQP consumer.""" - self.protocol._running = True - - def _check(_): - self.assertEqual(0, len(self.protocol._consumers)) - self.protocol._channel.basic_cancel.assert_called_once_with( - consumer_tag="tag1" - ) - - d = self.protocol.consume(lambda _: _, "my_queue") - d.addCallback(self.protocol.cancel) - d.addCallback(_check) - - return pytest_twisted.blockon(d) - - @mock.patch( - "fedora_messaging.twisted.protocol.uuid.uuid4", mock.Mock(return_value="tag1") - ) - def test_cancel_running_failed(self): - """Assert when running and cancel results in a channel error, it's ignored.""" + """Assert when not running, cancel doesn't try to cancel the AMQP consumer.""" self.protocol._running = True - self.protocol._channel.basic_cancel.side_effect = ( - pika.exceptions.AMQPChannelError - ) def _check(_): - self.assertEqual(0, len(self.protocol._consumers)) + self.assertEqual(1, len(self.protocol._consumers)) self.protocol._channel.basic_cancel.assert_called_once_with( consumer_tag="tag1" ) @@ -213,18 +173,6 @@ def _check(_): return pytest_twisted.blockon(d) - def test_cancel_bad_consumer(self): - """Assert when not running, cancel doesn't try to cancel the AMQP consumer.""" - d = defer.maybeDeferred(self.protocol.cancel, {"queue": "invalid"}) - - def _check_err(failure): - failure.trap(BadConsumer) - - d.addCallback(self.fail) - d.addErrback(_check_err) - - return pytest_twisted.blockon(d) - def test_connection_ready(self): # Check the ready Deferred. def _check(_): @@ -232,6 +180,7 @@ def _check(_): self.protocol._channel.basic_qos.assert_called_with( prefetch_count=config.conf["qos"]["prefetch_count"], prefetch_size=config.conf["qos"]["prefetch_size"], + all_channels=True ) if _pika_version >= pkg_resources.parse_version("1.0.0b1"): self.protocol._channel.confirm_delivery.assert_called() @@ -246,153 +195,6 @@ def _check(_): return pytest_twisted.blockon(d) - def test_setup_read(self): - # Check the setupRead method. 
- self.factory.bindings = [ - { - "exchange": "testexchange1", - "exchange_type": "topic", - "queue_name": "testqueue1", - "routing_key": "#", - }, - { - "exchange": "testexchange2", - "exchange_type": "topic", - "queue_name": "testqueue2", - "queue_auto_delete": True, - "routing_key": "testrk", - }, - { - "exchange": "testexchange3", - "exchange_type": "headers", - "queue_name": "testqueue3", - "routing_key": "#", - "queue_arguments": {}, - "binding_arguments": {"x-match": "all"}, - }, - ] - callback = mock.Mock() - d = self.protocol.setupRead(callback) - - def _check(_): - for consumer in self.protocol._consumers.values(): - self.assertTrue(consumer["callback"] is callback) - - for queue in ["testqueue1", "testqueue2", "testqueue3"]: - self.assertIn(queue, self.protocol._consumers) - self.assertEqual( - self.protocol._channel.exchange_declare.call_args_list, - [ - ( - (), - dict( - exchange="testexchange1", - exchange_type="topic", - durable=True, - ), - ), - ( - (), - dict( - exchange="testexchange2", - exchange_type="topic", - durable=True, - ), - ), - ( - (), - dict( - exchange="testexchange3", - exchange_type="headers", - durable=True, - ), - ), - ], - ) - self.assertEqual( - self.protocol._channel.queue_declare.call_args_list, - [ - ( - (), - dict( - queue="testqueue1", - arguments=None, - auto_delete=False, - durable=True, - ), - ), - ( - (), - dict( - queue="testqueue2", - arguments=None, - auto_delete=True, - durable=True, - ), - ), - ( - (), - dict( - queue="testqueue3", - arguments={}, - auto_delete=False, - durable=True, - ), - ), - ], - ) - self.assertEqual( - self.protocol._channel.queue_bind.call_args_list, - [ - ( - (), - dict( - exchange="testexchange1", - queue="testqueue1", - routing_key="#", - arguments=None, - ), - ), - ( - (), - dict( - exchange="testexchange2", - queue="testqueue2", - routing_key="testrk", - arguments=None, - ), - ), - ( - (), - dict( - exchange="testexchange3", - queue="testqueue3", - routing_key="#", - arguments={"x-match": "all"}, - ), - ), - ], - ) - - d.addCallback(_check) - return pytest_twisted.blockon(d) - - def test_setupRead_no_bindings(self): - # The setupRead method should do nothing when there are no bindings in - # the factory. - self.factory.bindings = [] - callback = mock.Mock() - d = self.protocol.setupRead(callback) - - def _check(_): - self.assertEqual(self.protocol._consumers, {}) - self.protocol._channel.exchange_declare.assert_not_called() - self.protocol._channel.queue_declare.assert_not_called() - self.protocol._channel.queue_bind.assert_not_called() - - d.addCallback(_check) - return pytest_twisted.blockon(d) - def test_publish(self): # Check the publish method. 
body = {"bodykey": "bodyvalue"} @@ -420,23 +222,25 @@ def test_resumeProducing(self): self.protocol._running = False callback = mock.Mock() self.protocol._consumers = { - "testqueue1": { - "tag": "consumer1", - "callback": callback, - "queue": "testqueue1", - }, - "testqueue2": { - "tag": "consumer2", - "callback": callback, - "queue": "testqueue2", - }, - "testqueue3": { - "tag": "consumer3", - "callback": callback, - "queue": "testqueue3", - }, + "testqueue1": Consumer( + "consumer1", + "testqueue1", + callback, + self.protocol._channel, + ), + "testqueue2": Consumer( + "consumer2", + "testqueue2", + callback, + self.protocol._channel, + ), + "testqueue3": Consumer( + "consumer3", + "testqueue3", + callback, + self.protocol._channel, + ), } - self.protocol._queues = set(["testqueue1", "testqueue2", "testqueue3"]) self.protocol._read = mock.Mock(side_effect=lambda _, __: defer.succeed(None)) d = self.protocol.resumeProducing() @@ -532,12 +336,13 @@ def setUp(self): self.protocol._impl.is_closed = False self.protocol._on_message = mock.Mock() self.queue = mock.Mock() + self.consumer = Consumer("consumer1", "my_queue_name", lambda _: _, mock.Mock()) def test_read_not_running(self): # When not running, _read() should do nothing. self.queue.get.side_effect = lambda: defer.succeed(None) self.protocol._running = False - d = self.protocol._read(self.queue, {}) + d = self.protocol._read(self.queue, self.consumer) def _check(_): self.queue.get.assert_not_called() @@ -558,13 +363,13 @@ def _check(_): self.assertEqual( self.protocol._on_message.call_args_list, [ - (("df1", "prop1", "body1", {}), {}), - (("df2", "prop2", "body2", {}), {}), - (("df3", "prop3", "body3", {}), {}), + (("df1", "prop1", "body1", self.consumer), {}), + (("df2", "prop2", "body2", self.consumer), {}), + (("df3", "prop3", "body3", self.consumer), {}), ], ) - d = self.protocol._read(self.queue, {}) + d = self.protocol._read(self.queue, self.consumer) d.addCallback(_check) return pytest_twisted.blockon(d) @@ -574,7 +379,7 @@ def test_read_no_body(self): defer.succeed((None, None, None, None)), defer.succeed((None, None, None, "")), ] - d = self.protocol._read(self.queue, {}) + d = self.protocol._read(self.queue, self.consumer) def _check(_): self.protocol._on_message.assert_not_called() @@ -597,7 +402,7 @@ def test_read_exit_loop(self): for exc in exceptions: queue = mock.Mock() queue.get.side_effect = exc - deferreds.append(self.protocol._read(queue, {})) + deferreds.append(self.protocol._read(queue, self.consumer)) def _check(_): self.protocol._on_message.assert_not_called() @@ -614,11 +419,8 @@ def setUp(self): self.protocol = MockProtocol(None) self._message_callback = mock.Mock() self.protocol._impl.is_closed = False - self.protocol._consumers["my_queue_name"] = { - "tag": "consumer1", - "callback": self._message_callback, - "queue": "my_queue_name", - } + self.protocol._consumers["my_queue_name"] = Consumer( + "consumer1", "my_queue_name", self._message_callback, mock.Mock()) def _call_on_message(self, topic, headers, body): """Prepare arguments for the _on_message() method and call it.""" diff --git a/fedora_messaging/twisted/factory.py b/fedora_messaging/twisted/factory.py index cd770497..d33e4ed3 100644 --- a/fedora_messaging/twisted/factory.py +++ b/fedora_messaging/twisted/factory.py @@ -40,16 +40,46 @@ class FedoraMessagingFactory(protocol.ReconnectingClientFactory): name = u"FedoraMessaging:Factory" protocol = FedoraMessagingProtocol - def __init__(self, parameters, bindings): - """Initialize the protocol. 
+    def __init__(
+        self,
+        parameters,
+        confirms=True,
+        exchanges=None,
+        queues=None,
+        bindings=None,
+        consumers=None,
+    ):
+        """
+        Create a new factory for protocol objects.
+
+        Any exchanges, queues, bindings, or consumers provided here will be
+        declared and set up each time a new protocol instance is created. In
+        other words, each time a new connection is set up to the broker, it
+        will start with the declaration of these objects.
 
         Args:
             parameters (pika.ConnectionParameters): The connection parameters.
-            bindings (list of dict): which bindings to setup on connect.
+            confirms (bool): If true, attempt to turn on the publish confirms extension.
+            exchanges (list of dicts): List of exchanges to declare. Each dictionary is
+                passed to :meth:`pika.channel.Channel.exchange_declare` as keyword arguments,
+                so any parameter to that method is a valid key.
+            queues (list of dicts): List of queues to declare. Each dictionary is
+                passed to :meth:`pika.channel.Channel.queue_declare` as keyword arguments,
+                so any parameter to that method is a valid key.
+            bindings (list of dicts): A list of bindings to be created between
+                queues and exchanges. Each dictionary is passed to
+                :meth:`pika.channel.Channel.queue_bind`. The "queue" and "exchange" keys
+                are required.
+            consumers (dict): Each key should be a queue name and each value should
+                be a callable that accepts a message as an argument, to be invoked
+                when a new message is delivered.
         """
-        self.bindings = bindings
         self._parameters = parameters
-        self._message_callback = None
+        self.confirms = confirms
+        self.exchanges = exchanges or []
+        self.queues = queues or []
+        self.bindings = bindings or []
+        self.consumers = consumers or {}
         self.client = None
         self._client_ready = defer.Deferred()
@@ -81,10 +111,19 @@ def buildProtocol(self, addr):
     @defer.inlineCallbacks
     def _on_client_ready(self):
         """Called when the client is ready to send and receive messages."""
-        # Setup read (on connect and reconnect).
-        if self._message_callback is not None:
-            yield self.client.setupRead(self._message_callback)
-        yield self.client.resumeProducing()
+        yield self.client.resumeProducing()
+
+        yield self.client.declare_exchanges(self.exchanges)
+        yield self.client.declare_queues(self.queues)
+        yield self.client.bind_queues(self.bindings)
+
+        for queue, callback in self.consumers.items():
+            yield self.client.consume(callback, queue)
 
         # Run ready callbacks.
         self._client_ready.callback(None)
@@ -146,22 +185,41 @@ def stopFactory(self):
             yield self.client.stopProducing()
         protocol.ReconnectingClientFactory.stopFactory(self)
 
-    @defer.inlineCallbacks
-    def consume(self, message_callback):
-        """Pass incoming messages to the provided callback.
+    def consume(self, callback, queue):
+        """
+        Register a new consumer.
+
+        This consumer will be configured for every protocol this factory
+        produces, so it will be reconfigured on network failures. If a
+        connection is already active, the consumer will be added to it.
 
         Args:
-            message_callback (callable): The callable to pass the message to
-                when one arrives.
+            callback (callable): The callback to invoke when a message arrives.
+            queue (str): The name of the queue to consume from.
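+
+        Example (a minimal sketch; assumes ``factory`` is a started
+        FedoraMessagingFactory and ``handle_message`` accepts a
+        :class:`fedora_messaging.message.Message`)::
+
+            def handle_message(message):
+                print(message.topic)
+
+            factory.consume(handle_message, "my_queue")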
""" - log.msg("Setup messages consumption.", system=self.name, logLevel=logging.DEBUG) - new_setup = self._message_callback is None - self._message_callback = message_callback - if self._client_ready.called and new_setup: - # If consume() is called after the client is ready (and we did - # not setup before), do it now. - yield self.client.setupRead(self._message_callback) - yield self.client.resumeProducing() + self.consumers[queue] = callback + if self._client_ready.called: + return self.client.consume(callback, queue) + + def cancel(self, queue): + """ + Cancel the consumer for a queue. + + This removes the consumer from the list of consumers to be configured for + every connection. + + Args: + queue (str): The name of the queue the consumer is subscribed to. + Returns: + defer.Deferred or None: Either a Deferred that fires when the consumer + is canceled, or None if the consumer was already canceled. Wrap + the call in :func:`defer.maybeDeferred` to always receive a Deferred. + """ + try: + del self.consumers[queue] + except KeyError: + pass + return self.client.cancel(queue) @defer.inlineCallbacks def publish(self, message, exchange=None): @@ -183,20 +241,13 @@ def publish(self, message, exchange=None): yield self._client_ready try: yield self.client.publish(message, exchange) - except (pika.exceptions.ConnectionClosed, pika.exceptions.ChannelClosed) as e: + except ConnectionException as e: log.msg( "Connection lost while publishing, retrying.", system=self.name, logLevel=logging.WARNING, ) yield self.publish(message, exchange) - except (pika.exceptions.NackError, pika.exceptions.UnroutableError) as e: - log.msg( - "Message was rejected by the broker ({})".format(e), - system=self.name, - logLevel=logging.WARNING, - ) - raise PublishReturned(reason=e) except pika.exceptions.AMQPError as e: self.stopTrying() yield self.client.close() diff --git a/fedora_messaging/twisted/protocol.py b/fedora_messaging/twisted/protocol.py index 50d7cf21..29aa23bc 100644 --- a/fedora_messaging/twisted/protocol.py +++ b/fedora_messaging/twisted/protocol.py @@ -23,7 +23,7 @@ from __future__ import absolute_import -import logging +from collections import namedtuple import uuid import pika @@ -35,7 +35,16 @@ from .. import config from ..message import get_message -from ..exceptions import Nack, Drop, HaltConsumer, ValidationError, BadConsumer +from ..exceptions import ( + Nack, + Drop, + HaltConsumer, + ValidationError, + NoFreeChannels, + BadDeclaration, + PublishReturned, + ConnectionException +) _log = Logger(__name__) @@ -47,6 +56,9 @@ ChannelClosedByClient = pika.exceptions.ChannelClosedByClient +Consumer = namedtuple("Consumer", ["tag", "queue", "callback", "channel"]) + + class FedoraMessagingProtocol(TwistedProtocolConnection): """A Twisted Protocol for the Fedora Messaging system. @@ -68,75 +80,50 @@ def __init__(self, parameters, confirms=True): TwistedProtocolConnection.__init__(self, parameters) self._parameters = parameters if confirms and _pika_version < pkg_resources.parse_version("1.0.0b1"): - log.msg( - "Message confirmation is only available with pika 1.0.0+", - system=self.name, - logLevel=logging.ERROR, - ) + _log.error("Message confirmation is only available with pika 1.0.0+") confirms = False self._confirms = confirms self._channel = None self._running = False - self._message_callback = None # Map queue names to dictionaries representing consumers self._consumers = {} self.factory = None + @defer.inlineCallbacks + def _allocate_channel(self): + """ + Allocate a new AMQP channel. 
+
+        Raises:
+            NoFreeChannels: If this connection has reached its
+                maximum number of channels.
+        """
+        try:
+            channel = yield self.channel()
+        except pika.exceptions.NoFreeChannels:
+            raise NoFreeChannels()
+        _log.info("Created AMQP channel {num}", num=channel.channel_number)
+        if self._confirms:
+            yield channel.confirm_delivery()
+        defer.returnValue(channel)
+
     @defer.inlineCallbacks
     def connectionReady(self, res=None):
         """Called when the AMQP connection is ready.
         """
         # The optional `res` argument is for compatibility with pika < 1.0.0
         # Create channel
-        self._channel = yield self.channel()
-        log.msg("AMQP channel created", system=self.name, logLevel=logging.DEBUG)
+        self._channel = yield self._allocate_channel()
         yield self._channel.basic_qos(
             prefetch_count=config.conf["qos"]["prefetch_count"],
             prefetch_size=config.conf["qos"]["prefetch_size"],
+            all_channels=True,
         )
         if self._confirms:
             yield self._channel.confirm_delivery()
         if _pika_version < pkg_resources.parse_version("1.0.0b1"):
             TwistedProtocolConnection.connectionReady(self, res)
 
-    @defer.inlineCallbacks
-    def setupRead(self, message_callback):
-        """Pass incoming messages to the provided callback.
-
-        Args:
-            message_callback (callable): The callable to pass the message to
-                when one arrives.
-        """
-        if not self.factory.bindings:
-            return
-        self._message_callback = message_callback
-        for binding in self.factory.bindings:
-            yield self._channel.exchange_declare(
-                exchange=binding["exchange"],
-                exchange_type=binding["exchange_type"],
-                durable=True,
-            )
-            result = yield self._channel.queue_declare(
-                queue=binding["queue_name"],
-                durable=True,
-                auto_delete=binding.get("queue_auto_delete", False),
-                arguments=binding.get("queue_arguments"),
-            )
-            queue_name = result.method.queue
-            yield self._channel.queue_bind(
-                queue=queue_name,
-                exchange=binding["exchange"],
-                routing_key=binding["routing_key"],
-                arguments=binding.get("binding_arguments"),
-            )
-            consumer = {
-                "tag": str(uuid.uuid4()),
-                "queue": queue_name,
-                "callback": message_callback,
-            }
-            self._consumers[queue_name] = consumer
-        log.msg("AMQP bindings declared", system=self.name, logLevel=logging.DEBUG)
-
     @defer.inlineCallbacks
     def _read(self, queue_object, consumer):
         """
@@ -213,7 +200,7 @@ def _on_message(self, delivery_frame, properties, body, consumer):
         )
         try:
             message = get_message(delivery_frame.routing_key, properties, body)
-            message.queue = consumer["queue"]
+            message.queue = consumer.queue
         except ValidationError:
             _log.warn(
                 "Message id {msgid} did not pass validation; ignoring message",
@@ -230,7 +217,7 @@ def _on_message(self, delivery_frame, properties, body, consumer):
                 topic=message.topic,
                 msgid=properties.message_id,
             )
-            yield defer.maybeDeferred(consumer["callback"], message)
+            yield defer.maybeDeferred(consumer.callback, message)
         except Nack:
             _log.warn(
                 "Returning message id {msgid} to the queue", msgid=properties.message_id
@@ -267,15 +254,30 @@ def publish(self, message, exchange):
             message (message.Message): The message to publish.
             exchange (str): The name of the AMQP exchange to publish to
 
+        Raises:
+            NoFreeChannels: If there are no available channels on this connection.
+                If this occurs, you can either reduce the number of consumers on this
+                connection or create an additional connection.
+
+        .. _exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
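+
+        Example (a minimal sketch; assumes ``protocol`` is a connected
+        FedoraMessagingProtocol and the caller is an ``inlineCallbacks``
+        generator)::
+
+            from fedora_messaging.message import Message
+
+            msg = Message(topic="my.topic", body={"key": "value"})
+            yield protocol.publish(msg, "amq.topic")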
         """
         message.validate()
-        yield self._channel.basic_publish(
-            exchange=exchange,
-            routing_key=message._encoded_routing_key,
-            body=message._encoded_body,
-            properties=message._properties,
-        )
+        try:
+            yield self._channel.basic_publish(
+                exchange=exchange,
+                routing_key=message._encoded_routing_key,
+                body=message._encoded_body,
+                properties=message._properties,
+            )
+        except (pika.exceptions.NackError, pika.exceptions.UnroutableError) as e:
+            _log.error("Message was rejected by the broker ({reason})", reason=str(e))
+            raise PublishReturned(reason=e)
+        except pika.exceptions.ChannelClosed:
+            self._channel = yield self._allocate_channel()
+            yield self.publish(message, exchange)
+        except pika.exceptions.ConnectionClosed as e:
+            raise ConnectionException(reason=e)
@@ -287,59 +289,179 @@ def consume(self, callback, queue):
         already exists for the given queue, the callback is simply updated and
         any new messages for that consumer use the new callback.
 
-        It's safe to call this before calling :meth:`resumeProducing`. Any
-        consumer added with this API will be started when it is called.
+        If :meth:`resumeProducing` has not been called when this method is called,
+        it will be called for you.
 
         Returns:
-            dict: A dictionary that identifies this consumer. This can be
-                passed to :meth:`cancel` to halt this consumer. It should not
-                be modified.
+            Consumer: A namedtuple that identifies this consumer.
         """
-        if queue in self._consumers:
-            self._consumers[queue]["callback"] = callback
-            defer.returnValue(self._consumers[queue])
+        if queue in self._consumers and self._consumers[queue].channel.is_open:
+            consumer = Consumer(
+                tag=self._consumers[queue].tag,
+                queue=queue,
+                callback=callback,
+                channel=self._consumers[queue].channel,
+            )
+            self._consumers[queue] = consumer
+            defer.returnValue(consumer)
+
+        consumer = Consumer(
+            tag=str(uuid.uuid4()), queue=queue, callback=callback, channel=self._channel
+        )
+
+        if not self._running:
+            self._consumers[queue] = consumer
+            yield self.resumeProducing()
+            defer.returnValue(consumer)
 
-        consumer = {"tag": str(uuid.uuid4()), "queue": queue, "callback": callback}
         self._consumers[queue] = consumer
-        if self._running:
-            queue_object, _ = yield self._channel.basic_consume(
-                queue=consumer["queue"], consumer_tag=consumer["tag"]
-            )
-            deferred = self._read(queue_object, consumer)
-            deferred.addErrback(log.err, system=self.name)
-            log.msg(
-                "Successfully registered AMQP consumer",
-                system=self.name,
-                logLevel=logging.DEBUG,
-                q=queue,
-            )
-        defer.returnValue(consumer.copy())
+        queue_object, _ = yield consumer.channel.basic_consume(
+            queue=consumer.queue, consumer_tag=consumer.tag
+        )
+        deferred = self._read(queue_object, consumer)
+        deferred.addErrback(
+            lambda f: _log.failure("_read failed on consumer {c}", f, c=consumer)
+        )
+        _log.info("Successfully registered AMQP consumer {c}", c=consumer)
+        defer.returnValue(consumer)
 
     @defer.inlineCallbacks
-    def cancel(self, consumer):
+    def cancel(self, queue):
         """
-        Cancel an existing consumer.
+        Cancel the consumer for a queue.
 
         Args:
-            consumer (dict): The dictionary from :meth:`consume` that
-                identifies the consumer to cancel.
+            queue (str): The name of the queue the consumer is subscribed to.
 
         Returns:
             defer.Deferred or None: Either a Deferred that fires when the consumer
                 is canceled, or None if the consumer was already canceled.
                Wrap the call in :func:`defer.maybeDeferred` to always receive a Deferred.
         """
-        if self._running:
-            try:
-                yield self._channel.basic_cancel(consumer_tag=consumer["tag"])
-            except pika.exceptions.AMQPChannelError:
-                # Consumers are tied to channels, so if this channel is dead the
-                # consumer should already be canceled (and we can't get to it anyway)
-                pass
         try:
-            del self._consumers[consumer["queue"]]
+            consumer = self._consumers[queue]
+            yield consumer.channel.basic_cancel(consumer_tag=consumer.tag)
+        except pika.exceptions.AMQPChannelError:
+            # Consumers are tied to channels, so if this channel is dead the
+            # consumer should already be canceled (and we can't get to it anyway)
+            pass
         except KeyError:
-            raise BadConsumer()
+            return
+
+        try:
+            yield consumer.channel.close()
+        except pika.exceptions.AMQPChannelError:
+            pass
+
+        del self._consumers[queue]
+
+    @defer.inlineCallbacks
+    def declare_exchanges(self, exchanges):
+        """
+        Declare a number of exchanges at once.
+
+        This simply wraps the :meth:`pika.channel.Channel.exchange_declare`
+        method and deals with error handling and channel allocation.
+
+        Args:
+            exchanges (list of dict): A list of dictionaries, where each dictionary
+                represents an exchange. Each dictionary can have the following keys::
+
+                    exchange (str): The exchange's name
+                    exchange_type (str): The type of the exchange ("direct", "topic", etc)
+                    passive (bool): If true, this will just assert that the exchange exists,
+                        but won't create it if it doesn't.
+                    durable (bool): Whether or not the exchange is durable
+                    arguments (dict): Extra arguments for the exchange's creation.
+
+        Raises:
+            NoFreeChannels: If there are no available channels on this connection.
+                If this occurs, you can either reduce the number of consumers on this
+                connection or create an additional connection.
+            BadDeclaration: If an exchange could not be declared. This can occur
+                if the exchange already exists, but its type does not match
+                (e.g. it is declared as a "topic" exchange, but exists as a "direct"
+                exchange). It can also occur if it does not exist, but the current
+                user does not have permissions to create the object.
+        """
+        channel = yield self._allocate_channel()
+        try:
+            for exchange in exchanges:
+                try:
+                    yield channel.exchange_declare(**exchange)
+                except pika.exceptions.ChannelClosedByBroker as e:
+                    raise BadDeclaration("exchange", exchange, str(e))
+        finally:
+            channel.close()
+
+    @defer.inlineCallbacks
+    def declare_queues(self, queues):
+        """
+        Declare a list of queues.
+
+        Args:
+            queues (list of dict): A list of dictionaries, where each dictionary
+                represents a queue. Each dictionary can have the following keys::
+
+                    queue (str): The name of the queue
+                    passive (bool): If true, this will just assert that the queue exists,
+                        but won't create it if it doesn't.
+                    durable (bool): Whether or not the queue is durable
+                    exclusive (bool): Whether or not the queue is exclusive to this
+                        connection.
+                    auto_delete (bool): Whether or not the queue should be automatically
+                        deleted once this connection ends.
+                    arguments (dict): Additional arguments for the creation of the queue.
+
+        Raises:
+            NoFreeChannels: If there are no available channels on this connection.
+                If this occurs, you can either reduce the number of consumers on this
+                connection or create an additional connection.
+            BadDeclaration: If a queue could not be declared. This can occur
+                if the queue already exists, but its type does not match
+                (e.g. it is declared as a durable queue, but exists as a non-durable
+                queue). It can also occur if it does not exist, but the current
+                user does not have permissions to create the object.
+        """
+        channel = yield self._allocate_channel()
+        try:
+            for queue in queues:
+                try:
+                    yield channel.queue_declare(**queue)
+                except pika.exceptions.ChannelClosedByBroker as e:
+                    raise BadDeclaration("queue", queue, str(e))
+        finally:
+            channel.close()
+
+    @defer.inlineCallbacks
+    def bind_queues(self, bindings):
+        """
+        Declare a set of bindings between queues and exchanges.
+
+        Args:
+            bindings (list of dict): A list of binding definitions. Each dictionary
+                must contain the "queue" key whose value is the name of the queue
+                to create the binding on, as well as the "exchange" key whose value
+                should be the name of the exchange to bind to. Additional acceptable
+                keys are any keyword arguments accepted by
+                :meth:`pika.channel.Channel.queue_bind`.
+
+        Raises:
+            NoFreeChannels: If there are no available channels on this connection.
+                If this occurs, you can either reduce the number of consumers on this
+                connection or create an additional connection.
+            BadDeclaration: If a binding could not be declared. This can occur if the
+                queue or exchange don't exist, or if they do, but the current user does
+                not have permissions to create bindings.
+        """
+        channel = yield self._allocate_channel()
+        try:
+            for binding in bindings:
+                binding = binding.copy()
+                try:
+                    queue = binding.pop("queue")
+                    exchange = binding.pop("exchange")
+                    yield channel.queue_bind(queue=queue, exchange=exchange, **binding)
+                except pika.exceptions.ChannelClosedByBroker as e:
+                    raise BadDeclaration("binding", binding, str(e))
+        finally:
+            channel.close()
@@ -355,8 +477,8 @@ def resumeProducing(self):
         # Start consuming
         self._running = True
         for consumer in self._consumers.values():
-            queue_object, _ = yield self._channel.basic_consume(
-                queue=consumer["queue"], consumer_tag=consumer["tag"]
+            queue_object, _ = yield consumer.channel.basic_consume(
+                queue=consumer.queue, consumer_tag=consumer.tag
             )
             deferred = self._read(queue_object, consumer)
             deferred.addErrback(
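
For reference, a minimal usage sketch of the factory API introduced above (an illustration only: it assumes a broker reachable on localhost with default credentials, and the exchange, queue, and callback names are hypothetical):

    import pika
    from twisted.internet import reactor

    from fedora_messaging.twisted.factory import FedoraMessagingFactory


    def handle_message(message):
        # Invoked for every message delivered to "my_queue".
        print(message.topic)


    factory = FedoraMessagingFactory(
        pika.ConnectionParameters(host="localhost"),
        exchanges=[{"exchange": "my_exchange", "exchange_type": "topic", "durable": True}],
        queues=[{"queue": "my_queue", "durable": True}],
        bindings=[{"queue": "my_queue", "exchange": "my_exchange", "routing_key": "#"}],
        consumers={"my_queue": handle_message},
    )
    reactor.connectTCP("localhost", 5672, factory)
    reactor.run()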