Skip to content

Commit

Permalink
New cli record command
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Wojciechowski <s.wojciechowski89@gmail.com>
  • Loading branch information
sebwoj committed Nov 29, 2018
1 parent 92e7f1a commit 892ef22
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 7 deletions.
45 changes: 41 additions & 4 deletions docs/fedora-messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ Options
Path to a valid configuration file to use in place of the configuration in
``/etc/fedora-messaging/config.toml``.


Commands
========

There are two sub-commands, ``consume`` and ``publish``, described in detail in
their own sections below.
There are three sub-commands, ``consume``, ``publish`` and ``record``, described
in detail in their own sections below.

``fedora-messaging consume [OPTIONS]``

Expand All @@ -43,6 +44,11 @@ their own sections below.
Loads serialized messages from a file and publishes them to the specified
exchange.

``fedora-messaging record [OPTIONS] FILE``

Records messages arrived from AMQP queue and saves them to file with specified
name.


consume
-------
Expand Down Expand Up @@ -75,8 +81,7 @@ consume
``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon. If one is not specified, the
default is the ``amq.topic`` exchange.
digits, hyphen, underscore, period, or colon.


publish
Expand All @@ -88,6 +93,38 @@ publish
digits, hyphen, underscore, period, or colon.


record
------

``--limit``

The maximum number of messages to record.

``--app-name``

The name of the application, used by the AMQP client to identify itself to
the broker. This is purely for administrator convenience to determine what
applications are connected and own particular resources. If not specified,
the default is ``recorder``.

``--routing-key``

The AMQP routing key to use with the queue. This controls what messages are
delivered to the consumer. Can be specified multiple times; any message
that matches at least one will be placed in the message queue.

``--queue-name``

The name of the message queue in AMQP. Can contain ASCII letters, digits,
hyphen, underscore, period, or colon. If one is not specified, a unique
name will be created for you.

``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon.


Help
====

Expand Down
103 changes: 100 additions & 3 deletions fedora_messaging/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
import os
import sys
import errno
import signal

import click
import pkg_resources

from . import config, api, exceptions
from .message import loads
from .message import dumps, loads

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,13 +63,13 @@
)
_exchange_help = (
"The name of the exchange to bind the queue to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon. If one is not specified, the "
"default is the ``amq.topic`` exchange."
"digits, hyphen, underscore, period, or colon."
)
_publish_exchange_help = (
"The name of the exchange to publish to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon."
)
_limit_help = "The maximum number of messages to record."


@click.group()
Expand Down Expand Up @@ -195,3 +196,99 @@ def publish(exchange, file):
except exceptions.ConnectionException as e:
click.echo("Unable to connect to the queue: {}".format(str(e.reason)))
sys.exit(errno.ECONNREFUSED)


@cli.command()
@click.argument("file", type=click.File("w"))
@click.option("--limit", help=_limit_help, type=click.IntRange(1))
@click.option("--app-name", help=_app_name_help, default="recorder")
@click.option("--routing-key", help=_routing_key_help, multiple=True)
@click.option("--queue-name", help=_queue_name_help)
@click.option("--exchange", help=_exchange_help)
def record(exchange, queue_name, routing_key, app_name, limit, file):
"""Record messages from an AMQP queue to provided file."""

class Recorder:
"""
A simple callback that records messages.
Attributes:
counter (int): The number of messages this callback has recorded.
messages (list): The list of messages received.
"""

def __init__(self):
self.counter = 0
self.messages = []
if limit:
self.bar = click.progressbar(length=limit)

def _signal_handler(signum, frame):
"""
Signal handler that gracefully terminates the recorder
Args:
signum (int): The signal this process received.
frame (frame): The current stack frame (unused).
"""
if signum in (signal.SIGTERM, signal.SIGINT):
self.save_messages_to_file()
raise exceptions.HaltConsumer(exit_code=0, requeue=False)

signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)

def __call__(self, message):
self.messages.append(message)
self.counter += 1
if limit:
self.bar.update(1)
if limit == self.counter:
self.save_messages_to_file()
raise exceptions.HaltConsumer(exit_code=0, requeue=False)

def save_messages_to_file(self):
try:
json_str = dumps(self.messages)
except TypeError as e:
click.echo("Unable to save messages to file: {}".format(str(e)))
raise exceptions.HaltConsumer(exit_code=1, requeue=False)
else:
file.write(json_str)

if exchange and queue_name and routing_key:
bindings = [
{"exchange": exchange, "queue": queue_name, "routing_keys": routing_key}
]
elif not exchange and not queue_name and not routing_key:
bindings = config.conf["bindings"]
else:
raise click.ClickException(
"You must define all three of exchange, queue_name and"
" routing_key, or none of them to use the configuration"
)
if not bindings:
raise click.ClickException(
"No bindings are defined in the configuration file"
" and none were provided as arguments!"
)

config.conf["client_properties"]["app"] = app_name

click.echo("Recording to file {} started.".format(file.name))
try:
return api.consume(Recorder, bindings)
except ValueError as e:
click_version = pkg_resources.get_distribution("click").parsed_version
if click_version < pkg_resources.parse_version("7.0"):
raise click.exceptions.BadOptionUsage(str(e))
else:
raise click.exceptions.BadOptionUsage("callback", str(e))
except exceptions.HaltConsumer as e:
if e.exit_code:
click.echo(
"Consumer halted with non-zero exit code ({}): {}".format(
e.exit_code, str(e.reason)
)
)
sys.exit(e.exit_code)
147 changes: 147 additions & 0 deletions fedora_messaging/tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,3 +506,150 @@ def test_publish_connection_failed(self, mock_publish):
self.assertIn("Unable to connect to the queue: " + error_message, result.output)
mock_publish.assert_called_once()
self.assertEqual(errno.ECONNREFUSED, result.exit_code)


