Skip to content

Commit

Permalink
New cli record command
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Wojciechowski <s.wojciechowski89@gmail.com>
  • Loading branch information
sebwoj committed Nov 28, 2018
1 parent 74793c0 commit 0c7f507
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 7 deletions.
45 changes: 41 additions & 4 deletions docs/fedora-messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ Options
Path to a valid configuration file to use in place of the configuration in
``/etc/fedora-messaging/config.toml``.


Commands
========

There are two sub-commands, ``consume`` and ``publish``, described in detail in
their own sections below.
There are three sub-commands, ``consume``, ``publish`` and ``record``, described
in detail in their own sections below.

``fedora-messaging consume [OPTIONS]``

Expand All @@ -43,6 +44,11 @@ their own sections below.
Loads serialized messages from a file and publishes them to the specified
exchange.

``fedora-messaging record [OPTIONS] FILE``

Records messages arrived from AMQP queue and saves them to file with specified
name.


consume
-------
Expand Down Expand Up @@ -75,8 +81,7 @@ consume
``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon. If one is not specified, the
default is the ``amq.topic`` exchange.
digits, hyphen, underscore, period, or colon.


publish
Expand All @@ -88,6 +93,38 @@ publish
digits, hyphen, underscore, period, or colon.


record
------

``--limit``

The maximum number of messages to record.

``--app-name``

The name of the application, used by the AMQP client to identify itself to
the broker. This is purely for administrator convenience to determine what
applications are connected and own particular resources. If not specified,
the default is ``recorder``.

``--routing-key``

The AMQP routing key to use with the queue. This controls what messages are
delivered to the consumer. Can be specified multiple times; any message
that matches at least one will be placed in the message queue.

``--queue-name``

The name of the message queue in AMQP. Can contain ASCII letters, digits,
hyphen, underscore, period, or colon. If one is not specified, a unique
name will be created for you.

``--exchange``

The name of the exchange to bind the queue to. Can contain ASCII letters,
digits, hyphen, underscore, period, or colon.


Help
====

Expand Down
104 changes: 101 additions & 3 deletions fedora_messaging/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
import os
import sys
import errno
import signal

import click
import pkg_resources

from . import config, api, exceptions
from .message import loads
from .message import dumps, loads

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,13 +63,13 @@
)
_exchange_help = (
"The name of the exchange to bind the queue to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon. If one is not specified, the "
"default is the ``amq.topic`` exchange."
"digits, hyphen, underscore, period, or colon."
)
_publish_exchange_help = (
"The name of the exchange to publish to. Can contain ASCII letters, "
"digits, hyphen, underscore, period, or colon."
)
_limit_help = "The maximum number of messages to record."


@click.group()
Expand Down Expand Up @@ -196,3 +197,100 @@ def publish(exchange, file):
except exceptions.ConnectionException as e:
click.echo("Unable to connect to the queue: {}".format(str(e.reason)))
sys.exit(errno.ECONNREFUSED)


@cli.command()
@click.argument("file", type=click.File("w"))
@click.option("--limit", help=_limit_help, type=click.IntRange(1))
@click.option("--app-name", help=_app_name_help, default="recorder")
@click.option("--routing-key", help=_routing_key_help, multiple=True)
@click.option("--queue-name", help=_queue_name_help)
@click.option("--exchange", help=_exchange_help)
def record(exchange, queue_name, routing_key, app_name, limit, file):
"""Record messages from an AMQP queue to provided file."""

class Recorder:
"""
A simple callback that records messages.
Attributes:
counter (int): The number of messages this callback has recorded.
messages (list): The list of messages received.
"""

def __init__(self):
self.counter = 0
self.messages = []
if limit:
self.bar = click.progressbar(length=limit)

def _signal_handler(signum, frame):
"""
Signal handler that gracefully terminates the recorder
Args:
signum (int): The signal this process received.
frame (frame): The current stack frame (unused).
"""
if signum in (signal.SIGTERM, signal.SIGINT):
self.save_messages_to_file()
raise exceptions.HaltConsumer(exit_code=0, requeue=False)

signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)

def __call__(self, message):
self.messages.append(message)
self.counter += 1
if limit:
self.bar.update(1)
if limit == self.counter:
self.save_messages_to_file()
raise exceptions.HaltConsumer(exit_code=0, requeue=False)

def save_messages_to_file(self):
try:
json_str = dumps(self.messages)
except TypeError as e:
click.echo("Unable to save messages to file: {}".format(str(e)))
raise exceptions.HaltConsumer(exit_code=1, requeue=False)
else:
file.write(json_str)

if exchange and queue_name and routing_key:
bindings = [
{"exchange": exchange, "queue": queue_name, "routing_keys": routing_key}
]
elif not exchange and not queue_name and not routing_key:
bindings = config.conf["bindings"]
else:
raise click.ClickException(
"You must define all three of exchange, queue_name and"
" routing_key, or none of them to use the configuration"
)
if not bindings:
raise click.ClickException(
"No bindings are defined in the configuration file"
" and none were provided as arguments!"
)

if app_name:
config.conf["client_properties"]["app"] = app_name

click.echo("Recording to file {} started.".format(file))
try:
return api.consume(Recorder, bindings)
except ValueError as e:
click_version = pkg_resources.get_distribution("click").parsed_version
if click_version < pkg_resources.parse_version("7.0"):
raise click.exceptions.BadOptionUsage(str(e))
else:
raise click.exceptions.BadOptionUsage("callback", str(e))
except exceptions.HaltConsumer as e:
if e.exit_code:
_log.error(
"Consumer halted with non-zero exit code (%d): %s",
e.exit_code,
str(e.reason),
)
sys.exit(e.exit_code)

0 comments on commit 0c7f507

Please sign in to comment.