Skip to content

Commit

Permalink
cli: Add a record sub-command
Browse files Browse the repository at this point in the history
Add a new CLI command to record messages from a message broker and write
them to a file (or stdout). This was initially intended to be used to
collect test fixtures, but can also be chained together with other
commands using pipes. Messages are written as they arrive, one message
per line, so it's easy to, say, run a command when a message arrives:

$ fedora-messaging record - | while read -r line || -n $line;
    do echo "message received"; done

or create a quick (and completely unreliable) bridge between brokers:

$ fedora-messaging --conf=broker1.toml record - | \
     fedora-messaging --conf=broker2.toml publish

Signed-off-by: Sebastian Wojciechowski <s.wojciechowski89@gmail.com>
[Rebase and refactor]
Signed-off-by: Jeremy Cline <jcline@redhat.com>
  • Loading branch information
sebwoj authored and jeremycline committed Jun 26, 2019
1 parent 6bd1e1d commit a311b4e
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 13 deletions.
58 changes: 54 additions & 4 deletions docs/fedora-messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ Options
Path to a valid configuration file to use in place of the configuration in
``/etc/fedora-messaging/config.toml``.


Commands
========

There are two sub-commands, ``consume`` and ``publish``, described in detail in
their own sections below.
There are three sub-commands, ``consume``, ``publish`` and ``record``, described
in detail in their own sections below.

``fedora-messaging consume [OPTIONS]``

Expand All @@ -46,6 +47,11 @@ their own sections below.
Loads serialized messages from a file and publishes them to the specified
exchange.

``fedora-messaging record [OPTIONS] FILE``

Records messages arrived from AMQP queue and saves them to file with specified
name.


consume
-------
Expand Down Expand Up @@ -96,8 +102,7 @@ configuration file and no options on the command line.
``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon. If one is not specified, the
default is the ``amq.topic`` exchange.
digits, hyphen, underscore, period, or colon.

Setting this option is equivalent to setting the ``exchange`` setting
in *all* ``bindings`` entries in the configuration file.
Expand All @@ -112,6 +117,51 @@ publish
digits, hyphen, underscore, period, or colon.


record
------

``--limit``

The maximum number of messages to record.

``--app-name``

The name of the application, used by the AMQP client to identify itself to
the broker. This is purely for administrator convenience to determine what
applications are connected and own particular resources. If not specified,
the default is ``recorder``.

This option is equivalent to the ``app`` setting in the ``client_properties``
section of the configuration file.

``--routing-key``

The AMQP routing key to use with the queue. This controls what messages are
delivered to the consumer. Can be specified multiple times; any message
that matches at least one will be placed in the message queue.

Setting this option is equivalent to setting the ``routing_keys`` setting
in *all* ``bindings`` entries in the configuration file.

``--queue-name``

The name of the message queue in AMQP. Can contain ASCII letters, digits,
hyphen, underscore, period, or colon. If one is not specified, a unique
name will be created for you.

Setting this option is equivalent to setting the ``queue`` setting in *all*
``bindings`` entries and creating a ``queue.<queue-name>`` section in the
configuration file.

``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon.

Setting this option is equivalent to setting the ``exchange`` setting
in *all* ``bindings`` entries in the configuration file.


Exit codes
==========

Expand Down
91 changes: 83 additions & 8 deletions fedora_messaging/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import pkg_resources

from . import config, api, exceptions
from .message import loads
from .message import dumps, loads

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -72,13 +72,13 @@
)
_exchange_help = (
"The name of the exchange to bind the queue to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon. If one is not specified, the "
"default is the ``amq.topic`` exchange."
"digits, hyphen, underscore, period, or colon."
)
_publish_exchange_help = (
"The name of the exchange to publish to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon."
)
_limit_help = "The maximum number of messages to record."


