Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "consume()" and "cancel()" to the Twisted API #72

Merged
merged 3 commits into from
Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ Exceptions
Twisted
=======

In addition to the synchronous API, a Twisted API is provided for applications
that need an asynchronous API. This API requires Twisted 16.1.0 or greater.

Protocol
--------

.. automodule:: fedora_messaging.twisted.protocol
:members:
:members: FedoraMessagingProtocol, Consumer

Factory
-------
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,5 @@
"jsonschema": ("https://python-jsonschema.readthedocs.io/en/latest/", None),
"six": ("https://pythonhosted.org/six/", None),
"blinker": ("https://pythonhosted.org/blinker/", None),
"Twisted": ("https://twistedmatrix.com/documents/current/api/", None),
}
21 changes: 21 additions & 0 deletions fedora_messaging/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ class BaseException(Exception):
"""The base class for all exceptions raised by fedora_messaging."""


class NoFreeChannels(BaseException):
"""Raised when a connection has reached its channel limit"""


class BadDeclaration(BaseException):
"""
Raised when declaring an object in AMQP fails.

Args:
obj_type (str): The type of object being declared. One of "binding",
"queue", or "exchange".
description (dict): The description of the object.
reason (str): The reason the server gave for rejecting the declaration.
"""

def __init__(self, obj_type, description, reason):
self.obj_type = obj_type
self.description = description
self.reason = reason


class ConfigurationException(BaseException):
"""
Raised when there's an invalid configuration setting
Expand Down
247 changes: 247 additions & 0 deletions fedora_messaging/tests/integration/test_publish_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
import threading
import time
import unittest
import uuid
import socket

from twisted.internet import reactor, task
import mock
import pika
import pkg_resources
import pytest
import pytest_twisted

from fedora_messaging import api, message, exceptions
from fedora_messaging.twisted import service
from fedora_messaging.twisted.protocol import _pika_version


class PubSubTests(unittest.TestCase):
Expand Down Expand Up @@ -61,3 +69,242 @@ def test_pub_connection_refused(self):
api._session_cache.session = api._session.PublisherSession(amqp_url=url)

self.assertRaises(exceptions.ConnectionException, api.publish, api.Message())


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_check_confirms():
"""Assert confirmations are enabled by default."""
serv = service.FedoraMessagingService(amqp_url="amqp://")
serv.startService()
client = yield serv.getFactory().whenConnected()
channel = yield client._allocate_channel()
assert channel._delivery_confirmation is True
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_basic_pub_sub():
"""Basic test of the Twisted publishing/subscribing support"""
queue = str(uuid.uuid4())
queues = [
{"queue": queue, "auto_delete": True, "arguments": {"x-expires": 60 * 1000}}
]
msg = message.Message(
topic=u"nice.message",
headers={u"niceness": u"very"},
body={u"encouragement": u"You're doing great!"},
)
expected_headers = {
u"fedora_messaging_severity": 20,
u"fedora_messaging_schema": u"fedora_messaging.message:Message",
u"niceness": u"very",
}
messages_received = []
serv = service.FedoraMessagingService(amqp_url="amqp://")

serv.startService()
client = yield serv.getFactory().whenConnected()
yield client.declare_queues(queues)
yield client.bind_queues(
[{"queue": queue, "exchange": "amq.topic", "routing_key": "#"}]
)

def callback(message):
messages_received.append(message)
if len(messages_received) == 3:
raise exceptions.HaltConsumer()

yield client.consume(callback, queue)
for _ in range(0, 3):
yield client.publish(msg, "amq.topic")
yield task.deferLater(reactor, 3.0, lambda: True)
serv.stopService()

assert len(messages_received) == 3
for m in messages_received:
assert u"nice.message" == m.topic
assert {u"encouragement": u"You're doing great!"} == m._body
assert "sent-at" in m._headers
del m._headers["sent-at"]
assert expected_headers == m._headers


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_unhandled_exception_cancels_consumer():
"""Assert any unhandled Exception results in the consumer being canceled."""
queue = str(uuid.uuid4())
queues = [
{"queue": queue, "auto_delete": True, "arguments": {"x-expires": 60 * 1000}}
]
serv = service.FedoraMessagingService(amqp_url="amqp://")

serv.startService()
client = yield serv.getFactory().whenConnected()
yield client.declare_queues(queues)
yield client.bind_queues(
[{"queue": queue, "exchange": "amq.topic", "routing_key": "#"}]
)

def callback(message):
raise Exception("Panic!")

yield client.consume(callback, queue)
assert len(client._consumers) == 1

yield client.publish(message.Message(), "amq.topic")
yield task.deferLater(reactor, 3.0, lambda: True)
assert len(client._consumers) == 0
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_nack_handled():
"""Assert raising Nack in a consumer works and messages are re-delivered"""
queue = str(uuid.uuid4())
queues = [
{"queue": queue, "auto_delete": True, "arguments": {"x-expires": 60 * 1000}}
]
messages = []
serv = service.FedoraMessagingService(amqp_url="amqp://")

serv.startService()
client = yield serv.getFactory().whenConnected()
yield client.declare_queues(queues)
yield client.bind_queues(
[{"queue": queue, "exchange": "amq.topic", "routing_key": "#"}]
)

def callback(message):
messages.append(message)
if len(messages) < 3:
raise exceptions.Nack()

yield client.consume(callback, queue)
assert len(client._consumers) == 1

yield client.publish(message.Message(), "amq.topic")
yield task.deferLater(reactor, 3.0, lambda: True)

assert len(messages) == 3
assert len(set([m.id for m in messages])) == 1
assert len(client._consumers) == 1

yield client.cancel(queue)
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_drop_handled():
"""Assert raising Drop in a consumer works and messages are not re-delivered"""
queue = str(uuid.uuid4())
messages = []
serv = service.FedoraMessagingService(amqp_url="amqp://")
serv.startService()
client = yield serv.getFactory().whenConnected()
queues = [
{"queue": queue, "auto_delete": True, "arguments": {"x-expires": 60 * 1000}}
]
yield client.declare_queues(queues)
yield client.bind_queues(
[{"queue": queue, "exchange": "amq.topic", "routing_key": "#"}]
)

def callback(message):
messages.append(message)
raise exceptions.Drop()

yield client.consume(callback, queue)
assert len(client._consumers) == 1

yield client.publish(message.Message(), "amq.topic")
yield task.deferLater(reactor, 3.0, lambda: True) # Just wait a few seconds

assert len(messages) == 1
assert len(client._consumers) == 1
yield client.cancel(queue)
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_declare_queue_failures():
"""Assert that if a queue can't be declared, it results in an exception."""
serv = service.FedoraMessagingService(amqp_url="amqp://")
serv.startService()
client = yield serv.getFactory().whenConnected()

