Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include the queue name messages came from #65

Merged
merged 3 commits into from
Sep 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion fedora_messaging/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def __init__(self):
self._channel = None
self._bindings = []
self._running = False
self._consumers = {}

def _signal_handler(signum, frame):
"""
Expand Down Expand Up @@ -408,7 +409,8 @@ def _on_queue_declareok(self, frame):
bc_args["consumer_callback"] = self._on_message
else:
bc_args["on_message_callback"] = self._on_message
self._channel.basic_consume(**bc_args)
tag = self._channel.basic_consume(**bc_args)
self._consumers[tag] = binding["queue"]

def _on_cancel(self, cancel_frame):
"""
Expand Down Expand Up @@ -530,6 +532,7 @@ def _on_message(self, channel, delivery_frame, properties, body):

try:
message = get_message(delivery_frame.routing_key, properties, body)
message.queue = self._consumers[delivery_frame.consumer_tag]
except ValidationError:
channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False)
return
Expand Down
2 changes: 2 additions & 0 deletions fedora_messaging/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class Message(object):
severity (int): An integer that indicates the severity of the message. This is
used to determine what messages to notify end users about and should be
:data:`DEBUG`, :data:`INFO`, :data:`WARNING`, or :data:`ERROR`.
queue (str): The name of the queue this message arrived through.

Args:
headers (dict): A set of message headers. Consult the headers schema for
Expand Down Expand Up @@ -253,6 +254,7 @@ def __init__(
if severity:
self.severity = severity
self._properties = properties or self._build_properties(headers)
self.queue = None

def _build_properties(self, headers):
# Consumers use this to determine what schema to use and if they're out
Expand Down
8 changes: 8 additions & 0 deletions fedora_messaging/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ def setUp(self):
self.channel = mock.Mock()
self.consumer._connection = mock.Mock()
self.consumer._running = True
self.consumer._consumers["consumer1"] = "my_queue"
self.frame = mock.Mock()
self.frame.consumer_tag = "consumer1"
self.frame.delivery_tag = "testtag"
self.frame.routing_key = "test.topic"
self.properties = mock.Mock()
Expand All @@ -388,6 +390,12 @@ def test_message(self):
self.channel.basic_ack.assert_called_with(delivery_tag="testtag")
self.assertEqual(msg._body, "test body")

def test_message_queue_set(self):
"""Assert the queue attribute is set on messages."""
self.consumer._on_message(self.channel, self.frame, self.properties, b"{}")
msg = self.consumer._consumer_callback.call_args_list[0][0][0]
self.assertEqual(msg.queue, "my_queue")

def test_message_encoding(self):
body = '"test body unicode é à ç"'.encode("utf-8")
self.properties.content_encoding = None
Expand Down
5 changes: 4 additions & 1 deletion fedora_messaging/tests/unit/twisted/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,16 @@ def setUp(self):
self.protocol = MockProtocol(None)
self.protocol._message_callback = mock.Mock()
self.protocol._impl.is_closed = False
self.protocol._consumers["consumer1"] = "my_queue_name"

def _call_on_message(self, topic, headers, body):
"""Prepare arguments for the _on_message() method and call it."""
full_headers = {"fedora_messaging_schema": "fedora_messaging.message:Message"}
full_headers.update(headers)
return self.protocol._on_message(
pika.spec.Basic.Deliver(routing_key=topic, delivery_tag="delivery_tag"),
pika.spec.Basic.Deliver(
routing_key=topic, delivery_tag="delivery_tag", consumer_tag="consumer1"
),
pika.spec.BasicProperties(headers=full_headers),
json.dumps(body).encode("utf-8"),
)
Expand Down
5 changes: 5 additions & 0 deletions fedora_messaging/twisted/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def __init__(self, parameters, confirms=True):
self._running = False
self._queues = set()
self._message_callback = None
# Map consumer tags to queue names
self._consumers = {}
self.factory = None

@defer.inlineCallbacks
Expand Down Expand Up @@ -192,6 +194,7 @@ def _on_message(self, delivery_frame, properties, body):
)
try:
message = get_message(delivery_frame.routing_key, properties, body)
message.queue = self._consumers[delivery_frame.consumer_tag]
except ValidationError:
log.msg(
"Message id {msgid} did not pass validation.".format(
Expand Down Expand Up @@ -289,6 +292,7 @@ def resumeProducing(self):
queue_object, _consumer_tag = yield self._channel.basic_consume(
queue=queue_name
)
self._consumers[_consumer_tag] = queue_name
self._read(queue_object).addErrback(log.err, system=self.name)
log.msg("AMQP consumer is ready", system=self.name, logLevel=logging.DEBUG)

Expand All @@ -310,6 +314,7 @@ def pauseProducing(self):
self._running = False
for consumer_tag in self._channel.consumer_tags:
yield self._channel.basic_cancel(consumer_tag=consumer_tag)
self._consumers = {}
log.msg(
"Paused retrieval of messages for the server queue",
system=self.name,
Expand Down