Skip to content

Commit

Permalink
Move the protocol module to the new Twisted logging API
Browse files Browse the repository at this point in the history
Since we require Twisted > 15, we can use the new logging API

Signed-off-by: Jeremy Cline <jcline@redhat.com>
  • Loading branch information
jeremycline committed Sep 19, 2018
1 parent 1aa678f commit 2781105
Showing 1 changed file with 44 additions and 63 deletions.
107 changes: 44 additions & 63 deletions fedora_messaging/twisted/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
from pika.adapters.twisted_connection import TwistedProtocolConnection
from twisted.internet import defer, error

# twisted.logger is available with Twisted 15+
from twisted.python import log
from twisted.logger import Logger

from .. import config
from ..message import get_message
from ..exceptions import Nack, Drop, HaltConsumer, ValidationError, BadConsumer


_log = Logger(__name__)

_pika_version = pkg_resources.get_distribution("pika").parsed_version
if _pika_version < pkg_resources.parse_version("1.0.0b1"):
ChannelClosedByClient = pika.exceptions.ChannelClosed
Expand Down Expand Up @@ -152,24 +153,29 @@ def _read(self, queue_object, consumer):
_channel, delivery_frame, properties, body = yield queue_object.get()
except (error.ConnectionDone, ChannelClosedByClient):
# This is deliberate.
log.msg(
"Closing the read loop on the producer.",
system=self.name,
logLevel=logging.DEBUG,
)
_log.info("Stopping the AMQP consumer with tag {tag}", tag=consumer.tag)
break
except pika.exceptions.ChannelClosed as e:
log.err(e, system=self.name)
_log.error(
"Stopping AMQP consumer {tag} for queue {q}: {e}",
tag=consumer.tag,
q=consumer.queue,
e=str(e),
)
break
except pika.exceptions.ConsumerCancelled:
log.msg("Consumer cancelled, quitting the read loop.", system=self.name)
except pika.exceptions.ConsumerCancelled as e:
_log.error(
"The AMQP broker canceled consumer {tag} on queue {q}: {e}",
tag=consumer.tag,
q=consumer.queue,
e=str(e),
)
break
except Exception as e:
log.err(
"Failed getting the next message in the queue, " "stopping.",
system=self.name,
_log.failure(
"An unexpected error occurred consuming from queue {q}",
q=consumer.queue,
)
log.err(e, system=self.name)
break
if body:
yield self._on_message(delivery_frame, properties, body, consumer)
Expand Down Expand Up @@ -200,71 +206,52 @@ def _on_message(self, delivery_frame, properties, body, consumer):
Returns:
Deferred: fired when the message has been handled.
"""
log.msg(
"Message arrived with delivery tag {tag}".format(
tag=delivery_frame.delivery_tag
),
system=self.name,
logLevel=logging.DEBUG,
_log.debug(
"Message arrived with delivery tag {tag} for {consumer}",
tag=delivery_frame.delivery_tag,
consumer=consumer,
)
try:
message = get_message(delivery_frame.routing_key, properties, body)
message.queue = consumer["queue"]
except ValidationError:
log.msg(
"Message id {msgid} did not pass validation.".format(
msgid=properties.message_id
),
system=self.name,
logLevel=logging.WARNING,
_log.warn(
"Message id {msgid} did not pass validation; ignoring message",
msgid=properties.message_id,
)
yield self._channel.basic_nack(
delivery_tag=delivery_frame.delivery_tag, requeue=False
)
return

try:
log.msg(
"Consuming message from topic {topic!r} (id {msgid})".format(
topic=message.topic, msgid=properties.message_id
),
system=self.name,
logLevel=logging.DEBUG,
_log.info(
"Consuming message from topic {topic!r} (id {msgid})",
topic=message.topic,
msgid=properties.message_id,
)
yield defer.maybeDeferred(consumer["callback"], message)
except Nack:
log.msg(
"Returning message id {msgid} to the queue".format(
msgid=properties.message_id
),
system=self.name,
logLevel=logging.WARNING,
_log.warn(
"Returning message id {msgid} to the queue", msgid=properties.message_id
)
yield self._channel.basic_nack(
delivery_tag=delivery_frame.delivery_tag, requeue=True
)
except Drop:
log.msg(
"Dropping message id {msgid}".format(msgid=properties.message_id),
system=self.name,
logLevel=logging.WARNING,
_log.warn(
"Consumer requested message id {msgid} be dropped",
msgid=properties.message_id,
)
yield self._channel.basic_nack(
delivery_tag=delivery_frame.delivery_tag, requeue=False
)
except HaltConsumer:
log.msg(
"Consumer indicated it wishes consumption to halt, " "shutting down",
system=self.name,
logLevel=logging.WARNING,
)
_log.info("Consumer indicated it wishes consumption to halt, shutting down")
yield self._channel.basic_ack(delivery_tag=delivery_frame.delivery_tag)
yield self.stopProducing()
except Exception:
log.err(
"Received unexpected exception from consumer callback", system=self.name
)
log.err(system=self.name)
_log.failure("Received unexpected exception from consumer {c}", c=consumer)
yield self._channel.basic_nack(delivery_tag=0, multiple=True, requeue=True)
yield self.stopProducing()
else:
Expand Down Expand Up @@ -372,8 +359,10 @@ def resumeProducing(self):
queue=consumer["queue"], consumer_tag=consumer["tag"]
)
deferred = self._read(queue_object, consumer)
deferred.addErrback(log.err, system=self.name)
log.msg("AMQP consumer is ready", system=self.name, logLevel=logging.DEBUG)
deferred.addErrback(
lambda f: _log.failure, "_read failed on consumer {c}", c=consumer
)
_log.info("AMQP connection successfully established")

@defer.inlineCallbacks
def pauseProducing(self):
Expand All @@ -393,11 +382,7 @@ def pauseProducing(self):
self._running = False
for consumer_tag in self._channel.consumer_tags:
yield self._channel.basic_cancel(consumer_tag=consumer_tag)
log.msg(
"Paused retrieval of messages for the server queue",
system=self.name,
logLevel=logging.DEBUG,
)
_log.info("Paused retrieval of messages for the server queue")

@defer.inlineCallbacks
def stopProducing(self):
Expand All @@ -412,10 +397,6 @@ def stopProducing(self):
if self._running:
yield self.pauseProducing()
if not self.is_closed:
log.msg(
"Disconnecting from the Fedora Messaging broker",
system=self.name,
logLevel=logging.DEBUG,
)
_log.info("Disconnecting from the AMQP broker")
yield self.close()
self._channel = None

0 comments on commit 2781105

Please sign in to comment.