Skip to content

Commit

Permalink
Support for message priorities
Browse files Browse the repository at this point in the history
Signed-off-by: Aurélien Bompard <aurelien@bompard.org>
  • Loading branch information
abompard committed Oct 14, 2022
1 parent 9a94dcf commit 1d65a1c
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 9 deletions.
12 changes: 12 additions & 0 deletions fedora_messaging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@
This is useful to migrate from fedmsg, but should not be used otherwise.
The default is an empty string.
.. _conf-publish-priority:
publish_priority
----------------
A number that will be set as the priority for the messages. The range of
possible priorities depends on the ``x-max-priority`` argument of the
destination queue, as described in `RabbitMQ's priority documentation`_.
The default is ``None``, which RabbitMQ will interpret as zero.
.. _RabbitMQ's priority documentation: https://www.rabbitmq.com/priority.html
.. _sub-config:
Consumer Options
Expand Down Expand Up @@ -316,6 +327,7 @@ def callback(message):
},
publish_exchange="amq.topic",
topic_prefix="",
publish_priority=None,
passive_declares=False,
exchanges={
"amq.topic": {
Expand Down
26 changes: 26 additions & 0 deletions fedora_messaging/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,17 @@ class attribute, although this is a convenient approach. Users are
recent version. Emits a warning when a message of this class is received,
to let consumers know that they should plan to upgrade. Defaults to
``False``.
priority (int): The priority for the message, if the destination queue
supports it. Defaults to zero (lowest priority).
This value is taken into account in queues that have the ``x-max-priority``
argument set. Most queues in Fedora don't support priorities, in which case
the value will be ignored.
Larger numbers indicate higher priority, you can read more about it in
`RabbitMQ's documentation on priority`_.
.. _RabbitMQ's documentation on priority: https://www.rabbitmq.com/priority.html
"""

severity = INFO
Expand Down Expand Up @@ -348,6 +359,7 @@ def _build_properties(self, headers):
delivery_mode=2,
headers=headers,
message_id=message_id,
priority=config.conf["publish_priority"],
)

def _filter_headers(self):
Expand Down Expand Up @@ -394,6 +406,14 @@ def id(self):
def id(self, value):
self._properties.message_id = value

@property
def priority(self):
return self._properties.priority or 0

@priority.setter
def priority(self, value):
self._properties.priority = value

@property
def _encoded_routing_key(self):
"""The encoded routing key used to publish the message on the broker."""
Expand Down Expand Up @@ -673,6 +693,10 @@ def flatpaks(self):
"type": ["string", "null"],
"description": "The queue the message arrived on, if any.",
},
"priority": {
"type": ["integer", "null"],
"description": "The priority that the message has been sent with.",
},
},
"required": ["topic", "headers", "id", "body"],
}
Expand Down Expand Up @@ -710,6 +734,7 @@ def dumps(messages):
"id": message.id,
"body": message.body,
"queue": message.queue,
"priority": message.priority,
}
serialized_messages.append(json.dumps(m, ensure_ascii=False, sort_keys=True))

Expand Down Expand Up @@ -751,6 +776,7 @@ def loads(serialized_messages):
)
message.queue = message_dict["queue"] if "queue" in message_dict else None
message.id = message_dict["id"]
message.priority = message_dict.get("priority")
messages.append(message)

return messages
5 changes: 3 additions & 2 deletions fedora_messaging/tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def test_save_recorded_messages_when_limit_is_reached(self):
'{"body": {"test_key1": "test_value1"}, "headers"'
': {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, '
'"sent-at": "2018-11-18T10:11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec"'
', "queue": null, "topic": "test_topic1"}\n'
', "priority": 0, "queue": null, "topic": "test_topic1"}\n'
)

with self.assertRaises(exceptions.HaltConsumer) as cm:
Expand All @@ -748,7 +748,8 @@ def test_save_recorded_messages_when_limit_is_reached(self):
'{"body": {"test_key2": "test_value2"}, "headers": '
'{"fedora_messaging_schema": "base.message", "fedora_messaging_severity": '
'20, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": null, "topic": "test_topic2"}\n'
'"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "priority": 0, "queue": null, '
'"topic": "test_topic2"}\n'
)

def test_recorded_messages_dumps_failed(self):
Expand Down
3 changes: 3 additions & 0 deletions fedora_messaging/tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
topic_prefix = ""
publish_priority = 42
callback = "fedora_messaging.examples:print_msg"
bindings = [
Expand Down Expand Up @@ -275,6 +277,7 @@ def test_full_config_file(self, mock_exists, mock_log):
},
topic_prefix="",
publish_exchange="special_exchange",
publish_priority=42,
passive_declares=False,
exchanges={
"custom_exchange": {
Expand Down
36 changes: 29 additions & 7 deletions fedora_messaging/tests/unit/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def test_proper_message(self):
delivery_mode=2,
headers=test_headers,
message_id=test_id,
priority=2,
)
test_msg = message.Message(
body=test_body, topic=test_topic, properties=test_properties
Expand All @@ -107,8 +108,8 @@ def test_proper_message(self):
test_msg.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", "queue": '
'"test queue", "topic": "test topic"}\n'
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"priority": 2, "queue": "test queue", "topic": "test topic"}\n'
)
self.assertEqual(expected_json, message.dumps(test_msg))

Expand Down Expand Up @@ -139,11 +140,11 @@ def test_proper_message_multiple(self):
test_msg2.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", "queue": '
'"test queue", "topic": "test topic"}\n'
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", "queue": '
'"test queue", "topic": "test topic"}\n'
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
)

self.assertEqual(expected_json, message.dumps([test_msg, test_msg2]))
Expand All @@ -162,7 +163,7 @@ def test_proper_json(self):
message_json = (
'{"topic": "test topic", "headers": {"fedora_messaging_schema": "base.message", '
'"fedora_messaging_severity": 30}, "id": "test id", "body": '
'{"test_key": "test_value"}, "queue": "test queue"}\n'
'{"test_key": "test_value"}, "priority": 2, "queue": "test queue"}\n'
)
messages = message.loads(message_json)
test_message = messages[0]
Expand All @@ -172,6 +173,7 @@ def test_proper_json(self):
self.assertEqual("test id", test_message.id)
self.assertEqual({"test_key": "test_value"}, test_message.body)
self.assertEqual("test queue", test_message.queue)
self.assertEqual(2, test_message.priority)
self.assertEqual(
message.WARNING, test_message._headers["fedora_messaging_severity"]
)
Expand Down Expand Up @@ -247,6 +249,17 @@ def test_missing_severity(self):
)
self.assertRaises(exceptions.ValidationError, message.loads, message_json)

def test_missing_priority(self):
"""Assert message without priority is accepted and the priority is set to zero."""
message_json = (
'{"topic": "test topic", "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"body": {"test_key": "test_value"}, "queue": "test queue"}'
)
messages = message.loads(message_json)
test_message = messages[0]
self.assertEqual(test_message.priority, 0)

def test_validation_failure(self):
"""Assert proper exception is raised when message validation failed."""
message_json = (
Expand Down Expand Up @@ -359,6 +372,15 @@ def test_sent_at(self):

self.assertEqual("1970-01-01T00:00:00+00:00", msg._headers["sent-at"])

def test_priority(self):
"""Assert is set correctly."""
msg = message.Message()
self.assertEqual(0, msg.priority)
msg.priority = 42
self.assertEqual(42, msg.priority)
msg.priority = None
self.assertEqual(0, msg.priority)

def test_properties(self):
properties = object()
msg = message.Message(properties=properties)
Expand Down
1 change: 1 addition & 0 deletions news/PR275.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a priority property to messages, and a default priority in the configuration
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
envlist = checks,licenses,py{36,37,38,39,310}-{unittest,integration}

[testenv]
passenv = HOME
deps =
-rdev-requirements.txt
sitepackages = False
Expand Down

0 comments on commit 1d65a1c

Please sign in to comment.