From 7fe5d1bfc1aa87caf1484e83459e2bf4d458d8a8 Mon Sep 17 00:00:00 2001 From: Sebastian Wojciechowski Date: Thu, 23 Aug 2018 09:49:09 +0000 Subject: [PATCH] New cli publish command Signed-off-by: Sebastian Wojciechowski --- docs/fedora-messaging.rst | 18 ++- fedora_messaging/cli.py | 39 +++++++ fedora_messaging/tests/unit/test_cli.py | 139 +++++++++++++++++++++++- 3 files changed, 193 insertions(+), 3 deletions(-) diff --git a/docs/fedora-messaging.rst b/docs/fedora-messaging.rst index 99222229..0dcadd7a 100644 --- a/docs/fedora-messaging.rst +++ b/docs/fedora-messaging.rst @@ -33,14 +33,19 @@ Options Commands ======== -There is a single sub-command, ``consume``, described in detail in its ow -section below. +There are two sub-commands, ``consume`` and ``publish``, described in detail in +their own sections below. ``fedora-messaging consume [OPTIONS]`` Starts a consumer process with a user-provided callback function to execute when a message arrives. +``fedora-messaging publish [OPTIONS]`` + + Loads serialized messages from stdin and publishes them to the specified + exchange. + consume ------- @@ -98,6 +103,15 @@ configuration file and no options on the command line. in *all* ``bindings`` entries in the configuration file. +publish +------- + +``--exchange`` + + The name of the exchange to publish to. Can contain ASCII letters, + digits, hyphen, underscore, period, or colon. + + Exit codes ========== diff --git a/fedora_messaging/cli.py b/fedora_messaging/cli.py index 51f5fbbf..b1bf69ed 100644 --- a/fedora_messaging/cli.py +++ b/fedora_messaging/cli.py @@ -26,6 +26,7 @@ import logging.config import os import sys +import errno from twisted.python import log as legacy_twisted_log from twisted.internet import reactor, error @@ -33,6 +34,7 @@ import pkg_resources from . import config, api, exceptions +from .message import loads _log = logging.getLogger(__name__) @@ -66,6 +68,10 @@ "digits, hyphen, underscore, period, or colon. If one is not specified, the " "default is the ``amq.topic`` exchange." ) +_publish_exchange_help = ( + "The name of the exchange to publish to. Can contain ASCII letters, " + "digits, hyphen, underscore, period, or colon." +) # Global variable used to set the exit code in error handlers, then let @@ -266,3 +272,36 @@ def callback(consumer): pass consumer.result.addCallbacks(callback, errback) + + +@cli.command() +@click.option("--exchange", help=_publish_exchange_help) +def publish(exchange): + """ + Send messages from stdin to an AMQP queue. + """ + for line in sys.stdin: + try: + msg = loads(line) + except exceptions.ValidationError as e: + raise click.BadArgumentUsage("Unable to validate message: {}".format(str(e))) + except KeyError as e: + raise click.BadArgumentUsage( + "Unable to create message. Missing attribute {}".format(str(e)) + ) + except ValueError as e: + raise click.BadArgumentUsage( + "Unable to load serialized message: {}".format(str(e)) + ) + + click.echo("Sending message with topic {}".format(msg.topic)) + try: + api.publish(msg, exchange) + except exceptions.PublishReturned as e: + click.echo("Unable to publish message: {}".format(str(e.reason))) + sys.exit(errno.EREMOTEIO) + except exceptions.ConnectionException as e: + click.echo( + "Unable to connect to the message broker: {}".format(str(e.reason)) + ) + sys.exit(errno.ECONNREFUSED) diff --git a/fedora_messaging/tests/unit/test_cli.py b/fedora_messaging/tests/unit/test_cli.py index 810a2d61..8fd875ee 100644 --- a/fedora_messaging/tests/unit/test_cli.py +++ b/fedora_messaging/tests/unit/test_cli.py @@ -19,13 +19,14 @@ import os import unittest +import errno from click.testing import CliRunner from twisted.internet import error from twisted.python import failure import mock -from fedora_messaging import cli, config, exceptions +from fedora_messaging import cli, config, exceptions, message from fedora_messaging.tests import FIXTURES_DIR from fedora_messaging.twisted import consumer @@ -444,3 +445,139 @@ def test_errback_general_exception(self, mock_reactor): cli._consume_errback(f) self.assertEqual(11, cli._exit_code) + + +class PublishCliTests(unittest.TestCase): + """Unit tests for the 'publish' command of the CLI.""" + + def setUp(self): + self.runner = CliRunner() + + @mock.patch("fedora_messaging.cli.api.publish") + def test_correct_msg_in_stdin(self, mock_publish): + """Assert providing correct message json via stdin works.""" + cli_options = {"exchange": "test_pe"} + serialized_msg = ( + '{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_' + 'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10' + ':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"' + ', "topic": "test_topic"}' + ) + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn("Sending message with topic test_topic", result.output) + mock_publish.assert_called_once() + + sent_msg = mock_publish.call_args_list[0][0][0] + + # Prepare expected message + expected_msg = message.Message( + body={"test_key1": "test_value1"}, topic="test_topic", severity=message.INFO + ) + + self.assertEqual(sent_msg.queue, "test_queue") + self.assertEqual(expected_msg, sent_msg) + self.assertEqual(mock_publish.call_args_list[0][0][1], cli_options["exchange"]) + self.assertEqual(0, result.exit_code) + + @mock.patch("fedora_messaging.cli.api.publish") + def test_file_with_corrupted_json(self, mock_publish): + """Assert providing corrupted message json via stdin works.""" + cli_options = {"exchange": "test_pe"} + serialized_msg = "[" + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn("Unable to load serialized message: ", result.output) + mock_publish.assert_not_called() + self.assertEqual(2, result.exit_code) + + @mock.patch("fedora_messaging.cli.api.publish") + def test_file_with_msg_without_id(self, mock_publish): + """Assert providing incorrect message json via stdin works.""" + cli_options = {"exchange": "test_pe"} + serialized_msg = ( + '{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_sc' + 'hema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10:11:' + '41+00:00"}, "queue": "test_queue", "topic": "test_topic"}' + ) + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn("Unable to create message. Missing attribute 'id'", result.output) + mock_publish.assert_not_called() + self.assertEqual(2, result.exit_code) + + @mock.patch("fedora_messaging.cli.api.publish") + def test_file_with_invalid_msg(self, mock_publish): + """Assert providing incorrect message json via stdin works.""" + cli_options = {"exchange": "test_pe"} + serialized_msg = ( + '{"body": [], "headers": {"fedora_messaging_schema": "base.message", "fe' + 'dora_messaging_severity": 21, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": "273ed91' + 'd-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue", "topic": "test_topic"}' + ) + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn( + "Unable to validate message: 21 is not one of [10, 20, 30, 40]\n\nFailed validating " + "'enum' in schema['properties']['fedora_messaging_severity']:\n {'enum': [10, 20, " + "30, 40], 'type': 'number'}\n\nOn instance['fedora_messaging_severity']:\n 21\n", + result.output, + ) + mock_publish.assert_not_called() + self.assertEqual(2, result.exit_code) + + @mock.patch("fedora_messaging.cli.api.publish") + def test_publish_rejected_message(self, mock_publish): + """Assert a rejected message is reported.""" + cli_options = {"exchange": "test_pe"} + error_message = "Message rejected" + serialized_msg = ( + '{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_' + 'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10' + ':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"' + ', "topic": "test_topic"}' + ) + mock_publish.side_effect = exceptions.PublishReturned(error_message) + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn("Unable to publish message: " + error_message, result.output) + mock_publish.assert_called_once() + self.assertEqual(errno.EREMOTEIO, result.exit_code) + + @mock.patch("fedora_messaging.cli.api.publish") + def test_publish_connection_failed(self, mock_publish): + """Assert a connection problem is reported.""" + cli_options = {"exchange": "test_pe"} + error_message = "Connection failure" + mock_publish.side_effect = exceptions.ConnectionException(reason=error_message) + serialized_msg = ( + '{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_' + 'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10' + ':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"' + ', "topic": "test_topic"}' + ) + result = self.runner.invoke( + cli.cli, + ["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]], + input=serialized_msg, + ) + self.assertIn( + "Unable to connect to the message broker: " + error_message, result.output + ) + mock_publish.assert_called_once() + self.assertEqual(errno.ECONNREFUSED, result.exit_code)