class RecordCliTests(unittest.TestCase):
"""Unit tests for the 'record' command of the CLI."""

def setUp(self):
self.runner = CliRunner()

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": None})
@mock.patch("fedora_messaging.cli.api.consume")
def test_good_cli_bindings(self, mock_consume):
"""Assert providing a bindings via the CLI works."""
cli_options = {
"file": "test_file.txt",
"exchange": "e",
"queue-name": "qn",
"routing-keys": ("rk1", "rk2"),
}
result = self.runner.invoke(
cli.cli,
[
"record",
cli_options["file"],
"--exchange=" + cli_options["exchange"],
"--queue-name=" + cli_options["queue-name"],
"--routing-key=" + cli_options["routing-keys"][0],
"--routing-key=" + cli_options["routing-keys"][1],
],
)
mock_consume.assert_called_once()
self.assertEqual(
mock_consume.call_args_list[0][0][1],
[
{
"exchange": cli_options["exchange"],
"queue": cli_options["queue-name"],
"routing_keys": cli_options["routing-keys"],
}
],
)
self.assertEqual(
"Recording to file " + cli_options["file"] + " started.\n", result.output
)
self.assertEqual(0, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": "test_bindings"})
@mock.patch("fedora_messaging.cli.api.consume")
def test_no_cli_bindings(self, mock_consume):
"""Assert providing a bindings via configuration works."""
cli_options = {"file": "test_file.txt"}
result = self.runner.invoke(cli.cli, ["record", cli_options["file"]])
mock_consume.assert_called_once()
self.assertEqual(mock_consume.call_args_list[0][0][1], "test_bindings")
self.assertEqual(
"Recording to file " + cli_options["file"] + " started.\n", result.output
)
self.assertEqual(0, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": None})
@mock.patch("fedora_messaging.cli.api.consume")
def test_wrong_cli_bindings(self, mock_consume):
"""Assert providing improper bindings is reported."""
cli_options = {"file": "test_file.txt", "queue-name": "qn", "routing-key": "rk"}
result = self.runner.invoke(
cli.cli,
[
"record",
cli_options["file"],
"--queue-name=" + cli_options["queue-name"],
"--routing-key=" + cli_options["routing-key"],
],
)
mock_consume.assert_not_called()
self.assertIn(
"You must define all three of exchange, queue_name and"
" routing_key, or none of them to use the configuration",
result.output,
)
self.assertEqual(1, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": None})
@mock.patch("fedora_messaging.cli.api.consume")
def test_missing_cli_and_conf_bindings(self, mock_consume):
"""Assert missing bindings via cli and in conf is reported."""
cli_options = {"file": "test_file.txt"}
result = self.runner.invoke(cli.cli, ["record", cli_options["file"]])
mock_consume.assert_not_called()
self.assertIn(
"No bindings are defined in the configuration file"
" and none were provided as arguments!",
result.output,
)
self.assertEqual(1, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": "test_bindings"})
@mock.patch("fedora_messaging.cli.api.consume")
def test_consume_improper_callback_object(self, mock_consume):
"""Assert improper callback object type failure is reported."""
cli_options = {"file": "test_file.txt"}
error_message = (
"Callback must be a class that implements __call__ or a function."
)
mock_consume.side_effect = ValueError(error_message)
result = self.runner.invoke(cli.cli, ["record", cli_options["file"]])
mock_consume.assert_called_once()
self.assertEqual(mock_consume.call_args_list[0][0][1], "test_bindings")
self.assertIn(error_message, result.output)
self.assertIn(
"Recording to file " + cli_options["file"] + " started.\n", result.output
)
self.assertEqual(2, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": "test_bindings"})
@mock.patch("fedora_messaging.cli.api.consume")
def test_consume_halt_with_exitcode(self, mock_consume):
"""Assert user execution halt with reason and exit_code is reported."""
cli_options = {"file": "test_file.txt"}
halt_message = "User halted execution"
halt_exit_code = 5
mock_consume.side_effect = exceptions.HaltConsumer(
exit_code=halt_exit_code, reason=halt_message
)
result = self.runner.invoke(cli.cli, ["record", cli_options["file"]])
mock_consume.assert_called_once()
self.assertEqual(mock_consume.call_args_list[0][0][1], "test_bindings")
self.assertIn(
"Consumer halted with non-zero exit code (5): User halted execution",
result.output,
)
self.assertIn(
"Recording to file " + cli_options["file"] + " started.\n", result.output
)
self.assertEqual(halt_exit_code, result.exit_code)

@mock.patch.dict("fedora_messaging.config.conf", {"bindings": "test_bindings"})
@mock.patch("fedora_messaging.cli.api.consume")
def test_consume_halt_without_exitcode(self, mock_consume):
"""Assert user execution halt is reported."""
cli_options = {"file": "test_file.txt"}
mock_consume.side_effect = exceptions.HaltConsumer(exit_code=0)
result = self.runner.invoke(cli.cli, ["record", cli_options["file"]])
mock_consume.assert_called_once()
self.assertEqual(mock_consume.call_args_list[0][0][1], "test_bindings")
self.assertEqual(
"Recording to file " + cli_options["file"] + " started.\n", result.output
)
self.assertEqual(0, result.exit_code)

0 comments on commit 892ef22

Please sign in to comment.