# Global variable used to set the exit code in error handlers, then let
Expand Down Expand Up @@ -113,7 +113,26 @@ def consume(exchange, queue_name, routing_key, callback, callback_file, app_name
"""Consume messages from an AMQP queue using a Python callback."""
# The configuration validates these are not null and contain all required keys
# when it is loaded.
if callback_file:
callback = _callback_from_filesystem(callback_file)
else:
callback = _callback_from_python_path(callback)
_consume(exchange, queue_name, routing_key, callback, app_name)


def _consume(exchange, queue_name, routing_key, callback, app_name):
"""
The actual consume code, which expects an actual callable object.
This lets various consume-based commands share the setup code. Anything
that accepts None loads the defaults from the configuration.
Args:
exchange (str): The AMQP message exchange to bind to, or None.
queue_name (str): The queue name to use, or None.
routing_key (str): The routing key to use, or None.
callback (callable): A callable object to use for the callback.
app_name (str): The application name to use, or None.
"""
bindings = config.conf["bindings"]
queues = config.conf["queues"]

Expand All @@ -137,11 +156,6 @@ def consume(exchange, queue_name, routing_key, callback, callback_file, app_name
for binding in bindings:
binding["routing_keys"] = routing_key

if callback_file:
callback = _callback_from_filesystem(callback_file)
else:
callback = _callback_from_python_path(callback)

if app_name:
config.conf["client_properties"]["app"] = app_name

Expand Down Expand Up @@ -378,3 +392,64 @@ def publish(exchange, file):
except exceptions.PublishException as e:
click.echo("A general publish exception occurred: {}".format(str(e)))
sys.exit(1)


class Recorder:
"""
A simple callback class that records messages.
Attributes:
counter (int): The number of messages this callback has recorded.
messages (list): The list of messages received.
Args:
limit (int): The maximum number of messages to record.
file (file): The file object with write rights.
"""

def __init__(self, limit, file):
self.counter = 0
self._limit = limit
self._file = file
if limit:
self._bar = click.progressbar(length=limit)

def collect_message(self, message):
"""
Collect received messages.
Args:
message (message.Message): The received message.
Raises:
fedora_messaging.exceptions.HaltConsumer: Raised if the number of received
messages reach the limit or if collected messages serialization is impossible.
"""
try:
json_str = dumps(message)
except exceptions.ValidationError as e:
click.echo("Unable to save messages to file: {}".format(str(e)))
raise exceptions.HaltConsumer(exit_code=1, requeue=False)
else:
self._file.write(json_str)

self.counter += 1
if self._limit:
self._bar.update(1)
if self._limit <= self.counter:
raise exceptions.HaltConsumer(exit_code=0, requeue=False)


@cli.command()
@click.argument("file", type=click.File("w"))
@click.option("--limit", help=_limit_help, type=click.IntRange(1))
@click.option("--app-name", help=_app_name_help, default="recorder")
@click.option("--routing-key", help=_routing_key_help, multiple=True)
@click.option("--queue-name", help=_queue_name_help)
@click.option("--exchange", help=_exchange_help)
def record(exchange, queue_name, routing_key, app_name, limit, file):
"""Record messages from an AMQP queue to provided file."""
messages_recorder = Recorder(limit, file)
_consume(
exchange, queue_name, routing_key, messages_recorder.collect_message, app_name
)
98 changes: 97 additions & 1 deletion fedora_messaging/tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_no_cli_bindings(self, mock_consume):

@mock.patch("fedora_messaging.cli.api.twisted_consume")
def test_queue_and_routing_key(self, mock_consume):
"""Assert providing improper bindings is reported."""
"""Asser providing improper bindings is reported."""
config.conf["callback"] = "fedora_messaging.tests.unit.test_cli:echo"

result = self.runner.invoke(
Expand Down Expand Up @@ -521,6 +521,11 @@ class PublishCliTests(unittest.TestCase):
def setUp(self):
self.runner = CliRunner()

def tearDown(self):
"""Make sure each test has a fresh default configuration."""
config.conf = config.LazyConfig()
config.conf.load_config()

def test_correct_msg_in_file(self):
"""Assert providing path to file with correct message via the CLI works."""
cli_options = {"file": GOOD_MSG_DUMP, "exchange": "test_pe"}
Expand Down Expand Up @@ -652,3 +657,94 @@ def test_publish_general_publish_error(self, mock_publish):
self.assertIn("A general publish exception occurred: eh", result.output)
mock_publish.assert_called_once()
self.assertEqual(1, result.exit_code)


class RecordCliTests(unittest.TestCase):
"""Unit tests for the 'record' command of the CLI."""

def setUp(self):
self.runner = CliRunner()

def tearDown(self):
"""Make sure each test has a fresh default configuration."""
config.conf = config.LazyConfig()
config.conf.load_config(config_path="")

@mock.patch("fedora_messaging.cli._consume")
def test_good_cli_bindings(self, mock_consume):
"""Assert arguments are forwarded to the _consume function."""
cli_options = {
"file": "test_file.txt",
"exchange": "e",
"queue-name": "qn",
"routing-keys": ("rk1", "rk2"),
}
self.runner.invoke(
cli.cli,
[
"record",
cli_options["file"],
"--exchange=" + cli_options["exchange"],
"--queue-name=" + cli_options["queue-name"],
"--routing-key=" + cli_options["routing-keys"][0],
"--routing-key=" + cli_options["routing-keys"][1],
],
)
mock_consume.assert_called_once_with(
"e", "qn", ("rk1", "rk2"), mock.ANY, "recorder"
)


class RecorderClassTests(unittest.TestCase):
"""Unit tests for the 'Recorder' class."""

def test_save_recorded_messages_when_limit_is_reached(self):
"""Assert that collected messages are saved to file when limit is reached."""
msg1 = message.Message(
body={"test_key1": "test_value1"},
topic="test_topic1",
severity=message.INFO,
)
msg1._properties.headers["sent-at"] = "2018-11-18T10:11:41+00:00"
msg1.id = "273ed91d-b8b5-487a-9576-95b9fbdf3eec"

msg2 = message.Message(
body={"test_key2": "test_value2"},
topic="test_topic2",
severity=message.INFO,
)
msg2._properties.headers["sent-at"] = "2018-11-18T10:11:41+00:00"
msg2.id = "273ed91d-b8b5-487a-9576-95b9fbdf3eec"

mock_file = mock.MagicMock()
test_recorder = cli.Recorder(2, mock_file)
test_recorder.collect_message(msg1)
mock_file.write.assert_called_with(
'{"body": {"test_key1": "test_value1"}, "headers"'
': {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, '
'"sent-at": "2018-11-18T10:11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec"'
', "queue": null, "topic": "test_topic1"}\n'
)

with self.assertRaises(exceptions.HaltConsumer) as cm:
test_recorder.collect_message(msg2)
the_exception = cm.exception
self.assertEqual(the_exception.exit_code, 0)
self.assertEqual(test_recorder.counter, 2)
mock_file.write.assert_called_with(
'{"body": {"test_key2": "test_value2"}, "headers": '
'{"fedora_messaging_schema": "base.message", "fedora_messaging_severity": '
'20, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": null, "topic": "test_topic2"}\n'
)

def test_recorded_messages_dumps_failed(self):
"""Assert that attempt to save improper recorded message is reported."""
mock_file = mock.MagicMock()
test_recorder = cli.Recorder(1, mock_file)
with self.assertRaises(exceptions.HaltConsumer) as cm:
test_recorder.collect_message("msg1")
the_exception = cm.exception
self.assertEqual(the_exception.exit_code, 1)
self.assertEqual(test_recorder.counter, 0)
mock_file.write.assert_not_called()
1 change: 1 addition & 0 deletions news/PR43.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The ``fedora-messaging`` cli now has 2 new sub-commands: ``publish`` and ``record``.

0 comments on commit a311b4e

Please sign in to comment.