Skip to content

tsv1/amqp-mock

Repository files navigation

AMQP Mock

Codecov PyPI PyPI - Downloads Python Version

Installation

pip3 install amqp-mock

Overview

Test Publishing

from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]

Full code available here: ./examples/publish_example.py

Test Consuming

from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED

Full code available here: ./examples/consume_example.py

Mock Server

Start server

import asyncio

from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock


async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())

or via docker

docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock

Publish message

POST /queues/{queue}/messages

{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)

Get queue message history

GET /queues/{queue}/messages/history

HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]

Get exchange messages

GET /exchanges/{exchange}/messages

HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]

Delete exchange messages

DELETE /exchanges/{exchange}/messages

HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")

Reset

DELETE /

HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()