Skip to content

Commit

Permalink
New cli publish command
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Wojciechowski <s.wojciechowski89@gmail.com>
  • Loading branch information
sebwoj committed May 15, 2019
1 parent a783742 commit 7fe5d1b
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 3 deletions.
18 changes: 16 additions & 2 deletions docs/fedora-messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ Options
Commands
========

There is a single sub-command, ``consume``, described in detail in its ow
section below.
There are two sub-commands, ``consume`` and ``publish``, described in detail in
their own sections below.

``fedora-messaging consume [OPTIONS]``

Starts a consumer process with a user-provided callback function to execute
when a message arrives.

``fedora-messaging publish [OPTIONS]``

Loads serialized messages from stdin and publishes them to the specified
exchange.


consume
-------
Expand Down Expand Up @@ -98,6 +103,15 @@ configuration file and no options on the command line.
in *all* ``bindings`` entries in the configuration file.


publish
-------

``--exchange``

The name of the exchange to publish to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon.


Exit codes
==========

Expand Down
39 changes: 39 additions & 0 deletions fedora_messaging/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import logging.config
import os
import sys
import errno

from twisted.python import log as legacy_twisted_log
from twisted.internet import reactor, error
import click
import pkg_resources

from . import config, api, exceptions
from .message import loads

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,6 +68,10 @@
"digits, hyphen, underscore, period, or colon. If one is not specified, the "
"default is the ``amq.topic`` exchange."
)
_publish_exchange_help = (
"The name of the exchange to publish to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon."
)


# Global variable used to set the exit code in error handlers, then let
Expand Down Expand Up @@ -266,3 +272,36 @@ def callback(consumer):
pass

consumer.result.addCallbacks(callback, errback)


@cli.command()
@click.option("--exchange", help=_publish_exchange_help)
def publish(exchange):
"""
Send messages from stdin to an AMQP queue.
"""
for line in sys.stdin:
try:
msg = loads(line)
except exceptions.ValidationError as e:
raise click.BadArgumentUsage("Unable to validate message: {}".format(str(e)))
except KeyError as e:
raise click.BadArgumentUsage(
"Unable to create message. Missing attribute {}".format(str(e))
)
except ValueError as e:
raise click.BadArgumentUsage(
"Unable to load serialized message: {}".format(str(e))
)

click.echo("Sending message with topic {}".format(msg.topic))
try:
api.publish(msg, exchange)
except exceptions.PublishReturned as e:
click.echo("Unable to publish message: {}".format(str(e.reason)))
sys.exit(errno.EREMOTEIO)
except exceptions.ConnectionException as e:
click.echo(
"Unable to connect to the message broker: {}".format(str(e.reason))
)
sys.exit(errno.ECONNREFUSED)
139 changes: 138 additions & 1 deletion fedora_messaging/tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import os
import unittest
import errno

from click.testing import CliRunner
from twisted.internet import error
from twisted.python import failure
import mock

from fedora_messaging import cli, config, exceptions
from fedora_messaging import cli, config, exceptions, message
from fedora_messaging.tests import FIXTURES_DIR
from fedora_messaging.twisted import consumer

Expand Down Expand Up @@ -444,3 +445,139 @@ def test_errback_general_exception(self, mock_reactor):
cli._consume_errback(f)

self.assertEqual(11, cli._exit_code)


class PublishCliTests(unittest.TestCase):
"""Unit tests for the 'publish' command of the CLI."""

def setUp(self):
self.runner = CliRunner()

@mock.patch("fedora_messaging.cli.api.publish")
def test_correct_msg_in_stdin(self, mock_publish):
"""Assert providing correct message json via stdin works."""
cli_options = {"exchange": "test_pe"}
serialized_msg = (
'{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_'
'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10'
':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"'
', "topic": "test_topic"}'
)
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn("Sending message with topic test_topic", result.output)
mock_publish.assert_called_once()

sent_msg = mock_publish.call_args_list[0][0][0]

# Prepare expected message
expected_msg = message.Message(
body={"test_key1": "test_value1"}, topic="test_topic", severity=message.INFO
)

self.assertEqual(sent_msg.queue, "test_queue")
self.assertEqual(expected_msg, sent_msg)
self.assertEqual(mock_publish.call_args_list[0][0][1], cli_options["exchange"])
self.assertEqual(0, result.exit_code)

@mock.patch("fedora_messaging.cli.api.publish")
def test_file_with_corrupted_json(self, mock_publish):
"""Assert providing corrupted message json via stdin works."""
cli_options = {"exchange": "test_pe"}
serialized_msg = "["
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn("Unable to load serialized message: ", result.output)
mock_publish.assert_not_called()
self.assertEqual(2, result.exit_code)

@mock.patch("fedora_messaging.cli.api.publish")
def test_file_with_msg_without_id(self, mock_publish):
"""Assert providing incorrect message json via stdin works."""
cli_options = {"exchange": "test_pe"}
serialized_msg = (
'{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_sc'
'hema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10:11:'
'41+00:00"}, "queue": "test_queue", "topic": "test_topic"}'
)
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn("Unable to create message. Missing attribute 'id'", result.output)
mock_publish.assert_not_called()
self.assertEqual(2, result.exit_code)

@mock.patch("fedora_messaging.cli.api.publish")
def test_file_with_invalid_msg(self, mock_publish):
"""Assert providing incorrect message json via stdin works."""
cli_options = {"exchange": "test_pe"}
serialized_msg = (
'{"body": [], "headers": {"fedora_messaging_schema": "base.message", "fe'
'dora_messaging_severity": 21, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": "273ed91'
'd-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue", "topic": "test_topic"}'
)
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn(
"Unable to validate message: 21 is not one of [10, 20, 30, 40]\n\nFailed validating "
"'enum' in schema['properties']['fedora_messaging_severity']:\n {'enum': [10, 20, "
"30, 40], 'type': 'number'}\n\nOn instance['fedora_messaging_severity']:\n 21\n",
result.output,
)
mock_publish.assert_not_called()
self.assertEqual(2, result.exit_code)

@mock.patch("fedora_messaging.cli.api.publish")
def test_publish_rejected_message(self, mock_publish):
"""Assert a rejected message is reported."""
cli_options = {"exchange": "test_pe"}
error_message = "Message rejected"
serialized_msg = (
'{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_'
'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10'
':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"'
', "topic": "test_topic"}'
)
mock_publish.side_effect = exceptions.PublishReturned(error_message)
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn("Unable to publish message: " + error_message, result.output)
mock_publish.assert_called_once()
self.assertEqual(errno.EREMOTEIO, result.exit_code)

@mock.patch("fedora_messaging.cli.api.publish")
def test_publish_connection_failed(self, mock_publish):
"""Assert a connection problem is reported."""
cli_options = {"exchange": "test_pe"}
error_message = "Connection failure"
mock_publish.side_effect = exceptions.ConnectionException(reason=error_message)
serialized_msg = (
'{"body": {"test_key1": "test_value1"}, "headers": {"fedora_messaging_'
'schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "2018-11-18T10'
':11:41+00:00"}, "id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", "queue": "test_queue"'
', "topic": "test_topic"}'
)
result = self.runner.invoke(
cli.cli,
["--conf=" + GOOD_CONF, "publish", "--exchange=" + cli_options["exchange"]],
input=serialized_msg,
)
self.assertIn(
"Unable to connect to the message broker: " + error_message, result.output
)
mock_publish.assert_called_once()
self.assertEqual(errno.ECONNREFUSED, result.exit_code)

0 comments on commit 7fe5d1b

Please sign in to comment.