queues = [{"queue": str(uuid.uuid4()), "passive": True}]
try:
yield client.declare_queues(queues)
except exceptions.BadDeclaration as e:
assert "queue" == e.obj_type
assert queues[0] == e.description
assert isinstance(e.reason, pika.exceptions.ChannelClosed)
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_declare_exchange_failures():
"""Assert that if an exchange can't be declared, it results in an exception."""
serv = service.FedoraMessagingService(amqp_url="amqp://")
serv.startService()
client = yield serv.getFactory().whenConnected()

exchanges = [{"exchange": str(uuid.uuid4()), "passive": True}]
try:
yield client.declare_exchanges(exchanges)
except exceptions.BadDeclaration as e:
assert "exchange" == e.obj_type
assert exchanges[0] == e.description
assert isinstance(e.reason, pika.exceptions.ChannelClosed)
serv.stopService()


@pytest.mark.skipif(
_pika_version < pkg_resources.parse_version("1.0.0b1"),
reason="Twisted only supported on pika-1.0.0b1+",
)
@pytest_twisted.inlineCallbacks
def test_declare_binding_failure():
"""Assert that if a binding can't be declared, it results in an exception."""
serv = service.FedoraMessagingService(amqp_url="amqp://")
serv.startService()
client = yield serv.getFactory().whenConnected()

binding = [
{"exchange": str(uuid.uuid4()), "queue": str(uuid.uuid4()), "routing_key": "#"}
]
try:
yield client.bind_queues(binding)
except exceptions.BadDeclaration as e:
assert "binding" == e.obj_type
assert binding[0] == e.description
assert isinstance(e.reason, pika.exceptions.ChannelClosed)
serv.stopService()
Loading