From f0181653b3e5e338a81ee226264730b6a9129ab8 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 13 Oct 2021 16:49:25 +0100 Subject: [PATCH 01/29] New zocalo.util.rabbitmq.RabbitMQAPI --- src/zocalo/util/rabbitmq.py | 97 +++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 1b22dda..47d8eee 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -1,4 +1,9 @@ +import json +import urllib import urllib.request +from typing import Any, Dict, List, Tuple + +from workflows.transport import pika_transport import zocalo.configuration @@ -33,3 +38,95 @@ def http_api_request( opener = urllib.request.build_opener(handler) urllib.request.install_opener(opener) return urllib.request.Request(f"{zc.rabbitmqapi['base_url']}{api_path}") + + +class RabbitMQAPI: + def __init__(self, zc: zocalo.configuration.Configuration): + self._zc = zc + + def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: + # https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.9.7/deps/rabbitmq_management/priv/www/api/index.html + HEALTH_CHECKS = { + "/health/checks/alarms", + "/health/checks/local-alarms", + "/health/checks/certificate-expiration/1/months", + f"/health/checks/port-listener/{pika_transport.PikaTransport.defaults['--rabbit-port']}", + # f"/health/checks/port-listener/1234", + "/health/checks/protocol-listener/amqp", + "/health/checks/virtual-hosts", + "/health/checks/node-is-mirror-sync-critical", + "/health/checks/node-is-quorum-critical", + } + + success = {} + failure = {} + for health_check in HEALTH_CHECKS: + try: + with urllib.request.urlopen( + http_api_request(self._zc, health_check) + ) as response: + success[health_check] = json.loads(response.read()) + except urllib.error.HTTPError as e: + failure[health_check] = str(e) + return success, failure + + @property + def connections(self) -> List[Dict[str, Any]]: + with urllib.request.urlopen( + http_api_request(self._zc, "/connections") + ) as response: + return json.loads(response.read()) + + @property + def nodes(self) -> List[Dict[str, Any]]: + # https://www.rabbitmq.com/monitoring.html#node-metrics + with urllib.request.urlopen(http_api_request(self._zc, "/nodes")) as response: + nodes = json.loads(response.read()) + useful_keys = { + "name", + "mem_used", + "mem_limit", + "mem_alarm", + "disk_free", + "disk_free_limit", + "disk_free_alarm", + "fd_total", + "fd_used", + "io_file_handle_open_attempt_count", + "sockets_total", + "sockets_used", + "message_stats.disk_reads", + "message_stats.disk_writes", + "gc_num", + "gc_bytes_reclaimed", + "proc_total", + "proc_used", + "run_queue", + } + filtered = [ + {k: v for k, v in node.items() if k in useful_keys} for node in nodes + ] + return filtered + + @property + def queues(self) -> List[Dict[str, Any]]: + # https://www.rabbitmq.com/monitoring.html#queue-metrics + with urllib.request.urlopen(http_api_request(self._zc, "/queues")) as response: + nodes = json.loads(response.read()) + useful_keys = { + "consumers", + "name", + "vhost", + "memory", + "messages", + "messages_ready", + "messages_unacknowledged", + "message_stats.publish", + "message_stats.publish_details.rate", + "message_stats.deliver_get", + "message_stats.deliver_get_details.rate", + } + filtered = [ + {k: v for k, v in node.items() if k in useful_keys} for node in nodes + ] + return filtered From 61e1fa80b1a23b0caefe07cba236907d42f49fb9 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Thu, 14 Oct 2021 11:32:30 
+0100 Subject: [PATCH 02/29] Method -> property --- src/zocalo/util/rabbitmq.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 47d8eee..045b8d9 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -44,6 +44,7 @@ class RabbitMQAPI: def __init__(self, zc: zocalo.configuration.Configuration): self._zc = zc + @property def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: # https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.9.7/deps/rabbitmq_management/priv/www/api/index.html HEALTH_CHECKS = { From 2c37a48f451db4b78592a4abb5ebbfbc73513ce5 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Thu, 14 Oct 2021 11:33:54 +0100 Subject: [PATCH 03/29] history --- HISTORY.rst | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index fc4a375..e54d24a 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,13 +5,16 @@ History Unreleased ---------- * Add command line tools for handling dead-letter messages -* ``zocalo.dlq_check`` checks dead-letter queues for messages -* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory - specified in the Zocalo configuration -* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and +* ``zocalo.dlq_check`` checks dead-letter queues for messages +* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory + specified in the Zocalo configuration +* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and places it back on a queue -* Use ``argparse`` for all command line tools and make use of ``workflows`` transport +* Use ``argparse`` for all command line tools and make use of ``workflows`` transport +* Use ``argparse`` for all command line tools and make use of ``workflows`` transport argument injection. 
Minimum ``workflows`` version is now 2.14 +* New ``zocalo.util.rabbitmq.RabbitMQAPI()`` providing a thin wrapper around the + RabbitMQ HTTP API 0.10.0 (2021-10-04) ------------------- From 1013cdbf8a57b4363bdeddb71a61e5cc0307d79a Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Thu, 14 Oct 2021 13:43:20 +0100 Subject: [PATCH 04/29] Use RabbitMQAPI in dlq_check --- src/zocalo/cli/dlq_check.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/zocalo/cli/dlq_check.py b/src/zocalo/cli/dlq_check.py index 127b9b0..42fbe23 100644 --- a/src/zocalo/cli/dlq_check.py +++ b/src/zocalo/cli/dlq_check.py @@ -1,12 +1,10 @@ import argparse -import json -import urllib import workflows.transport import zocalo.configuration from zocalo.util.jmxstats import JMXAPI -from zocalo.util.rabbitmq import http_api_request +from zocalo.util.rabbitmq import RabbitMQAPI # # zocalo.dlq_check @@ -50,16 +48,14 @@ def extract_queue_name(namestring): def check_dlq_rabbitmq( zc: zocalo.configuration.Configuration, namespace: str = None ) -> dict: - _api_request = http_api_request(zc, "/queues") - with urllib.request.urlopen(_api_request) as response: - reply = response.read() - queue_info = json.loads(reply) - dlq_info = {} - for q in queue_info: - if q["name"].startswith("dlq."): - if (namespace is None or q["vhost"] == namespace) and int(q["messages"]): - dlq_info[q["name"]] = int(q["messages"]) - return dlq_info + rmq = RabbitMQAPI(zc) + return { + q["name"]: int(q["messages"]) + for q in rmq.queues + if q["name"].startswith("dlq.") + and (namespace is None or q["vhost"] == namespace) + and int(q["messages"]) + } def run() -> None: From 7dcc637472f657afe471134dc99ee8e49caa120d Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Thu, 14 Oct 2021 17:03:20 +0100 Subject: [PATCH 05/29] Fix test --- tests/cli/test_dlq_check.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cli/test_dlq_check.py b/tests/cli/test_dlq_check.py index f162889..f38138d 100644 --- a/tests/cli/test_dlq_check.py +++ b/tests/cli/test_dlq_check.py @@ -21,8 +21,8 @@ def test_activemq_dlq_check(mock_jmx): assert checked == {"images": 2, "transient": 5} -@mock.patch("zocalo.cli.dlq_check.urllib.request.urlopen") -@mock.patch("zocalo.cli.dlq_check.http_api_request") +@mock.patch("zocalo.util.rabbitmq.urllib.request.urlopen") +@mock.patch("zocalo.util.rabbitmq.http_api_request") def test_activemq_dlq_rabbitmq_check(mock_api, mock_url): cfg = Configuration({}) _mock = mock.MagicMock() From dfe8034ef11121a38be3bf766da93bf6ab49ea30 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 18 Oct 2021 10:39:25 +0100 Subject: [PATCH 06/29] Remove broken attempt at filtering useful keys --- src/zocalo/util/rabbitmq.py | 46 +-------------- tests/util/test_rabbitmq.py | 110 +++++++++++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 47 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 045b8d9..cd15901 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -82,52 +82,10 @@ def connections(self) -> List[Dict[str, Any]]: def nodes(self) -> List[Dict[str, Any]]: # https://www.rabbitmq.com/monitoring.html#node-metrics with urllib.request.urlopen(http_api_request(self._zc, "/nodes")) as response: - nodes = json.loads(response.read()) - useful_keys = { - "name", - "mem_used", - "mem_limit", - "mem_alarm", - "disk_free", - "disk_free_limit", - "disk_free_alarm", - "fd_total", - "fd_used", - "io_file_handle_open_attempt_count", - 
"sockets_total", - "sockets_used", - "message_stats.disk_reads", - "message_stats.disk_writes", - "gc_num", - "gc_bytes_reclaimed", - "proc_total", - "proc_used", - "run_queue", - } - filtered = [ - {k: v for k, v in node.items() if k in useful_keys} for node in nodes - ] - return filtered + return json.loads(response.read()) @property def queues(self) -> List[Dict[str, Any]]: # https://www.rabbitmq.com/monitoring.html#queue-metrics with urllib.request.urlopen(http_api_request(self._zc, "/queues")) as response: - nodes = json.loads(response.read()) - useful_keys = { - "consumers", - "name", - "vhost", - "memory", - "messages", - "messages_ready", - "messages_unacknowledged", - "message_stats.publish", - "message_stats.publish_details.rate", - "message_stats.deliver_get", - "message_stats.deliver_get_details.rate", - } - filtered = [ - {k: v for k, v in node.items() if k in useful_keys} for node in nodes - ] - return filtered + return json.loads(response.read()) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 6a3718b..3b811fa 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -1,13 +1,117 @@ +import json +import urllib.request + +import pytest + import zocalo.configuration -from zocalo.util.rabbitmq import http_api_request +from zocalo.util.rabbitmq import RabbitMQAPI, http_api_request -def test_http_api_request(mocker): +@pytest.fixture +def zocalo_configuration(mocker): zc = mocker.MagicMock(zocalo.configuration.Configuration) zc.rabbitmqapi = { "base_url": "http://rabbitmq.burrow.com:12345/api", "username": "carrots", "password": "carrots", } - request = http_api_request(zc, api_path="/queues") + return zc + + +def test_http_api_request(zocalo_configuration): + request = http_api_request(zocalo_configuration, api_path="/queues") assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues" + + +def test_api_health_checks(mocker, zocalo_configuration): + mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") + mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") + mock_api.return_value = "" + mock_url.return_value = mocker.MagicMock() + mock_url.return_value.__enter__.return_value.read.return_value = json.dumps( + {"status": "ok"} + ) + rmq = RabbitMQAPI(zocalo_configuration) + success, failures = rmq.health_checks + assert not failures + assert success + for k, v in success.items(): + assert k.startswith("/health/checks/") + assert v == {"status": "ok"} + + +def test_api_health_checks_failures(mocker, zocalo_configuration): + mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") + mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") + mock_api.return_value = "" + mock_url.return_value = mocker.MagicMock() + mock_url.return_value.__enter__.return_value.read.side_effect = ( + urllib.error.HTTPError( + "http://foo.com", 503, "Service Unavailable", mocker.Mock(), mocker.Mock() + ) + ) + rmq = RabbitMQAPI(zocalo_configuration) + success, failures = rmq.health_checks + assert failures + assert not success + for k, v in success.items(): + assert k.startswith("/health/checks/") + assert v == "HTTP Error 503: Service Unavailable" + + +def test_api_queues(mocker, zocalo_configuration): + queues = [ + { + "consumers": 0, + "memory": 110112, + "message_stats": { + "deliver_get": 33, + "deliver_get_details": {"rate": 0}, + "publish": 22, + "publish_details": {"rate": 0}, + }, + "messages": 0, + "messages_ready": 0, + "messages_unacknowledged": 0, + "name": "foo", + 
"vhost": "zocalo", + }, + ] + + mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") + mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") + mock_api.return_value = "" + mock_url.return_value = mocker.MagicMock() + mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(queues) + rmq = RabbitMQAPI(zocalo_configuration) + assert rmq.queues == queues + + +def test_api_nodes(mocker, zocalo_configuration): + nodes = { + "name": "rabbit@pooter123", + "mem_limit": 80861855744, + "mem_alarm": False, + "mem_used": 143544320, + "disk_free_limit": 50000000, + "disk_free_alarm": False, + "disk_free": 875837644800, + "fd_total": 32768, + "fd_used": 56, + "io_file_handle_open_attempt_count": 647, + "sockets_total": 29401, + "sockets_used": 0, + "gc_num": 153378077, + "gc_bytes_reclaimed": 7998215046336, + "proc_total": 1048576, + "proc_used": 590, + "run_queue": 1, + } + + mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") + mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") + mock_api.return_value = "" + mock_url.return_value = mocker.MagicMock() + mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(nodes) + rmq = RabbitMQAPI(zocalo_configuration) + assert rmq.queues == nodes From 212c54c90bf2021e84b09261cd5d84c9414e3f96 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Fri, 22 Oct 2021 18:24:07 +0100 Subject: [PATCH 07/29] WIP advanced API --- .pre-commit-config.yaml | 1 + src/zocalo/util/rabbitmq.py | 385 +++++++++++++++++++++++++++++++++--- 2 files changed, 355 insertions(+), 31 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 93bef9c..862e715 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -37,3 +37,4 @@ repos: hooks: - id: mypy files: 'src/.*\.py$' + additional_dependencies: [types-requests] diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index cd15901..3d909c3 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -1,12 +1,233 @@ -import json +import datetime +import enum +import logging import urllib import urllib.request -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union +import requests +from pydantic import BaseModel, Field from workflows.transport import pika_transport import zocalo.configuration +logger = logging.getLogger("workflows.transport.pika_transport") + + +class MessageStats(BaseModel): + publish: Optional[int] = Field(None, description="Count of messages published.") + + publish_in: Optional[int] = Field( + None, + description='Count of messages published "in" to an exchange, i.e. not taking account of routing.', + ) + publish_out: Optional[int] = Field( + None, + description='Count of messages published "out" of an exchange, i.e. 
taking account of routing.', + ) + confirm: Optional[int] = Field(None, description="Count of messages confirmed.") + deliver: Optional[int] = Field( + None, + description="Count of messages delivered in acknowledgement mode to consumers.", + ) + deliver_no_ack: Optional[int] = Field( + None, + description="Count of messages delivered in no-acknowledgement mode to consumers.", + ) + get: Optional[int] = Field( + None, + description="Count of messages delivered in acknowledgement mode in response to basic.get.", + ) + get_no_ack: Optional[int] = Field( + None, + description="Count of messages delivered in no-acknowledgement mode in response to basic.get.", + ) + deliver_get: Optional[int] = Field( + None, description="Sum of all four of the above." + ) + redeliver: Optional[int] = Field( + None, + description="Count of subset of messages in deliver_get which had the redelivered flag set.", + ) + drop_unroutable: Optional[int] = Field( + None, description="Count of messages dropped as unroutable." + ) + return_unroutable: Optional[int] = Field( + None, description="Count of messages returned to the publisher as unroutable." + ) + + +class ExchangeType(enum.Enum): + direct = "direct" + topic = "topic" + headers = "headers" + fanout = "fanout" + + +class ExchangeSpec(BaseModel): + name: str = Field( + ..., + description="The name of the exchange with non-ASCII characters escaped as in C.", + ) + type: ExchangeType = Field(..., description="The exchange type") + durable: Optional[bool] = Field( + False, description="Whether or not the exchange survives server restarts." + ) + auto_delete: Optional[bool] = Field( + False, + description="Whether the exchange will be deleted automatically when no longer used.", + ) + internal: Optional[bool] = Field( + False, + description="Whether the exchange is internal, i.e. cannot be directly published to by a client.", + ) + arguments: dict[str, Any] = Field(..., description="Exchange arguments.") + + +class ExchangeInfo(ExchangeSpec): + policy: Optional[str] = Field( + None, description="Policy name for applying to the exchange." + ) + message_stats: Optional[MessageStats] = None + incoming: Optional[Dict] = Field( + None, + description="Detailed message stats (see section above) for publishes from channels into this exchange.", + ) + outgoing: Optional[Dict] = Field( + None, + description="Detailed message stats for publishes from this exchange into queues.", + ) + + +class QueueState(str, enum.Enum): + 'The state of the queue. Normally "running", but may be "{syncing, message_count}" if the queue is synchronising.' + + running = "running" + syncing = "syncing" + message_count = "message_count" + + +class QueueSpec(BaseModel): + name: str = Field( + ..., + description="The name of the queue with non-ASCII characters escaped as in C.", + ) + durable: Optional[bool] = Field( + False, description="Whether or not the queue survives server restarts." + ) + auto_delete: Optional[bool] = Field( + False, + description="Whether the queue will be deleted automatically when no longer used.", + ) + arguments: Optional[dict[str, Any]] = Field(None, description="Queue arguments.") + + +class QueueInfo(QueueSpec): + policy: Optional[str] = Field( + None, description="Effective policy name for the queue." + ) + pid: Optional[int] = Field( + None, description="Erlang process identifier of the queue." + ) + owner_pid: Optional[int] = Field( + None, + description="Id of the Erlang process of the connection which is the exclusive owner of the queue. 
Empty if the queue is non-exclusive.", + ) + exclusive: bool = Field( + ..., + description="True if queue is exclusive (i.e. has owner_pid), false otherwise.", + ) + exclusive_consumer_pid: Optional[int] = Field( + None, + description="Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.", + ) + exclusive_consumer_tag: Optional[str] = Field( + None, + description="Consumer tag of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.", + ) + messages_ready: Optional[int] = Field( + None, description="Number of messages ready to be delivered to clients." + ) + messages_unacknowledged: Optional[int] = Field( + None, + description="Number of messages delivered to clients but not yet acknowledged.", + ) + messages: Optional[int] = Field( + None, description="Sum of ready and unacknowledged messages (queue depth)." + ) + messages_ready_ram: Optional[int] = Field( + None, + description="Number of messages from messages_ready which are resident in ram.", + ) + messages_unacknowledged_ram: Optional[int] = Field( + None, + description="Number of messages from messages_unacknowledged which are resident in ram.", + ) + messages_ram: Optional[int] = Field( + None, description="Total number of messages which are resident in ram." + ) + messages_persistent: Optional[int] = Field( + None, + description="Total number of persistent messages in the queue (will always be 0 for transient queues).", + ) + message_bytes: Optional[int] = Field( + None, + description="Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.", + ) + message_bytes_ready: Optional[int] = Field( + None, + description="Like message_bytes but counting only those messages ready to be delivered to clients.", + ) + message_bytes_unacknowledged: Optional[int] = Field( + None, + description="Like message_bytes but counting only those messages delivered to clients but not yet acknowledged.", + ) + message_bytes_ram: Optional[int] = Field( + None, + description="Like message_bytes but counting only those messages which are currently held in RAM.", + ) + message_bytes_persistent: Optional[int] = Field( + None, + description="Like message_bytes but counting only those messages which are persistent.", + ) + head_message_timestamp: Optional[datetime.datetime] = Field( + None, + description="The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", + ) + disk_reads: Optional[int] = Field( + None, + description="Total number of times messages have been read from disk by this queue since it started.", + ) + disk_writes: Optional[int] = Field( + None, + description="Total number of times messages have been written to disk by this queue since it started.", + ) + consumers: Optional[int] = Field(None, description="Number of consumers.") + consumer_utilisation: Optional[float] = Field( + None, + ge=0, + le=1, + description="Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. 
This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", + ) + memory: Optional[int] = Field( + None, + description="Bytes of memory allocated by the runtime for the queue, including stack, heap and internal structures.", + ) + state: Optional[QueueState] = None + message_stats: Optional[MessageStats] = None + incoming: Optional[dict] = Field( + None, + description="Detailed message stats (see section above) for publishes from exchanges into this queue.", + ) + deliveries: Optional[dict] = Field( + None, + description="Detailed message stats for deliveries from this queue into channels.", + ) + consumer_details: Optional[List[Any]] = Field( + None, + description="List of consumers on this channel, with some details on each.", + ) + def http_api_request( zc: zocalo.configuration.Configuration, @@ -41,51 +262,153 @@ def http_api_request( class RabbitMQAPI: - def __init__(self, zc: zocalo.configuration.Configuration): - self._zc = zc + def __init__(self, url: str, user: str, password: str): + self._auth = (user, password) + self._url = url + + @classmethod + def from_zocalo_configuration(cls, zc: zocalo.configuration.Configuration): + return cls( + url=zc.rabbitmqapi["base_url"], + user=zc.rabbitmqapi["username"], + password=zc.rabbitmqapi["password"], + ) @property def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: # https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.9.7/deps/rabbitmq_management/priv/www/api/index.html HEALTH_CHECKS = { - "/health/checks/alarms", - "/health/checks/local-alarms", - "/health/checks/certificate-expiration/1/months", - f"/health/checks/port-listener/{pika_transport.PikaTransport.defaults['--rabbit-port']}", - # f"/health/checks/port-listener/1234", - "/health/checks/protocol-listener/amqp", - "/health/checks/virtual-hosts", - "/health/checks/node-is-mirror-sync-critical", - "/health/checks/node-is-quorum-critical", + "health/checks/alarms", + "health/checks/local-alarms", + "health/checks/certificate-expiration/1/months", + f"health/checks/port-listener/{pika_transport.PikaTransport.defaults['--rabbit-port']}", + # f"health/checks/port-listener/1234", + "health/checks/protocol-listener/amqp", + "health/checks/virtual-hosts", + "health/checks/node-is-mirror-sync-critical", + "health/checks/node-is-quorum-critical", } success = {} failure = {} for health_check in HEALTH_CHECKS: - try: - with urllib.request.urlopen( - http_api_request(self._zc, health_check) - ) as response: - success[health_check] = json.loads(response.read()) - except urllib.error.HTTPError as e: - failure[health_check] = str(e) + response = self._get(health_check) + if response.status_code == requests.codes.ok: + success[health_check] = response.json() + else: + failure[health_check] = response.text return success, failure @property def connections(self) -> List[Dict[str, Any]]: - with urllib.request.urlopen( - http_api_request(self._zc, "/connections") - ) as response: - return json.loads(response.read()) + return self._get("connections").json() @property def nodes(self) -> List[Dict[str, Any]]: # https://www.rabbitmq.com/monitoring.html#node-metrics - with urllib.request.urlopen(http_api_request(self._zc, "/nodes")) as response: - return json.loads(response.read()) + return self._get("nodes").json() - @property - def queues(self) -> List[Dict[str, Any]]: - # https://www.rabbitmq.com/monitoring.html#queue-metrics - with urllib.request.urlopen(http_api_request(self._zc, "/queues")) as response: - return json.loads(response.read()) + def 
_get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: + return requests.get(f"{self._url}/{endpoint}", auth=self._auth, params=params) + + def _put( + self, endpoint: str, params: Dict[str, Any] = None, json: Dict[str, Any] = None + ) -> requests.Response: + return requests.put( + f"{self._url}/{endpoint}", auth=self._auth, params=params, json=json + ) + + def _delete( + self, endpoint: str, params: Dict[str, Any] = None + ) -> requests.Response: + return requests.delete( + f"{self._url}/{endpoint}", auth=self._auth, params=params + ) + + def exchanges( + self, vhost: Optional[str] = None, name: Optional[str] = None + ) -> Union[List[ExchangeInfo], ExchangeInfo]: + endpoint = "exchanges" + if vhost is not None and name is not None: + endpoint = f"{endpoint}/{vhost}/{name}/" + response = self._get(endpoint) + print(response.url) + print(response) + return ExchangeInfo(**response.json()) + elif vhost is not None: + endpoint = f"{endpoint}/{vhost}/" + elif name is not None: + raise ValueError("name can not be set without vhost") + response = self._get(endpoint) + logger.debug(response) + return [ExchangeInfo(**qi) for qi in response.json()] + + def exchange_declare(self, vhost: str, exchange: ExchangeSpec): + endpoint = f"exchanges/{vhost}/{exchange.name}" + response = self._put( + endpoint, json=exchange.dict(exclude_defaults=True, exclude={"name"}) + ) + logger.debug(response) + + def exchange_delete(self, vhost: str, name: str, if_unused: bool = False): + endpoint = f"exchanges/{vhost}/{name}" + response = self._delete(endpoint) + logger.debug(response) + + def queues( + self, vhost: Optional[str] = None, name: Optional[str] = None + ) -> Union[List[QueueInfo], QueueInfo]: + endpoint = "queues" + if vhost is not None and name is not None: + endpoint = f"{endpoint}/{vhost}/{name}/" + response = self._get(endpoint) + return QueueInfo(**response.json()) + elif vhost is not None: + endpoint = f"{endpoint}/{vhost}/" + elif name is not None: + raise ValueError("name can not be set without vhost") + response = self._get(endpoint) + # print(response.url) + logger.debug(response) + return [QueueInfo(**qi) for qi in response.json()] + + def queue_declare(self, vhost: str, queue: QueueSpec): + endpoint = f"queues/{vhost}/{queue.name}" + response = self._put( + endpoint, json=queue.dict(exclude_defaults=True, exclude={"name"}) + ) + logger.debug(response) + + def queue_delete( + self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False + ): + endpoint = f"queues/{vhost}/{name}" + response = self._delete(endpoint) + logger.debug(response) + + +if __name__ == "__main__": + import time + + zc = zocalo.configuration.from_file() + zc.activate() + rmq = RabbitMQAPI.from_zocalo_configuration(zc) + print(rmq.queues()) + print(rmq.queues(vhost="zocalo", name="processing_recipe")) + # time.sleep(5) + rmq.queue_declare( + vhost="zocalo", + queue=QueueSpec( + name="foo", auto_delete=True, arguments={"x-single-active-consumer": True} + ), + ) + time.sleep(5) + print(rmq.queues(vhost="zocalo", name="foo")) + rmq.queue_delete(vhost="zocalo", name="foo") + # print(rmq.queues(vhost="zocalo", name="foo")) + for q in rmq.queues(): + print(q.message_stats) + print() + for ex in rmq.exchanges(): + print(ex) + print(rmq.exchanges(vhost="zocalo", name="")) From 0201dfb2205892442ac03fb869f93e7006c61456 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Fri, 22 Oct 2021 20:58:42 +0100 Subject: [PATCH 08/29] nodes and connections --- src/zocalo/util/rabbitmq.py | 270 
++++++++++++++++++++++++++++++++++-- 1 file changed, 259 insertions(+), 11 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 3d909c3..a576e6f 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -1,6 +1,7 @@ import datetime import enum import logging +import pathlib import urllib import urllib.request from typing import Any, Dict, List, Optional, Tuple, Union @@ -57,6 +58,235 @@ class MessageStats(BaseModel): ) +class ConnectionState(enum.Enum): + starting = "starting" + tuning = "tuning" + opening = "opening" + running = "running" + flow = "flow" + blocking = "blocking" + blocked = "blocked" + closing = "closing" + closed = "closed" + + +class ConnectionInfo(BaseModel): + """TCP/IP connection statistics.""" + + pid: Optional[int] = Field( + int, description="Id of the Erlang process associated with the connection." + ) + name: str = Field(..., description="Readable name for the connection.") + port: int = Field(..., description="Server port.") + host: str = Field( + ..., + description="Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.", + ) + peer_port: int = Field(..., description="Peer port.") + peer_host: str = Field( + ..., + description="Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.", + ) + ssl: bool = Field( + ..., + description="Boolean indicating whether the connection is secured with SSL.", + ) + ssl_protocol: Optional[str] = Field( + None, description='SSL protocol (e.g. "tlsv1").' + ) + ssl_key_exchange: Optional[str] = Field( + None, description='SSL key exchange algorithm (e.g. "rsa").' + ) + ssl_cipher: Optional[str] = Field( + None, description='SSL cipher algorithm (e.g. "aes_256_cbc").' + ) + ssl_hash: Optional[str] = Field(None, description='SSL hash function (e.g. "sha").') + peer_cert_subject: Optional[str] = Field( + None, description="The subject of the peer's SSL certificate, in RFC4514 form." + ) + peer_cert_issuer: Optional[str] = Field( + None, description="The issuer of the peer's SSL certificate, in RFC4514 form." + ) + peer_cert_validity: Optional[str] = Field( + None, description="The period for which the peer's SSL certificate is valid." + ) + state: ConnectionState + channels: int = Field(..., description="Number of channels using the connection.") + protocol: str = Field( + ..., + description="Version of the AMQP protocol in use; currently one of: {0,9,1} {0,8,0}", + ) + auth_mechanism: str = Field( + ..., description='SASL authentication mechanism used, such as "PLAIN".' + ) + user: str = Field(..., description="Username associated with the connection.") + vhost: str = Field( + ..., description="Virtual host name with non-ASCII characters escaped as in C." + ) + timeout: int = Field( + ..., + description="Connection timeout / negotiated heartbeat interval, in seconds.", + ) + frame_max: int = Field(..., description="Maximum frame size (bytes).") + channel_max: int = Field( + ..., description="Maximum number of channels on this connection." + ) + # client_properties + # Informational properties transmitted by the client during connection establishment. + # recv_oct: + # Octets received. + # recv_cnt + # Packets received. + # send_oct + # Octets send. + # send_cnt + # Packets sent. + # send_pend + # Send queue size. + connected_at: datetime.datetime = Field( + ..., description="Date and time this connection was established, as timestamp." 
+ ) + + +class NodeType(enum.Enum): + disc = "disc" + ram = "ram" + + +class NodeInfo(BaseModel): + # applications List of all Erlang applications running on the node. + # auth_mechanisms List of all SASL authentication mechanisms installed on the node. + # cluster_links A list of the other nodes in the cluster. For each node, there are details of the TCP connection used to connect to it and statistics on data that has been transferred. + config_files: List[pathlib.Path] = Field( + ..., description="List of config files read by the node." + ) + # contexts List of all HTTP listeners on the node. + db_dir: pathlib.Path = Field( + ..., description="Location of the persistent storage used by the node." + ) + disk_free: int = Field(..., description="Disk free space in bytes.") + disk_free_alarm: bool = Field( + ..., description="Whether the disk alarm has gone off." + ) + disk_free_limit: int = Field( + ..., description="Point at which the disk alarm will go off." + ) + enabled_plugins: List[str] = Field( + ..., + description="List of plugins which are both explicitly enabled and running.", + ) + # exchange_types Exchange types available on the node. + fd_total: int = Field(..., description="File descriptors available.") + fd_used: int = Field(..., description="Used file descriptors.") + io_read_avg_time: float = Field( + ..., + ge=0, + description="Average wall time (milliseconds) for each disk read operation in the last statistics interval.", + ) + io_read_bytes: int = Field( + ..., description="Total number of bytes read from disk by the persister." + ) + io_read_count: int = Field( + ..., description="Total number of read operations by the persister." + ) + io_reopen_count: int = Field( + ..., + description="Total number of times the persister has needed to recycle file handles between queues. In an ideal world this number will be zero; if the number is large, performance might be improved by increasing the number of file handles available to RabbitMQ.", + ) + io_seek_avg_time: int = Field( + ..., + description="Average wall time (milliseconds) for each seek operation in the last statistics interval.", + ) + io_seek_count: int = Field( + ..., description="Total number of seek operations by the persister." + ) + io_sync_avg_time: int = Field( + ..., + description="Average wall time (milliseconds) for each fsync() operation in the last statistics interval.", + ) + io_sync_count: int = Field( + ..., description="Total number of fsync() operations by the persister." + ) + io_write_avg_time: int = Field( + ..., + description="Average wall time (milliseconds) for each disk write operation in the last statistics interval.", + ) + io_write_bytes: int = Field( + ..., description="Total number of bytes written to disk by the persister." + ) + io_write_count: int = Field( + ..., description="Total number of write operations by the persister." + ) + log_files: List[pathlib.Path] = Field( + ..., + description='List of log files used by the node. If the node also sends messages to stdout, "" is also reported in the list.', + ) + mem_used: int = Field(..., description="Memory used in bytes.") + mem_alarm: bool = Field(..., description="Whether the memory alarm has gone off.") + mem_limit: int = Field( + ..., description="Point at which the memory alarm will go off." + ) + mnesia_disk_tx_count: int = Field( + ..., + description="Number of Mnesia transactions which have been performed that required writes to disk. (e.g. creating a durable queue). 
Only transactions which originated on this node are included.", + ) + mnesia_ram_tx_count: int = Field( + ..., + description="Number of Mnesia transactions which have been performed that did not require writes to disk. (e.g. creating a transient queue). Only transactions which originated on this node are included.", + ) + msg_store_read_count: int = Field( + ..., + description="Number of messages which have been read from the message store.", + ) + msg_store_write_count: int = Field( + ..., + description="Number of messages which have been written to the message store.", + ) + name: str = Field(..., description="Node name.") + net_ticktime: int = Field( + ..., description="Current kernel net_ticktime setting for the node." + ) + os_pid: int = Field( + ..., + description="Process identifier for the Operating System under which this node is running.", + ) + # partitions List of network partitions this node is seeing. + proc_total: int = Field(..., description="Maximum number of Erlang processes.") + proc_used: int = Field(..., description="Number of Erlang processes in use.") + processors: int = Field( + ..., description="Number of cores detected and usable by Erlang." + ) + queue_index_journal_write_count: int = Field( + ..., + description="Number of records written to the queue index journal. Each record represents a message being published to a queue, being delivered from a queue, and being acknowledged in a queue.", + ) + queue_index_read_count: int = Field( + ..., description="Number of records read from the queue index." + ) + queue_index_write_count: int = Field( + ..., description="Number of records written to the queue index." + ) + # rates_mode: 'none', 'basic' or 'detailed'. + run_queue: float = Field( + ..., description="Average number of Erlang processes waiting to run." + ) + running: bool = Field( + ..., + description="Boolean for whether this node is up. Obviously if this is false, most other stats will be missing.", + ) + # sasl_log_file Location of sasl log file. + sockets_total: int = Field( + ..., description="File descriptors available for use as sockets." + ) + sockets_used: int = Field(..., description="File descriptors used as sockets.") + type: NodeType + uptime: int = Field( + ..., description="Time since the Erlang VM started, in milliseconds." + ) + # memory Detailed memory use statistics. Only appears if ?memory=true is appended to the URL. + # binary Detailed breakdown of the owners of binary memory. Only appears if ?binary=true is appended to the URL. Note that this can be an expensive query if there are many small binaries in the system. 
+ + class ExchangeType(enum.Enum): direct = "direct" topic = "topic" @@ -299,15 +529,6 @@ def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: failure[health_check] = response.text return success, failure - @property - def connections(self) -> List[Dict[str, Any]]: - return self._get("connections").json() - - @property - def nodes(self) -> List[Dict[str, Any]]: - # https://www.rabbitmq.com/monitoring.html#node-metrics - return self._get("nodes").json() - def _get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: return requests.get(f"{self._url}/{endpoint}", auth=self._auth, params=params) @@ -325,6 +546,29 @@ def _delete( f"{self._url}/{endpoint}", auth=self._auth, params=params ) + def connections( + self, name: Optional[str] = None + ) -> Union[List[ConnectionInfo], ConnectionInfo]: + endpoint = "connections" + if name is not None: + endpoint = f"{endpoint}/{name}/" + response = self._get(endpoint) + return ConnectionInfo(**response.json()) + response = self._get(endpoint) + logger.debug(response) + return [ConnectionInfo(**qi) for qi in response.json()] + + def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]: + # https://www.rabbitmq.com/monitoring.html#node-metrics + endpoint = "nodes" + if name is not None: + endpoint = f"{endpoint}/{name}/" + response = self._get(endpoint) + return NodeInfo(**response.json()) + response = self._get(endpoint) + logger.debug(response) + return [NodeInfo(**qi) for qi in response.json()] + def exchanges( self, vhost: Optional[str] = None, name: Optional[str] = None ) -> Union[List[ExchangeInfo], ExchangeInfo]: @@ -332,8 +576,6 @@ def exchanges( if vhost is not None and name is not None: endpoint = f"{endpoint}/{vhost}/{name}/" response = self._get(endpoint) - print(response.url) - print(response) return ExchangeInfo(**response.json()) elif vhost is not None: endpoint = f"{endpoint}/{vhost}/" @@ -412,3 +654,9 @@ def queue_delete( for ex in rmq.exchanges(): print(ex) print(rmq.exchanges(vhost="zocalo", name="")) + + nodes = rmq.nodes() + print(rmq.nodes(name=nodes[0].name)) + + connections = rmq.connections() + print(rmq.connections(name=connections[0].name)) From 8bab661acb1f87a1e50b993dbb83706f42a2211c Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 10:55:07 +0100 Subject: [PATCH 09/29] Remove underscore prefix --- src/zocalo/util/rabbitmq.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index a576e6f..a5346c7 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -522,26 +522,24 @@ def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: success = {} failure = {} for health_check in HEALTH_CHECKS: - response = self._get(health_check) + response = self.get(health_check) if response.status_code == requests.codes.ok: success[health_check] = response.json() else: failure[health_check] = response.text return success, failure - def _get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: + def get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: return requests.get(f"{self._url}/{endpoint}", auth=self._auth, params=params) - def _put( + def put( self, endpoint: str, params: Dict[str, Any] = None, json: Dict[str, Any] = None ) -> requests.Response: return requests.put( f"{self._url}/{endpoint}", auth=self._auth, params=params, json=json ) - def _delete( - self, endpoint: str, params: 
Dict[str, Any] = None - ) -> requests.Response: + def delete(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: return requests.delete( f"{self._url}/{endpoint}", auth=self._auth, params=params ) @@ -552,9 +550,9 @@ def connections( endpoint = "connections" if name is not None: endpoint = f"{endpoint}/{name}/" - response = self._get(endpoint) + response = self.get(endpoint) return ConnectionInfo(**response.json()) - response = self._get(endpoint) + response = self.get(endpoint) logger.debug(response) return [ConnectionInfo(**qi) for qi in response.json()] @@ -563,9 +561,9 @@ def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]: endpoint = "nodes" if name is not None: endpoint = f"{endpoint}/{name}/" - response = self._get(endpoint) + response = self.get(endpoint) return NodeInfo(**response.json()) - response = self._get(endpoint) + response = self.get(endpoint) logger.debug(response) return [NodeInfo(**qi) for qi in response.json()] @@ -575,26 +573,26 @@ def exchanges( endpoint = "exchanges" if vhost is not None and name is not None: endpoint = f"{endpoint}/{vhost}/{name}/" - response = self._get(endpoint) + response = self.get(endpoint) return ExchangeInfo(**response.json()) elif vhost is not None: endpoint = f"{endpoint}/{vhost}/" elif name is not None: raise ValueError("name can not be set without vhost") - response = self._get(endpoint) + response = self.get(endpoint) logger.debug(response) return [ExchangeInfo(**qi) for qi in response.json()] def exchange_declare(self, vhost: str, exchange: ExchangeSpec): endpoint = f"exchanges/{vhost}/{exchange.name}" - response = self._put( + response = self.put( endpoint, json=exchange.dict(exclude_defaults=True, exclude={"name"}) ) logger.debug(response) def exchange_delete(self, vhost: str, name: str, if_unused: bool = False): endpoint = f"exchanges/{vhost}/{name}" - response = self._delete(endpoint) + response = self.delete(endpoint) logger.debug(response) def queues( @@ -603,20 +601,20 @@ def queues( endpoint = "queues" if vhost is not None and name is not None: endpoint = f"{endpoint}/{vhost}/{name}/" - response = self._get(endpoint) + response = self.get(endpoint) return QueueInfo(**response.json()) elif vhost is not None: endpoint = f"{endpoint}/{vhost}/" elif name is not None: raise ValueError("name can not be set without vhost") - response = self._get(endpoint) + response = self.get(endpoint) # print(response.url) logger.debug(response) return [QueueInfo(**qi) for qi in response.json()] def queue_declare(self, vhost: str, queue: QueueSpec): endpoint = f"queues/{vhost}/{queue.name}" - response = self._put( + response = self.put( endpoint, json=queue.dict(exclude_defaults=True, exclude={"name"}) ) logger.debug(response) @@ -625,7 +623,7 @@ def queue_delete( self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False ): endpoint = f"queues/{vhost}/{name}" - response = self._delete(endpoint) + response = self.delete(endpoint) logger.debug(response) From 45d688dc71cb7ff7797790d9471dfe8c561b4c45 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 12:39:29 +0100 Subject: [PATCH 10/29] Fix tests for new API Adds requests_mock dev dependency --- requirements_dev.txt | 1 + src/zocalo/util/rabbitmq.py | 135 ++++++++++++++++++------------------ tests/util/test_rabbitmq.py | 134 +++++++++++++++++++---------------- 3 files changed, 141 insertions(+), 129 deletions(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 182d6cd..c6aa94f 100644 --- 
a/requirements_dev.txt +++ b/requirements_dev.txt @@ -4,5 +4,6 @@ marshmallow==3.13.0 pytest-cov==3.0.0 pytest-mock pytest==6.2.5 +requests_mock setuptools==58.2.0 workflows==2.14 diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index a5346c7..63dcc0d 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -123,13 +123,13 @@ class ConnectionInfo(BaseModel): vhost: str = Field( ..., description="Virtual host name with non-ASCII characters escaped as in C." ) - timeout: int = Field( - ..., + timeout: Optional[int] = Field( + None, description="Connection timeout / negotiated heartbeat interval, in seconds.", ) frame_max: int = Field(..., description="Maximum frame size (bytes).") - channel_max: int = Field( - ..., description="Maximum number of channels on this connection." + channel_max: Optional[int] = Field( + None, description="Maximum number of channels on this connection." ) # client_properties # Informational properties transmitted by the client during connection establishment. @@ -157,114 +157,114 @@ class NodeInfo(BaseModel): # applications List of all Erlang applications running on the node. # auth_mechanisms List of all SASL authentication mechanisms installed on the node. # cluster_links A list of the other nodes in the cluster. For each node, there are details of the TCP connection used to connect to it and statistics on data that has been transferred. - config_files: List[pathlib.Path] = Field( - ..., description="List of config files read by the node." + config_files: Optional[List[pathlib.Path]] = Field( + None, description="List of config files read by the node." ) # contexts List of all HTTP listeners on the node. - db_dir: pathlib.Path = Field( - ..., description="Location of the persistent storage used by the node." + db_dir: Optional[pathlib.Path] = Field( + None, description="Location of the persistent storage used by the node." ) disk_free: int = Field(..., description="Disk free space in bytes.") disk_free_alarm: bool = Field( ..., description="Whether the disk alarm has gone off." ) - disk_free_limit: int = Field( - ..., description="Point at which the disk alarm will go off." + disk_free_limit: Optional[int] = Field( + None, description="Point at which the disk alarm will go off." ) - enabled_plugins: List[str] = Field( - ..., + enabled_plugins: Optional[List[str]] = Field( + None, description="List of plugins which are both explicitly enabled and running.", ) # exchange_types Exchange types available on the node. fd_total: int = Field(..., description="File descriptors available.") fd_used: int = Field(..., description="Used file descriptors.") - io_read_avg_time: float = Field( - ..., + io_read_avg_time: Optional[int] = Field( + None, ge=0, description="Average wall time (milliseconds) for each disk read operation in the last statistics interval.", ) - io_read_bytes: int = Field( - ..., description="Total number of bytes read from disk by the persister." + io_read_bytes: Optional[int] = Field( + None, description="Total number of bytes read from disk by the persister." ) - io_read_count: int = Field( - ..., description="Total number of read operations by the persister." + io_read_count: Optional[int] = Field( + None, description="Total number of read operations by the persister." ) - io_reopen_count: int = Field( - ..., + io_reopen_count: Optional[int] = Field( + None, description="Total number of times the persister has needed to recycle file handles between queues. 
In an ideal world this number will be zero; if the number is large, performance might be improved by increasing the number of file handles available to RabbitMQ.", ) - io_seek_avg_time: int = Field( - ..., + io_seek_avg_time: Optional[int] = Field( + None, description="Average wall time (milliseconds) for each seek operation in the last statistics interval.", ) - io_seek_count: int = Field( - ..., description="Total number of seek operations by the persister." + io_seek_count: Optional[int] = Field( + None, description="Total number of seek operations by the persister." ) - io_sync_avg_time: int = Field( - ..., + io_sync_avg_time: Optional[int] = Field( + None, description="Average wall time (milliseconds) for each fsync() operation in the last statistics interval.", ) - io_sync_count: int = Field( - ..., description="Total number of fsync() operations by the persister." + io_sync_count: Optional[int] = Field( + None, description="Total number of fsync() operations by the persister." ) - io_write_avg_time: int = Field( - ..., + io_write_avg_time: Optional[int] = Field( + None, description="Average wall time (milliseconds) for each disk write operation in the last statistics interval.", ) - io_write_bytes: int = Field( - ..., description="Total number of bytes written to disk by the persister." + io_write_bytes: Optional[int] = Field( + None, description="Total number of bytes written to disk by the persister." ) - io_write_count: int = Field( - ..., description="Total number of write operations by the persister." + io_write_count: Optional[int] = Field( + None, description="Total number of write operations by the persister." ) - log_files: List[pathlib.Path] = Field( - ..., + log_files: Optional[List[pathlib.Path]] = Field( + None, description='List of log files used by the node. If the node also sends messages to stdout, "" is also reported in the list.', ) mem_used: int = Field(..., description="Memory used in bytes.") mem_alarm: bool = Field(..., description="Whether the memory alarm has gone off.") - mem_limit: int = Field( - ..., description="Point at which the memory alarm will go off." + mem_limit: Optional[int] = Field( + None, description="Point at which the memory alarm will go off." ) - mnesia_disk_tx_count: int = Field( - ..., + mnesia_disk_tx_count: Optional[int] = Field( + None, description="Number of Mnesia transactions which have been performed that required writes to disk. (e.g. creating a durable queue). Only transactions which originated on this node are included.", ) - mnesia_ram_tx_count: int = Field( - ..., + mnesia_ram_tx_count: Optional[int] = Field( + None, description="Number of Mnesia transactions which have been performed that did not require writes to disk. (e.g. creating a transient queue). Only transactions which originated on this node are included.", ) - msg_store_read_count: int = Field( - ..., + msg_store_read_count: Optional[int] = Field( + None, description="Number of messages which have been read from the message store.", ) - msg_store_write_count: int = Field( - ..., + msg_store_write_count: Optional[int] = Field( + None, description="Number of messages which have been written to the message store.", ) name: str = Field(..., description="Node name.") - net_ticktime: int = Field( - ..., description="Current kernel net_ticktime setting for the node." + net_ticktime: Optional[int] = Field( + None, description="Current kernel net_ticktime setting for the node." 
) - os_pid: int = Field( - ..., + os_pid: Optional[int] = Field( + None, description="Process identifier for the Operating System under which this node is running.", ) # partitions List of network partitions this node is seeing. proc_total: int = Field(..., description="Maximum number of Erlang processes.") proc_used: int = Field(..., description="Number of Erlang processes in use.") - processors: int = Field( - ..., description="Number of cores detected and usable by Erlang." + processors: Optional[int] = Field( + None, description="Number of cores detected and usable by Erlang." ) - queue_index_journal_write_count: int = Field( - ..., + queue_index_journal_write_count: Optional[int] = Field( + None, description="Number of records written to the queue index journal. Each record represents a message being published to a queue, being delivered from a queue, and being acknowledged in a queue.", ) - queue_index_read_count: int = Field( - ..., description="Number of records read from the queue index." + queue_index_read_count: Optional[int] = Field( + None, description="Number of records read from the queue index." ) - queue_index_write_count: int = Field( - ..., description="Number of records written to the queue index." + queue_index_write_count: Optional[int] = Field( + None, description="Number of records written to the queue index." ) # rates_mode: 'none', 'basic' or 'detailed'. run_queue: float = Field( @@ -275,13 +275,13 @@ class NodeInfo(BaseModel): description="Boolean for whether this node is up. Obviously if this is false, most other stats will be missing.", ) # sasl_log_file Location of sasl log file. - sockets_total: int = Field( - ..., description="File descriptors available for use as sockets." + sockets_total: Optional[int] = Field( + None, description="File descriptors available for use as sockets." ) sockets_used: int = Field(..., description="File descriptors used as sockets.") - type: NodeType - uptime: int = Field( - ..., description="Time since the Erlang VM started, in milliseconds." + type: Optional[NodeType] = None + uptime: Optional[int] = Field( + None, description="Time since the Erlang VM started, in milliseconds." ) # memory Detailed memory use statistics. Only appears if ?memory=true is appended to the URL. # binary Detailed breakdown of the owners of binary memory. Only appears if ?binary=true is appended to the URL. Note that this can be an expensive query if there are many small binaries in the system. 
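# A minimal usage sketch (not part of the patch series) of the RabbitMQAPI assembled in the
# patches above, following the demonstration block at the bottom of rabbitmq.py; the vhost
# "zocalo" and queue name "foo" are illustrative values taken from that block, and a reachable
# broker plus a zocalo configuration file are assumed.
import zocalo.configuration
from zocalo.util.rabbitmq import QueueSpec, RabbitMQAPI

zc = zocalo.configuration.from_file()
zc.activate()
rmq = RabbitMQAPI.from_zocalo_configuration(zc)

# Broker-wide health summary: two dicts mapping each check endpoint to its JSON result
passing, failing = rmq.health_checks

# Declare a transient queue, inspect it, then delete it again
rmq.queue_declare(
    vhost="zocalo",
    queue=QueueSpec(
        name="foo", auto_delete=True, arguments={"x-single-active-consumer": True}
    ),
)
info = rmq.queues(vhost="zocalo", name="foo")  # a single QueueInfo rather than a list
print(info.name, info.messages, info.consumers)
rmq.queue_delete(vhost="zocalo", name="foo", if_unused=False, if_empty=False)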
@@ -526,7 +526,7 @@ def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]: if response.status_code == requests.codes.ok: success[health_check] = response.json() else: - failure[health_check] = response.text + failure[health_check] = response.json() return success, failure def get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response: @@ -560,7 +560,7 @@ def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]: # https://www.rabbitmq.com/monitoring.html#node-metrics endpoint = "nodes" if name is not None: - endpoint = f"{endpoint}/{name}/" + endpoint = f"{endpoint}/{name}" response = self.get(endpoint) return NodeInfo(**response.json()) response = self.get(endpoint) @@ -600,15 +600,14 @@ def queues( ) -> Union[List[QueueInfo], QueueInfo]: endpoint = "queues" if vhost is not None and name is not None: - endpoint = f"{endpoint}/{vhost}/{name}/" + endpoint = f"{endpoint}/{vhost}/{name}" response = self.get(endpoint) return QueueInfo(**response.json()) elif vhost is not None: - endpoint = f"{endpoint}/{vhost}/" + endpoint = f"{endpoint}/{vhost}" elif name is not None: raise ValueError("name can not be set without vhost") response = self.get(endpoint) - # print(response.url) logger.debug(response) return [QueueInfo(**qi) for qi in response.json()] diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 3b811fa..587888a 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -1,10 +1,10 @@ -import json -import urllib.request +import re import pytest import zocalo.configuration -from zocalo.util.rabbitmq import RabbitMQAPI, http_api_request +import zocalo.util.rabbitmq +from zocalo.util.rabbitmq import NodeInfo, QueueInfo, RabbitMQAPI, http_api_request @pytest.fixture @@ -23,72 +23,80 @@ def test_http_api_request(zocalo_configuration): assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues" -def test_api_health_checks(mocker, zocalo_configuration): - mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") - mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") - mock_api.return_value = "" - mock_url.return_value = mocker.MagicMock() - mock_url.return_value.__enter__.return_value.read.return_value = json.dumps( - {"status": "ok"} - ) - rmq = RabbitMQAPI(zocalo_configuration) +def test_api_health_checks(requests_mock, zocalo_configuration): + requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"}) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) success, failures = rmq.health_checks assert not failures assert success for k, v in success.items(): - assert k.startswith("/health/checks/") + assert k.startswith("health/checks/") assert v == {"status": "ok"} -def test_api_health_checks_failures(mocker, zocalo_configuration): - mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") - mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") - mock_api.return_value = "" - mock_url.return_value = mocker.MagicMock() - mock_url.return_value.__enter__.return_value.read.side_effect = ( - urllib.error.HTTPError( - "http://foo.com", 503, "Service Unavailable", mocker.Mock(), mocker.Mock() - ) +def test_api_health_checks_failures(requests_mock, zocalo_configuration): + expected_json = { + "status": "failed", + "reason": "No active listener", + "missing": 1234, + "ports": [25672, 15672, 1883, 15692, 61613, 5672], + } + requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"}) + requests_mock.get( + 
re.compile("/health/checks/port-listener"), + status_code=503, + reason="No active listener", + json=expected_json, ) - rmq = RabbitMQAPI(zocalo_configuration) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) success, failures = rmq.health_checks assert failures - assert not success + assert success + assert len(failures) == 1 for k, v in success.items(): - assert k.startswith("/health/checks/") - assert v == "HTTP Error 503: Service Unavailable" - - -def test_api_queues(mocker, zocalo_configuration): - queues = [ - { - "consumers": 0, - "memory": 110112, - "message_stats": { - "deliver_get": 33, - "deliver_get_details": {"rate": 0}, - "publish": 22, - "publish_details": {"rate": 0}, - }, - "messages": 0, - "messages_ready": 0, - "messages_unacknowledged": 0, - "name": "foo", - "vhost": "zocalo", + assert k.startswith("health/checks/") + assert v == {"status": "ok"} + for k, v in failures.items(): + assert k.startswith("health/checks/port-listener/") + assert v == expected_json + + +def test_api_queues(requests_mock, zocalo_configuration): + queue = { + "consumers": 0, + "exclusive": False, + "memory": 110112, + "message_stats": { + "deliver_get": 33, + "deliver_get_details": {"rate": 0}, + "publish": 22, + "publish_details": {"rate": 0}, }, - ] + "messages": 0, + "messages_ready": 0, + "messages_unacknowledged": 0, + "name": "foo", + "vhost": "zocalo", + } + + # First call rmq.queues() with defaults + requests_mock.get("/api/queues", json=[queue]) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + assert rmq.queues() == [QueueInfo(**queue)] - mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") - mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") - mock_api.return_value = "" - mock_url.return_value = mocker.MagicMock() - mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(queues) - rmq = RabbitMQAPI(zocalo_configuration) - assert rmq.queues == queues + # Now call with vhost=... + requests_mock.get("/api/queues/zocalo", json=[queue]) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + assert rmq.queues(vhost="zocalo") == [QueueInfo(**queue)] + # Now call with vhost=..., name=... + requests_mock.get(f"/api/queues/zocalo/{queue['name']}", json=queue) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + assert rmq.queues(vhost="zocalo", name=queue["name"]) == QueueInfo(**queue) -def test_api_nodes(mocker, zocalo_configuration): - nodes = { + +def test_api_nodes(requests_mock, zocalo_configuration): + node = { "name": "rabbit@pooter123", "mem_limit": 80861855744, "mem_alarm": False, @@ -106,12 +114,16 @@ def test_api_nodes(mocker, zocalo_configuration): "proc_total": 1048576, "proc_used": 590, "run_queue": 1, + "running": True, + "type": "disc", } - mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request") - mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen") - mock_api.return_value = "" - mock_url.return_value = mocker.MagicMock() - mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(nodes) - rmq = RabbitMQAPI(zocalo_configuration) - assert rmq.queues == nodes + # First call rmq.nodes() with defaults + requests_mock.get("/api/nodes", json=[node]) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + assert rmq.nodes() == [NodeInfo(**node)] + + # Now call with name=... 
+ requests_mock.get(f"/api/nodes/{node['name']}", json=node) + rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + assert rmq.nodes(name=node["name"]) == NodeInfo(**node) From ece517d65db2632af112c8c19b9b8313f8b2d114 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 14:16:23 +0100 Subject: [PATCH 11/29] Tests for queue_declare/delete Pass if_unused/empty parameters to queue_delete --- src/zocalo/util/rabbitmq.py | 4 ++- tests/util/test_rabbitmq.py | 57 +++++++++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 63dcc0d..b18be58 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -622,7 +622,9 @@ def queue_delete( self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False ): endpoint = f"queues/{vhost}/{name}" - response = self.delete(endpoint) + response = self.delete( + endpoint, params={"if_unused": if_unused, "if_empty": if_empty} + ) logger.debug(response) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 587888a..e69a934 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -3,8 +3,7 @@ import pytest import zocalo.configuration -import zocalo.util.rabbitmq -from zocalo.util.rabbitmq import NodeInfo, QueueInfo, RabbitMQAPI, http_api_request +import zocalo.util.rabbitmq as rabbitmq @pytest.fixture @@ -19,13 +18,13 @@ def zocalo_configuration(mocker): def test_http_api_request(zocalo_configuration): - request = http_api_request(zocalo_configuration, api_path="/queues") + request = rabbitmq.http_api_request(zocalo_configuration, api_path="/queues") assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues" def test_api_health_checks(requests_mock, zocalo_configuration): + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"}) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) success, failures = rmq.health_checks assert not failures assert success @@ -35,6 +34,7 @@ def test_api_health_checks(requests_mock, zocalo_configuration): def test_api_health_checks_failures(requests_mock, zocalo_configuration): + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) expected_json = { "status": "failed", "reason": "No active listener", @@ -48,7 +48,6 @@ def test_api_health_checks_failures(requests_mock, zocalo_configuration): reason="No active listener", json=expected_json, ) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) success, failures = rmq.health_checks assert failures assert success @@ -78,24 +77,54 @@ def test_api_queues(requests_mock, zocalo_configuration): "name": "foo", "vhost": "zocalo", } + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) # First call rmq.queues() with defaults requests_mock.get("/api/queues", json=[queue]) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) - assert rmq.queues() == [QueueInfo(**queue)] + assert rmq.queues() == [rabbitmq.QueueInfo(**queue)] # Now call with vhost=... requests_mock.get("/api/queues/zocalo", json=[queue]) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) - assert rmq.queues(vhost="zocalo") == [QueueInfo(**queue)] + assert rmq.queues(vhost="zocalo") == [rabbitmq.QueueInfo(**queue)] # Now call with vhost=..., name=... 
requests_mock.get(f"/api/queues/zocalo/{queue['name']}", json=queue) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) - assert rmq.queues(vhost="zocalo", name=queue["name"]) == QueueInfo(**queue) + assert rmq.queues(vhost="zocalo", name=queue["name"]) == rabbitmq.QueueInfo(**queue) + + +def test_api_queue_declare(requests_mock, zocalo_configuration): + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + qspec = rabbitmq.QueueSpec( + name="foo", auto_delete=True, arguments={"x-single-active-consumer": True} + ) + requests_mock.put("/api/queues/zocalo/foo") + rmq.queue_declare(vhost="zocalo", queue=qspec) + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "PUT" + assert history.url.endswith("/api/queues/zocalo/foo") + assert history.json() == {"auto_delete": True, "arguments": qspec.arguments} + + +def test_api_queue_delete(requests_mock, zocalo_configuration): + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + requests_mock.delete("/api/queues/zocalo/foo") + requests_mock.delete("/api/queues/zocalo/bar") + rmq.queue_delete(vhost="zocalo", name="foo") + rmq.queue_delete(vhost="zocalo", name="bar", if_unused=True, if_empty=True) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "DELETE" + assert requests_mock.request_history[0].url.endswith( + "/api/queues/zocalo/foo?if_unused=False&if_empty=False" + ) + assert requests_mock.request_history[1].url.endswith( + "/api/queues/zocalo/bar?if_unused=True&if_empty=True" + ) def test_api_nodes(requests_mock, zocalo_configuration): + rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) node = { "name": "rabbit@pooter123", "mem_limit": 80861855744, @@ -120,10 +149,8 @@ def test_api_nodes(requests_mock, zocalo_configuration): # First call rmq.nodes() with defaults requests_mock.get("/api/nodes", json=[node]) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) - assert rmq.nodes() == [NodeInfo(**node)] + assert rmq.nodes() == [rabbitmq.NodeInfo(**node)] # Now call with name=... requests_mock.get(f"/api/nodes/{node['name']}", json=node) - rmq = RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) - assert rmq.nodes(name=node["name"]) == NodeInfo(**node) + assert rmq.nodes(name=node["name"]) == rabbitmq.NodeInfo(**node) From 7a19152b76bd0682c816108f33789ac935c026c2 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 15:02:07 +0100 Subject: [PATCH 12/29] Tests for exchanges endpoints --- src/zocalo/util/rabbitmq.py | 7 ++- tests/util/test_rabbitmq.py | 103 ++++++++++++++++++++++++++++-------- 2 files changed, 86 insertions(+), 24 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index b18be58..cd4a03f 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -311,7 +311,10 @@ class ExchangeSpec(BaseModel): False, description="Whether the exchange is internal, i.e. 
cannot be directly published to by a client.", ) - arguments: dict[str, Any] = Field(..., description="Exchange arguments.") + arguments: Optional[dict[str, Any]] = Field(None, description="Exchange arguments.") + + class Config: + use_enum_values = True class ExchangeInfo(ExchangeSpec): @@ -584,7 +587,7 @@ def exchanges( return [ExchangeInfo(**qi) for qi in response.json()] def exchange_declare(self, vhost: str, exchange: ExchangeSpec): - endpoint = f"exchanges/{vhost}/{exchange.name}" + endpoint = f"exchanges/{vhost}/{exchange.name}/" response = self.put( endpoint, json=exchange.dict(exclude_defaults=True, exclude={"name"}) ) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index e69a934..b664ba3 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -17,15 +17,19 @@ def zocalo_configuration(mocker): return zc +@pytest.fixture +def rmqapi(zocalo_configuration): + return rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) + + def test_http_api_request(zocalo_configuration): request = rabbitmq.http_api_request(zocalo_configuration, api_path="/queues") assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues" -def test_api_health_checks(requests_mock, zocalo_configuration): - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) +def test_api_health_checks(requests_mock, rmqapi): requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"}) - success, failures = rmq.health_checks + success, failures = rmqapi.health_checks assert not failures assert success for k, v in success.items(): @@ -33,8 +37,7 @@ def test_api_health_checks(requests_mock, zocalo_configuration): assert v == {"status": "ok"} -def test_api_health_checks_failures(requests_mock, zocalo_configuration): - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) +def test_api_health_checks_failures(requests_mock, rmqapi): expected_json = { "status": "failed", "reason": "No active listener", @@ -48,7 +51,7 @@ def test_api_health_checks_failures(requests_mock, zocalo_configuration): reason="No active listener", json=expected_json, ) - success, failures = rmq.health_checks + success, failures = rmqapi.health_checks assert failures assert success assert len(failures) == 1 @@ -60,7 +63,7 @@ def test_api_health_checks_failures(requests_mock, zocalo_configuration): assert v == expected_json -def test_api_queues(requests_mock, zocalo_configuration): +def test_api_queues(requests_mock, rmqapi): queue = { "consumers": 0, "exclusive": False, @@ -77,28 +80,28 @@ def test_api_queues(requests_mock, zocalo_configuration): "name": "foo", "vhost": "zocalo", } - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) # First call rmq.queues() with defaults requests_mock.get("/api/queues", json=[queue]) - assert rmq.queues() == [rabbitmq.QueueInfo(**queue)] + assert rmqapi.queues() == [rabbitmq.QueueInfo(**queue)] # Now call with vhost=... requests_mock.get("/api/queues/zocalo", json=[queue]) - assert rmq.queues(vhost="zocalo") == [rabbitmq.QueueInfo(**queue)] + assert rmqapi.queues(vhost="zocalo") == [rabbitmq.QueueInfo(**queue)] # Now call with vhost=..., name=... 
requests_mock.get(f"/api/queues/zocalo/{queue['name']}", json=queue) - assert rmq.queues(vhost="zocalo", name=queue["name"]) == rabbitmq.QueueInfo(**queue) + assert rmqapi.queues(vhost="zocalo", name=queue["name"]) == rabbitmq.QueueInfo( + **queue + ) -def test_api_queue_declare(requests_mock, zocalo_configuration): - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) +def test_api_queue_declare(requests_mock, rmqapi): qspec = rabbitmq.QueueSpec( name="foo", auto_delete=True, arguments={"x-single-active-consumer": True} ) requests_mock.put("/api/queues/zocalo/foo") - rmq.queue_declare(vhost="zocalo", queue=qspec) + rmqapi.queue_declare(vhost="zocalo", queue=qspec) assert requests_mock.call_count == 1 history = requests_mock.request_history[0] assert history.method == "PUT" @@ -106,12 +109,11 @@ def test_api_queue_declare(requests_mock, zocalo_configuration): assert history.json() == {"auto_delete": True, "arguments": qspec.arguments} -def test_api_queue_delete(requests_mock, zocalo_configuration): - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) +def test_api_queue_delete(requests_mock, rmqapi): requests_mock.delete("/api/queues/zocalo/foo") requests_mock.delete("/api/queues/zocalo/bar") - rmq.queue_delete(vhost="zocalo", name="foo") - rmq.queue_delete(vhost="zocalo", name="bar", if_unused=True, if_empty=True) + rmqapi.queue_delete(vhost="zocalo", name="foo") + rmqapi.queue_delete(vhost="zocalo", name="bar", if_unused=True, if_empty=True) assert requests_mock.call_count == 2 for history in requests_mock.request_history: assert history.method == "DELETE" @@ -123,8 +125,7 @@ def test_api_queue_delete(requests_mock, zocalo_configuration): ) -def test_api_nodes(requests_mock, zocalo_configuration): - rmq = rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration) +def test_api_nodes(requests_mock, rmqapi): node = { "name": "rabbit@pooter123", "mem_limit": 80861855744, @@ -149,8 +150,66 @@ def test_api_nodes(requests_mock, zocalo_configuration): # First call rmq.nodes() with defaults requests_mock.get("/api/nodes", json=[node]) - assert rmq.nodes() == [rabbitmq.NodeInfo(**node)] + assert rmqapi.nodes() == [rabbitmq.NodeInfo(**node)] # Now call with name=... requests_mock.get(f"/api/nodes/{node['name']}", json=node) - assert rmq.nodes(name=node["name"]) == rabbitmq.NodeInfo(**node) + assert rmqapi.nodes(name=node["name"]) == rabbitmq.NodeInfo(**node) + + +def test_api_exchanges(requests_mock, rmqapi): + exchange = { + "arguments": {}, + "auto_delete": False, + "durable": True, + "internal": False, + "message_stats": { + "publish_in": 156447, + "publish_in_details": {"rate": 0.4}, + "publish_out": 156445, + "publish_out_details": {"rate": 0.4}, + }, + "name": "", + "type": "direct", + "user_who_performed_action": "rmq-internal", + "vhost": "foo", + } + + # First call rmq.exchanges() with defaults + requests_mock.get("/api/exchanges", json=[exchange]) + assert rmqapi.exchanges() == [rabbitmq.ExchangeInfo(**exchange)] + + # Now call with vhost=... + requests_mock.get("/api/exchanges/zocalo/", json=[exchange]) + assert rmqapi.exchanges(vhost="zocalo") == [rabbitmq.ExchangeInfo(**exchange)] + + # Now call with vhost=..., name=... 
+ requests_mock.get( + f"/api/exchanges/{exchange['vhost']}/{exchange['name']}/", json=exchange + ) + assert rmqapi.exchanges( + vhost=exchange["vhost"], name=exchange["name"] + ) == rabbitmq.ExchangeInfo(**exchange) + + +@pytest.mark.parametrize("name", ["", "foo"]) +def test_api_exchange_declare(name, requests_mock, rmqapi): + exchange_spec = rabbitmq.ExchangeSpec( + name=name, + type="fanout", + durable=True, + auto_delete=True, + internal=False, + ) + requests_mock.put(f"/api/exchanges/zocalo/{name}/") + rmqapi.exchange_declare(vhost="zocalo", exchange=exchange_spec) + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "PUT" + assert history.url.endswith(f"/api/exchanges/zocalo/{name}/") + assert history.json() == { + "type": "fanout", + "auto_delete": True, + "durable": True, + "auto_delete": True, + } From 55b3177dfe4ca4cfcf9f145b0211ddd277ffb1c8 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 15:09:44 +0100 Subject: [PATCH 13/29] Test for empty exchange name --- tests/util/test_rabbitmq.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index b664ba3..dcb70dd 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -157,7 +157,8 @@ def test_api_nodes(requests_mock, rmqapi): assert rmqapi.nodes(name=node["name"]) == rabbitmq.NodeInfo(**node) -def test_api_exchanges(requests_mock, rmqapi): +@pytest.mark.parametrize("name", ["", "foo"]) +def test_api_exchanges(name, requests_mock, rmqapi): exchange = { "arguments": {}, "auto_delete": False, @@ -169,7 +170,7 @@ def test_api_exchanges(requests_mock, rmqapi): "publish_out": 156445, "publish_out_details": {"rate": 0.4}, }, - "name": "", + "name": name, "type": "direct", "user_who_performed_action": "rmq-internal", "vhost": "foo", From 5c1f1f2080dd9d182b751f5c77a38e5b331b6e32 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Mon, 25 Oct 2021 17:12:58 +0100 Subject: [PATCH 14/29] Test for rmqapi.connections() --- tests/util/test_rabbitmq.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index dcb70dd..5b9fcf0 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -214,3 +214,34 @@ def test_api_exchange_declare(name, requests_mock, rmqapi): "durable": True, "auto_delete": True, } + + +def test_api_connections(requests_mock, rmqapi): + connection = { + "auth_mechanism": "PLAIN", + "connected_at": 1634716019864, + "frame_max": 131072, + "host": "123.24.5.67", + "name": "123.24.5.67:12345 -> 123.24.5.67:54321", + "node": "rabbit@cs05r-sc-serv-26", + "peer_host": "123.24.5.67", + "peer_port": 12345, + "port": 54321, + "protocol": "AMQP 0-9-1", + "ssl": False, + "state": "running", + "timeout": 60, + "user": "foo", + "vhost": "bar", + "channels": 1, + } + + # First call rmq.connections() with defaults + requests_mock.get("/api/connections", json=[connection]) + assert rmqapi.connections() == [rabbitmq.ConnectionInfo(**connection)] + + # Now call with name=... 
+ requests_mock.get(f"/api/connections/{connection['name']}/", json=connection) + assert rmqapi.connections(name=connection["name"]) == rabbitmq.ConnectionInfo( + **connection + ) From 94e0d6f3cf34831167477993229e86ecbee0f666 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Tue, 26 Oct 2021 11:14:13 +0100 Subject: [PATCH 15/29] get/add/delete users --- src/zocalo/util/rabbitmq.py | 56 +++++++++++++++++++++++++++++++++++++ tests/util/test_rabbitmq.py | 46 ++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index cd4a03f..d947578 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -462,6 +462,39 @@ class QueueInfo(QueueSpec): ) +class HashingAlgorithm(enum.Enum): + rabbit_password_hashing_sha256 = "rabbit_password_hashing_sha256" + rabbit_password_hashing_sha512 = "rabbit_password_hashing_sha512" + rabbit_password_hashing_md5 = "rabbit_password_hashing_md5" + + +class UserSpec(BaseModel): + """ + The tags key is mandatory. + Either password or password_hash can be set.If neither are set the user will not be + able to log in with a password, but other mechanisms like client certificates may + be used. Setting password_hash to "" will ensure the user cannot use a password to + log in. tags is a comma-separated list of tags for the user. Currently recognised + tags are administrator, monitoring and management. password_hash must be generated + using the algorithm described here. You may also specify the hash function being used + by adding the hashing_algorithm key to the body. Currently recognised algorithms are + rabbit_password_hashing_sha256, rabbit_password_hashing_sha512, and + rabbit_password_hashing_md5. + """ + + name: str = Field(..., description="Username") + password_hash: str = Field(..., description="Hash of the user password.") + hashing_algorithm: HashingAlgorithm + tags: str + + class Config: + use_enum_values = True + + +class UserInfo(UserSpec): + pass + + def http_api_request( zc: zocalo.configuration.Configuration, api_path: str, @@ -630,6 +663,27 @@ def queue_delete( ) logger.debug(response) + def users(self, name: str = None): + endpoint = "users" + if name: + endpoint = f"{endpoint}/{name}/" + response = self.get(endpoint) + return UserInfo(**response.json()) + response = self.get(endpoint) + return [UserInfo(**u) for u in response.json()] + + def add_user(self, user: UserSpec): + endpoint = f"users/{user.name}/" + response = self.put( + endpoint, json=user.dict(exclude_defaults=True, exclude={"name"}) + ) + logger.debug(response) + + def delete_user(self, name: str): + endpoint = f"users/{name}/" + response = self.delete(endpoint) + logger.debug(response) + if __name__ == "__main__": import time @@ -637,6 +691,8 @@ def queue_delete( zc = zocalo.configuration.from_file() zc.activate() rmq = RabbitMQAPI.from_zocalo_configuration(zc) + print(rmq.users()) + print(rmq.users(name="guest")) print(rmq.queues()) print(rmq.queues(vhost="zocalo", name="processing_recipe")) # time.sleep(5) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 5b9fcf0..6014309 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -245,3 +245,49 @@ def test_api_connections(requests_mock, rmqapi): assert rmqapi.connections(name=connection["name"]) == rabbitmq.ConnectionInfo( **connection ) + + +def test_api_users(requests_mock, rmqapi): + user = { + "name": "guest", + "password_hash": "guest", + "hashing_algorithm": 
"rabbit_password_hashing_sha256", + "tags": "administrator", + } + + # First call rmq.users() with defaults + requests_mock.get("/api/users", json=[user]) + assert rmqapi.users() == [rabbitmq.UserInfo(**user)] + + # Now call with name=... + requests_mock.get(f"/api/users/{user['name']}/", json=user) + assert rmqapi.users(name=user["name"]) == rabbitmq.UserInfo(**user) + + +def test_api_add_user(requests_mock, rmqapi): + user = rabbitmq.UserSpec( + name="guest", + password_hash="guest", + hashing_algorithm="rabbit_password_hashing_sha256", + tags="administrator", + ) + requests_mock.put(f"/api/users/{user.name}/") + rmqapi.add_user(user=user) + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "PUT" + assert history.url.endswith(f"/api/users/{user.name}/") + assert history.json() == { + "password_hash": "guest", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator", + } + + +def test_api_delete_user(requests_mock, rmqapi): + requests_mock.delete("/api/users/guest/") + rmqapi.delete_user(name="guest") + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "DELETE" + assert history.url.endswith("/api/users/guest/") From 2e688104f90087137ab45c9ba7c369235f0a7f6d Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Tue, 26 Oct 2021 17:39:56 +0100 Subject: [PATCH 16/29] get/set/clear policies --- src/zocalo/util/rabbitmq.py | 73 ++++++++++++++++++++++++++++++++++++- tests/util/test_rabbitmq.py | 54 +++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index d947578..488d8f5 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -332,6 +332,48 @@ class ExchangeInfo(ExchangeSpec): ) +class PolicyApplyTo(enum.Enum): + """Which types of object this policy should apply to.""" + + queues = "queues" + exchanges = "exchanges" + all = "all" + + +class PolicySpec(BaseModel): + """Sets a policy.""" + + name: str = Field(..., description="The name of the policy.") + pattern: str = Field( + ..., + description="The regular expression, which when matches on a given resources causes the policy to apply.", + ) + definition: Dict[str, Any] = Field( + ..., + description="A set of key/value pairs (think a JSON document) that will be injected into the map of optional arguments of the matching queues and exchanges.", + ) + priority: int = Field( + 0, + description="The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.", + ) + apply_to: PolicyApplyTo = Field( + default=PolicyApplyTo.all, + alias="apply-to", + description="Which types of object this policy should apply to.", + ) + + class Config: + use_enum_values = True + validate_all = True + allow_population_by_field_name = True + + +class PolicyInfo(PolicySpec): + vhost: str = Field( + ..., description="Virtual host name with non-ASCII characters escaped as in C." + ) + + class QueueState(str, enum.Enum): 'The state of the queue. Normally "running", but may be "{syncing, message_count}" if the queue is synchronising.' 
@@ -631,6 +673,35 @@ def exchange_delete(self, vhost: str, name: str, if_unused: bool = False): response = self.delete(endpoint) logger.debug(response) + def policies( + self, vhost: Optional[str] = None, name: Optional[str] = None + ) -> Union[List[PolicyInfo], PolicyInfo]: + endpoint = "policies" + if vhost is not None and name is not None: + endpoint = f"{endpoint}/{vhost}/{name}/" + response = self.get(endpoint) + return PolicyInfo(**response.json()) + elif vhost is not None: + endpoint = f"{endpoint}/{vhost}/" + elif name is not None: + raise ValueError("name can not be set without vhost") + response = self.get(endpoint) + logger.debug(response) + return [PolicyInfo(**p) for p in response.json()] + + def set_policy(self, vhost: str, policy: PolicySpec): + endpoint = f"policies/{vhost}/{policy.name}/" + response = self.put( + endpoint, + json=policy.dict(exclude_defaults=True, exclude={"name"}, by_alias=True), + ) + logger.debug(response) + + def clear_policy(self, vhost: str, name: str): + endpoint = f"policies/{vhost}/{name}/" + response = self.delete(endpoint) + logger.debug(response) + def queues( self, vhost: Optional[str] = None, name: Optional[str] = None ) -> Union[List[QueueInfo], QueueInfo]: @@ -663,7 +734,7 @@ def queue_delete( ) logger.debug(response) - def users(self, name: str = None): + def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]: endpoint = "users" if name: endpoint = f"{endpoint}/{name}/" diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 6014309..8de8ecc 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -291,3 +291,57 @@ def test_api_delete_user(requests_mock, rmqapi): history = requests_mock.request_history[0] assert history.method == "DELETE" assert history.url.endswith("/api/users/guest/") + + +def test_api_policies(requests_mock, rmqapi): + policy = { + "vhost": "foo", + "name": "redelivery", + "pattern": "^amq.", + "apply-to": "queues", + "definition": {"delivery-limit": 5}, + "priority": 0, + } + + # First call rmq.policies() with defaults + requests_mock.get("/api/policies", json=[policy]) + assert rmqapi.policies() == [rabbitmq.PolicyInfo(**policy)] + + # Now call with vhost=... + requests_mock.get(f"/api/policies/{policy['vhost']}/", json=[policy]) + assert rmqapi.policies(vhost=policy["vhost"]) == [rabbitmq.PolicyInfo(**policy)] + + # Now call with vhost=..., name=... 
+ requests_mock.get(f"/api/policies/{policy['vhost']}/{policy['name']}/", json=policy) + assert rmqapi.policies( + vhost=policy["vhost"], name=policy["name"] + ) == rabbitmq.PolicyInfo(**policy) + + +def test_api_set_policy(requests_mock, rmqapi): + policy = rabbitmq.PolicySpec( + name="redelivery", + pattern="^amq.", + apply_to=rabbitmq.PolicyApplyTo.queues, + definition={"delivery-limit": 5}, + ) + requests_mock.put(f"/api/policies/foo/{policy.name}/") + rmqapi.set_policy(vhost="foo", policy=policy) + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "PUT" + assert history.url.endswith(f"/api/policies/foo/{policy.name}/") + assert history.json() == { + "pattern": "^amq.", + "apply-to": "queues", + "definition": {"delivery-limit": 5}, + } + + +def test_api_clear_policy(requests_mock, rmqapi): + requests_mock.delete("/api/policies/foo/bar/") + rmqapi.clear_policy(vhost="foo", name="bar") + assert requests_mock.call_count == 1 + history = requests_mock.request_history[0] + assert history.method == "DELETE" + assert history.url.endswith("/api/policies/foo/bar/") From 73094b2bd6badb44213db2b7b590d9bad6cd45f0 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 14:56:36 +0100 Subject: [PATCH 17/29] dict -> Dict --- src/zocalo/util/rabbitmq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 488d8f5..220059f 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -311,7 +311,7 @@ class ExchangeSpec(BaseModel): False, description="Whether the exchange is internal, i.e. cannot be directly published to by a client.", ) - arguments: Optional[dict[str, Any]] = Field(None, description="Exchange arguments.") + arguments: Optional[Dict[str, Any]] = Field(None, description="Exchange arguments.") class Config: use_enum_values = True @@ -394,7 +394,7 @@ class QueueSpec(BaseModel): False, description="Whether the queue will be deleted automatically when no longer used.", ) - arguments: Optional[dict[str, Any]] = Field(None, description="Queue arguments.") + arguments: Optional[Dict[str, Any]] = Field(None, description="Queue arguments.") class QueueInfo(QueueSpec): From 77d5b2ab5d49c775157c7b56c92a8fff8f715ffc Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 15:20:42 +0100 Subject: [PATCH 18/29] Add vhost to Exchange/Policy/QueueSpec To simplify interface --- src/zocalo/util/rabbitmq.py | 34 ++++++++++++++++++++++------------ tests/util/test_rabbitmq.py | 13 +++++++++---- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 220059f..d3914d0 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -312,6 +312,9 @@ class ExchangeSpec(BaseModel): description="Whether the exchange is internal, i.e. cannot be directly published to by a client.", ) arguments: Optional[Dict[str, Any]] = Field(None, description="Exchange arguments.") + vhost: str = Field( + ..., description="Virtual host name with non-ASCII characters escaped as in C." + ) class Config: use_enum_values = True @@ -361,6 +364,9 @@ class PolicySpec(BaseModel): alias="apply-to", description="Which types of object this policy should apply to.", ) + vhost: str = Field( + ..., description="Virtual host name with non-ASCII characters escaped as in C." 
+ ) class Config: use_enum_values = True @@ -369,9 +375,7 @@ class Config: class PolicyInfo(PolicySpec): - vhost: str = Field( - ..., description="Virtual host name with non-ASCII characters escaped as in C." - ) + pass class QueueState(str, enum.Enum): @@ -395,6 +399,9 @@ class QueueSpec(BaseModel): description="Whether the queue will be deleted automatically when no longer used.", ) arguments: Optional[Dict[str, Any]] = Field(None, description="Queue arguments.") + vhost: str = Field( + ..., description="Virtual host name with non-ASCII characters escaped as in C." + ) class QueueInfo(QueueSpec): @@ -661,10 +668,11 @@ def exchanges( logger.debug(response) return [ExchangeInfo(**qi) for qi in response.json()] - def exchange_declare(self, vhost: str, exchange: ExchangeSpec): - endpoint = f"exchanges/{vhost}/{exchange.name}/" + def exchange_declare(self, exchange: ExchangeSpec): + endpoint = f"exchanges/{exchange.vhost}/{exchange.name}/" response = self.put( - endpoint, json=exchange.dict(exclude_defaults=True, exclude={"name"}) + endpoint, + json=exchange.dict(exclude_defaults=True, exclude={"name", "vhost"}), ) logger.debug(response) @@ -689,11 +697,13 @@ def policies( logger.debug(response) return [PolicyInfo(**p) for p in response.json()] - def set_policy(self, vhost: str, policy: PolicySpec): - endpoint = f"policies/{vhost}/{policy.name}/" + def set_policy(self, policy: PolicySpec): + endpoint = f"policies/{policy.vhost}/{policy.name}/" response = self.put( endpoint, - json=policy.dict(exclude_defaults=True, exclude={"name"}, by_alias=True), + json=policy.dict( + exclude_defaults=True, exclude={"name", "vhost"}, by_alias=True + ), ) logger.debug(response) @@ -718,10 +728,10 @@ def queues( logger.debug(response) return [QueueInfo(**qi) for qi in response.json()] - def queue_declare(self, vhost: str, queue: QueueSpec): - endpoint = f"queues/{vhost}/{queue.name}" + def queue_declare(self, queue: QueueSpec): + endpoint = f"queues/{queue.vhost}/{queue.name}" response = self.put( - endpoint, json=queue.dict(exclude_defaults=True, exclude={"name"}) + endpoint, json=queue.dict(exclude_defaults=True, exclude={"name", "vhost"}) ) logger.debug(response) diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 8de8ecc..27026a8 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -98,10 +98,13 @@ def test_api_queues(requests_mock, rmqapi): def test_api_queue_declare(requests_mock, rmqapi): qspec = rabbitmq.QueueSpec( - name="foo", auto_delete=True, arguments={"x-single-active-consumer": True} + name="foo", + auto_delete=True, + vhost="zocalo", + arguments={"x-single-active-consumer": True}, ) requests_mock.put("/api/queues/zocalo/foo") - rmqapi.queue_declare(vhost="zocalo", queue=qspec) + rmqapi.queue_declare(queue=qspec) assert requests_mock.call_count == 1 history = requests_mock.request_history[0] assert history.method == "PUT" @@ -201,9 +204,10 @@ def test_api_exchange_declare(name, requests_mock, rmqapi): durable=True, auto_delete=True, internal=False, + vhost="zocalo", ) requests_mock.put(f"/api/exchanges/zocalo/{name}/") - rmqapi.exchange_declare(vhost="zocalo", exchange=exchange_spec) + rmqapi.exchange_declare(exchange=exchange_spec) assert requests_mock.call_count == 1 history = requests_mock.request_history[0] assert history.method == "PUT" @@ -324,9 +328,10 @@ def test_api_set_policy(requests_mock, rmqapi): pattern="^amq.", apply_to=rabbitmq.PolicyApplyTo.queues, definition={"delivery-limit": 5}, + vhost="foo", ) 
requests_mock.put(f"/api/policies/foo/{policy.name}/") - rmqapi.set_policy(vhost="foo", policy=policy) + rmqapi.set_policy(policy=policy) assert requests_mock.call_count == 1 history = requests_mock.request_history[0] assert history.method == "PUT" From 11919eacbf6af7f78fdad17905d2bbf4d2e4d92f Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 15:30:47 +0100 Subject: [PATCH 19/29] Test exchange_delete --- src/zocalo/util/rabbitmq.py | 2 +- tests/util/test_rabbitmq.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index d3914d0..ee288f4 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -678,7 +678,7 @@ def exchange_declare(self, exchange: ExchangeSpec): def exchange_delete(self, vhost: str, name: str, if_unused: bool = False): endpoint = f"exchanges/{vhost}/{name}" - response = self.delete(endpoint) + response = self.delete(endpoint, params={"if_unused": if_unused}) logger.debug(response) def policies( diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index 27026a8..b60eaac 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -220,6 +220,16 @@ def test_api_exchange_declare(name, requests_mock, rmqapi): } +def test_api_exchange_delete(requests_mock, rmqapi): + requests_mock.delete("/api/exchanges/zocalo/foo") + rmqapi.exchange_delete(vhost="zocalo", name="foo") + assert requests_mock.call_count == 1 + assert requests_mock.request_history[0].method == "DELETE" + assert requests_mock.request_history[0].url.endswith( + "/api/exchanges/zocalo/foo?if_unused=False" + ) + + def test_api_connections(requests_mock, rmqapi): connection = { "auth_mechanism": "PLAIN", From ec0e219b82f8def1db0c7e8ad9ba87f89e6ff9a6 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 17:09:17 +0100 Subject: [PATCH 20/29] Add {create,delete}_component singledispatchmethods --- src/zocalo/util/rabbitmq.py | 33 +++++++ tests/util/test_rabbitmq.py | 170 ++++++++++++++++++++---------------- 2 files changed, 128 insertions(+), 75 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index ee288f4..4fbec00 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -1,5 +1,6 @@ import datetime import enum +import functools import logging import pathlib import urllib @@ -629,6 +630,14 @@ def delete(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Respo f"{self._url}/{endpoint}", auth=self._auth, params=params ) + @functools.singledispatchmethod + def create_component(self, component): + raise NotImplementedError(f"Component {component} not recognised") + + @functools.singledispatchmethod + def delete_component(self, component): + raise NotImplementedError(f"Component {component} not recognised") + def connections( self, name: Optional[str] = None ) -> Union[List[ConnectionInfo], ConnectionInfo]: @@ -668,6 +677,7 @@ def exchanges( logger.debug(response) return [ExchangeInfo(**qi) for qi in response.json()] + @create_component.register def exchange_declare(self, exchange: ExchangeSpec): endpoint = f"exchanges/{exchange.vhost}/{exchange.name}/" response = self.put( @@ -681,6 +691,10 @@ def exchange_delete(self, vhost: str, name: str, if_unused: bool = False): response = self.delete(endpoint, params={"if_unused": if_unused}) logger.debug(response) + @delete_component.register + def _delete_exchange(self, exchange: ExchangeSpec, **kwargs): + 
self.exchange_delete(vhost=exchange.vhost, name=exchange.name, **kwargs) + def policies( self, vhost: Optional[str] = None, name: Optional[str] = None ) -> Union[List[PolicyInfo], PolicyInfo]: @@ -697,6 +711,7 @@ def policies( logger.debug(response) return [PolicyInfo(**p) for p in response.json()] + @create_component.register def set_policy(self, policy: PolicySpec): endpoint = f"policies/{policy.vhost}/{policy.name}/" response = self.put( @@ -712,6 +727,10 @@ def clear_policy(self, vhost: str, name: str): response = self.delete(endpoint) logger.debug(response) + @delete_component.register + def _delete_policy(self, policy: PolicySpec): + self.clear_policy(vhost=policy.vhost, name=policy.name) + def queues( self, vhost: Optional[str] = None, name: Optional[str] = None ) -> Union[List[QueueInfo], QueueInfo]: @@ -728,6 +747,7 @@ def queues( logger.debug(response) return [QueueInfo(**qi) for qi in response.json()] + @create_component.register def queue_declare(self, queue: QueueSpec): endpoint = f"queues/{queue.vhost}/{queue.name}" response = self.put( @@ -744,6 +764,14 @@ def queue_delete( ) logger.debug(response) + @delete_component.register + def _delete_queue( + self, queue: QueueSpec, if_unused: bool = False, if_empty: bool = False + ): + self.queue_delete( + vhost=queue.vhost, name=queue.name, if_unused=if_unused, if_empty=if_empty + ) + def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]: endpoint = "users" if name: @@ -753,6 +781,7 @@ def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]: response = self.get(endpoint) return [UserInfo(**u) for u in response.json()] + @create_component.register def add_user(self, user: UserSpec): endpoint = f"users/{user.name}/" response = self.put( @@ -765,6 +794,10 @@ def delete_user(self, name: str): response = self.delete(endpoint) logger.debug(response) + @delete_component.register + def _delete_user(self, user: UserSpec): + self.delete_user(name=user.name) + if __name__ == "__main__": import time diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py index b60eaac..cddd164 100644 --- a/tests/util/test_rabbitmq.py +++ b/tests/util/test_rabbitmq.py @@ -96,36 +96,40 @@ def test_api_queues(requests_mock, rmqapi): ) -def test_api_queue_declare(requests_mock, rmqapi): - qspec = rabbitmq.QueueSpec( +@pytest.fixture +def queue_spec(): + return rabbitmq.QueueSpec( name="foo", auto_delete=True, vhost="zocalo", arguments={"x-single-active-consumer": True}, ) + + +def test_api_queue_declare(requests_mock, rmqapi, queue_spec): requests_mock.put("/api/queues/zocalo/foo") - rmqapi.queue_declare(queue=qspec) - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "PUT" - assert history.url.endswith("/api/queues/zocalo/foo") - assert history.json() == {"auto_delete": True, "arguments": qspec.arguments} + rmqapi.queue_declare(queue=queue_spec) + rmqapi.create_component(queue_spec) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "PUT" + assert history.url.endswith("/api/queues/zocalo/foo") + assert history.json() == { + "auto_delete": True, + "arguments": queue_spec.arguments, + } -def test_api_queue_delete(requests_mock, rmqapi): +def test_api_queue_delete(requests_mock, rmqapi, queue_spec): requests_mock.delete("/api/queues/zocalo/foo") - requests_mock.delete("/api/queues/zocalo/bar") - rmqapi.queue_delete(vhost="zocalo", name="foo") - rmqapi.queue_delete(vhost="zocalo", name="bar", 
if_unused=True, if_empty=True) + rmqapi.queue_delete(vhost="zocalo", name="foo", if_unused=True, if_empty=True) + rmqapi.delete_component(queue_spec, if_unused=True, if_empty=True) assert requests_mock.call_count == 2 for history in requests_mock.request_history: assert history.method == "DELETE" - assert requests_mock.request_history[0].url.endswith( - "/api/queues/zocalo/foo?if_unused=False&if_empty=False" - ) - assert requests_mock.request_history[1].url.endswith( - "/api/queues/zocalo/bar?if_unused=True&if_empty=True" - ) + assert history.url.endswith( + "/api/queues/zocalo/foo?if_unused=True&if_empty=True" + ) def test_api_nodes(requests_mock, rmqapi): @@ -196,9 +200,8 @@ def test_api_exchanges(name, requests_mock, rmqapi): ) == rabbitmq.ExchangeInfo(**exchange) -@pytest.mark.parametrize("name", ["", "foo"]) -def test_api_exchange_declare(name, requests_mock, rmqapi): - exchange_spec = rabbitmq.ExchangeSpec( +def exchange_spec(name): + return rabbitmq.ExchangeSpec( name=name, type="fanout", durable=True, @@ -206,28 +209,33 @@ def test_api_exchange_declare(name, requests_mock, rmqapi): internal=False, vhost="zocalo", ) + + +@pytest.mark.parametrize("name", ["", "foo"]) +def test_api_exchange_declare(name, requests_mock, rmqapi): requests_mock.put(f"/api/exchanges/zocalo/{name}/") - rmqapi.exchange_declare(exchange=exchange_spec) - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "PUT" - assert history.url.endswith(f"/api/exchanges/zocalo/{name}/") - assert history.json() == { - "type": "fanout", - "auto_delete": True, - "durable": True, - "auto_delete": True, - } + rmqapi.exchange_declare(exchange=exchange_spec(name)) + rmqapi.create_component(exchange_spec(name)) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "PUT" + assert history.url.endswith(f"/api/exchanges/zocalo/{name}/") + assert history.json() == { + "type": "fanout", + "auto_delete": True, + "durable": True, + "auto_delete": True, + } def test_api_exchange_delete(requests_mock, rmqapi): requests_mock.delete("/api/exchanges/zocalo/foo") - rmqapi.exchange_delete(vhost="zocalo", name="foo") - assert requests_mock.call_count == 1 - assert requests_mock.request_history[0].method == "DELETE" - assert requests_mock.request_history[0].url.endswith( - "/api/exchanges/zocalo/foo?if_unused=False" - ) + rmqapi.exchange_delete(vhost="zocalo", name="foo", if_unused=True) + rmqapi.delete_component(exchange_spec("foo"), if_unused=True) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "DELETE" + assert history.url.endswith("/api/exchanges/zocalo/foo?if_unused=True") def test_api_connections(requests_mock, rmqapi): @@ -278,33 +286,39 @@ def test_api_users(requests_mock, rmqapi): assert rmqapi.users(name=user["name"]) == rabbitmq.UserInfo(**user) -def test_api_add_user(requests_mock, rmqapi): - user = rabbitmq.UserSpec( +@pytest.fixture +def user_spec(): + return rabbitmq.UserSpec( name="guest", password_hash="guest", hashing_algorithm="rabbit_password_hashing_sha256", tags="administrator", ) - requests_mock.put(f"/api/users/{user.name}/") - rmqapi.add_user(user=user) - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "PUT" - assert history.url.endswith(f"/api/users/{user.name}/") - assert history.json() == { - "password_hash": "guest", - "hashing_algorithm": "rabbit_password_hashing_sha256", - 
"tags": "administrator", - } -def test_api_delete_user(requests_mock, rmqapi): +def test_api_add_user(requests_mock, rmqapi, user_spec): + requests_mock.put(f"/api/users/{user_spec.name}/") + rmqapi.add_user(user=user_spec) + rmqapi.create_component(user_spec) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "PUT" + assert history.url.endswith(f"/api/users/{user_spec.name}/") + assert history.json() == { + "password_hash": "guest", + "hashing_algorithm": "rabbit_password_hashing_sha256", + "tags": "administrator", + } + + +def test_api_delete_user(requests_mock, rmqapi, user_spec): requests_mock.delete("/api/users/guest/") rmqapi.delete_user(name="guest") - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "DELETE" - assert history.url.endswith("/api/users/guest/") + rmqapi.delete_component(user_spec) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "DELETE" + assert history.url.endswith("/api/users/guest/") def test_api_policies(requests_mock, rmqapi): @@ -332,31 +346,37 @@ def test_api_policies(requests_mock, rmqapi): ) == rabbitmq.PolicyInfo(**policy) -def test_api_set_policy(requests_mock, rmqapi): - policy = rabbitmq.PolicySpec( - name="redelivery", +@pytest.fixture +def policy_spec(): + return rabbitmq.PolicySpec( + name="bar", pattern="^amq.", apply_to=rabbitmq.PolicyApplyTo.queues, definition={"delivery-limit": 5}, vhost="foo", ) - requests_mock.put(f"/api/policies/foo/{policy.name}/") - rmqapi.set_policy(policy=policy) - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "PUT" - assert history.url.endswith(f"/api/policies/foo/{policy.name}/") - assert history.json() == { - "pattern": "^amq.", - "apply-to": "queues", - "definition": {"delivery-limit": 5}, - } -def test_api_clear_policy(requests_mock, rmqapi): +def test_api_set_policy(requests_mock, rmqapi, policy_spec): + requests_mock.put(f"/api/policies/foo/{policy_spec.name}/") + rmqapi.set_policy(policy=policy_spec) + rmqapi.create_component(policy_spec) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "PUT" + assert history.url.endswith(f"/api/policies/foo/{policy_spec.name}/") + assert history.json() == { + "pattern": "^amq.", + "apply-to": "queues", + "definition": {"delivery-limit": 5}, + } + + +def test_api_clear_policy(requests_mock, rmqapi, policy_spec): requests_mock.delete("/api/policies/foo/bar/") rmqapi.clear_policy(vhost="foo", name="bar") - assert requests_mock.call_count == 1 - history = requests_mock.request_history[0] - assert history.method == "DELETE" - assert history.url.endswith("/api/policies/foo/bar/") + rmqapi.delete_component(policy_spec) + assert requests_mock.call_count == 2 + for history in requests_mock.request_history: + assert history.method == "DELETE" + assert history.url.endswith("/api/policies/foo/bar/") From 3f123dee9a63c8d5204f7fa45233ef6fadd5f054 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 22:15:29 +0100 Subject: [PATCH 21/29] Add requests, pydantic to dependencies --- setup.cfg | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.cfg b/setup.cfg index 319fb14..e00f9a2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,8 @@ install_requires = PyYAML graypy>=1.0 marshmallow + requests + pydantic setuptools workflows>=2.14 packages = find: From 
fd63b3a9265628770036685505d28773c4c065fb Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 22:19:22 +0100 Subject: [PATCH 22/29] Updated RabbitMQAPI interface --- src/zocalo/cli/dlq_check.py | 12 +++++----- src/zocalo/cli/dlq_reinject.py | 10 +++------ tests/cli/test_dlq_check.py | 40 +++++++++++++++++++++------------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/src/zocalo/cli/dlq_check.py b/src/zocalo/cli/dlq_check.py index 42fbe23..194af82 100644 --- a/src/zocalo/cli/dlq_check.py +++ b/src/zocalo/cli/dlq_check.py @@ -48,13 +48,13 @@ def extract_queue_name(namestring): def check_dlq_rabbitmq( zc: zocalo.configuration.Configuration, namespace: str = None ) -> dict: - rmq = RabbitMQAPI(zc) + rmq = RabbitMQAPI.from_zocalo_configuration(zc) return { - q["name"]: int(q["messages"]) - for q in rmq.queues - if q["name"].startswith("dlq.") - and (namespace is None or q["vhost"] == namespace) - and int(q["messages"]) + q.name: q.messages + for q in rmq.queues() + if q.name.startswith("dlq.") + and (namespace is None or q.vhost == namespace) + and q.messages } diff --git a/src/zocalo/cli/dlq_reinject.py b/src/zocalo/cli/dlq_reinject.py index ddcde0f..429f911 100644 --- a/src/zocalo/cli/dlq_reinject.py +++ b/src/zocalo/cli/dlq_reinject.py @@ -15,7 +15,7 @@ import workflows.transport import zocalo.configuration -from zocalo.util.rabbitmq import http_api_request +from zocalo.util.rabbitmq import RabbitMQAPI def run() -> None: @@ -70,6 +70,7 @@ def run() -> None: print("No DLQ message files given.") sys.exit(0) + rmqapi = RabbitMQAPI.from_zocalo_configuration(zc) transport.connect() first = True @@ -130,12 +131,7 @@ def run() -> None: header = dlqmsg["header"] exchange = header.get("headers", {}).get("x-death", {})[0].get("exchange") if exchange: - import urllib - - _api_request = http_api_request(zc, "/queues") - with urllib.request.urlopen(_api_request) as response: - reply = response.read() - exchange_info = json.loads(reply) + exchange_info = rmqapi.get("queues").json() for exch in exchange_info: if exch["name"] == exchange: if exch["type"] == "fanout": diff --git a/tests/cli/test_dlq_check.py b/tests/cli/test_dlq_check.py index f38138d..615b840 100644 --- a/tests/cli/test_dlq_check.py +++ b/tests/cli/test_dlq_check.py @@ -1,4 +1,3 @@ -import json from unittest import mock import zocalo.cli.dlq_check @@ -21,20 +20,31 @@ def test_activemq_dlq_check(mock_jmx): assert checked == {"images": 2, "transient": 5} -@mock.patch("zocalo.util.rabbitmq.urllib.request.urlopen") -@mock.patch("zocalo.util.rabbitmq.http_api_request") -def test_activemq_dlq_rabbitmq_check(mock_api, mock_url): - cfg = Configuration({}) - _mock = mock.MagicMock() - mock_api.return_value = "" - mock_url.return_value = _mock - mock_url.return_value.__enter__.return_value.read.return_value = json.dumps( - [ - {"name": "images", "vhost": "zocalo", "messages": 10}, - {"name": "dlq.images", "vhost": "zocalo", "messages": 2}, - {"name": "dlq.transient", "vhost": "zocalo", "messages": 5}, - ] +def test_activemq_dlq_rabbitmq_check(requests_mock): + zc = mock.Mock() + zc.rabbitmqapi = { + "base_url": "http://fake.com/api", + "username": "guest", + "password": "guest", + } + requests_mock.get( + "/api/queues", + json=[ + {"name": "images", "vhost": "zocalo", "messages": 10, "exclusive": False}, + { + "name": "dlq.images", + "vhost": "zocalo", + "messages": 2, + "exclusive": False, + }, + { + "name": "dlq.transient", + "vhost": "zocalo", + "messages": 5, + "exclusive": False, + }, + ], ) - checked = 
zocalo.cli.dlq_check.check_dlq_rabbitmq(cfg, "zocalo") + checked = zocalo.cli.dlq_check.check_dlq_rabbitmq(zc, "zocalo") assert checked == {"dlq.images": 2, "dlq.transient": 5} From d7197ce97ca62ac6e5af75f2dde6dbcf3fd26f91 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 22:48:00 +0100 Subject: [PATCH 23/29] Remove debugging code --- src/zocalo/util/rabbitmq.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py index 4fbec00..b65ba06 100644 --- a/src/zocalo/util/rabbitmq.py +++ b/src/zocalo/util/rabbitmq.py @@ -797,38 +797,3 @@ def delete_user(self, name: str): @delete_component.register def _delete_user(self, user: UserSpec): self.delete_user(name=user.name) - - -if __name__ == "__main__": - import time - - zc = zocalo.configuration.from_file() - zc.activate() - rmq = RabbitMQAPI.from_zocalo_configuration(zc) - print(rmq.users()) - print(rmq.users(name="guest")) - print(rmq.queues()) - print(rmq.queues(vhost="zocalo", name="processing_recipe")) - # time.sleep(5) - rmq.queue_declare( - vhost="zocalo", - queue=QueueSpec( - name="foo", auto_delete=True, arguments={"x-single-active-consumer": True} - ), - ) - time.sleep(5) - print(rmq.queues(vhost="zocalo", name="foo")) - rmq.queue_delete(vhost="zocalo", name="foo") - # print(rmq.queues(vhost="zocalo", name="foo")) - for q in rmq.queues(): - print(q.message_stats) - print() - for ex in rmq.exchanges(): - print(ex) - print(rmq.exchanges(vhost="zocalo", name="")) - - nodes = rmq.nodes() - print(rmq.nodes(name=nodes[0].name)) - - connections = rmq.connections() - print(rmq.connections(name=connections[0].name)) From 5b92239952b55a0d2d69b66e30fb4432f2b441c0 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 23:05:50 +0100 Subject: [PATCH 24/29] Add dependencies here too --- requirements_dev.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index 6a40bd6..eb0c2ec 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,9 +1,11 @@ PyYAML==6.0 graypy==2.1.0 marshmallow==3.14.0 +pydantic pytest-cov==3.0.0 pytest-mock pytest==6.2.5 +requests requests_mock setuptools==58.3.0 workflows==2.14 From e91b7516a3d4a868096060c7615daf9b0f8350e8 Mon Sep 17 00:00:00 2001 From: Richard Gildea Date: Wed, 27 Oct 2021 23:23:26 +0100 Subject: [PATCH 25/29] Move RabbitMQAPI instantiation in line with use This line isn't hit in the tests, and it isn't trivial to inject fake credentials via the test. 
---
 src/zocalo/cli/dlq_reinject.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/zocalo/cli/dlq_reinject.py b/src/zocalo/cli/dlq_reinject.py
index 429f911..4cf5c32 100644
--- a/src/zocalo/cli/dlq_reinject.py
+++ b/src/zocalo/cli/dlq_reinject.py
@@ -70,7 +70,6 @@ def run() -> None:
         print("No DLQ message files given.")
         sys.exit(0)
 
-    rmqapi = RabbitMQAPI.from_zocalo_configuration(zc)
     transport.connect()
 
     first = True
@@ -131,6 +130,7 @@ def run() -> None:
             header = dlqmsg["header"]
             exchange = header.get("headers", {}).get("x-death", {})[0].get("exchange")
             if exchange:
+                rmqapi = RabbitMQAPI.from_zocalo_configuration(zc)
                 exchange_info = rmqapi.get("queues").json()
                 for exch in exchange_info:
                     if exch["name"] == exchange:

From bca16dd3b7750edc1f2a7ff94808d30ec260897e Mon Sep 17 00:00:00 2001
From: Richard Gildea
Date: Wed, 27 Oct 2021 23:28:04 +0100
Subject: [PATCH 26/29] Remove duplicate key

---
 tests/util/test_rabbitmq.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py
index cddd164..ccde31a 100644
--- a/tests/util/test_rabbitmq.py
+++ b/tests/util/test_rabbitmq.py
@@ -224,7 +224,6 @@ def test_api_exchange_declare(name, requests_mock, rmqapi):
             "type": "fanout",
             "auto_delete": True,
             "durable": True,
-            "auto_delete": True,
         }
 
 

From 24ebf4c6707475384fc2503ca916ba59fdb8bc2d Mon Sep 17 00:00:00 2001
From: Richard Gildea
Date: Thu, 28 Oct 2021 16:32:03 +0100
Subject: [PATCH 27/29] Too much history is a bad thing

---
 HISTORY.rst | 1 -
 1 file changed, 1 deletion(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index e54d24a..6c42a16 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -10,7 +10,6 @@ Unreleased
   specified in the Zocalo configuration
 * ``zocalo.dlq_reinject`` takes a serialised message produced by
   ``zocalo.dlq_purge`` and places it back on a queue
-* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
 * Use ``argparse`` for all command line tools and make use of ``workflows`` transport
   argument injection. Minimum ``workflows`` version is now 2.14
 * New ``zocalo.util.rabbitmq.RabbitMQAPI()`` providing a thin wrapper around the

From 7c3e26b60019d950cc11bf9d7a53da38aa083fd4 Mon Sep 17 00:00:00 2001
From: Richard Gildea
Date: Wed, 3 Nov 2021 14:18:43 +0000
Subject: [PATCH 28/29] Partial revert of ec0e219

functools.singledispatchmethod is Python 3.8+ only

---
 src/zocalo/util/rabbitmq.py |  33 -----------
 tests/util/test_rabbitmq.py | 112 ++++++++++++++++--------------------
 2 files changed, 51 insertions(+), 94 deletions(-)

diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py
index b65ba06..b58b53d 100644
--- a/src/zocalo/util/rabbitmq.py
+++ b/src/zocalo/util/rabbitmq.py
@@ -1,6 +1,5 @@
 import datetime
 import enum
-import functools
 import logging
 import pathlib
 import urllib
@@ -630,14 +629,6 @@ def delete(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Respo
             f"{self._url}/{endpoint}", auth=self._auth, params=params
         )
 
-    @functools.singledispatchmethod
-    def create_component(self, component):
-        raise NotImplementedError(f"Component {component} not recognised")
-
-    @functools.singledispatchmethod
-    def delete_component(self, component):
-        raise NotImplementedError(f"Component {component} not recognised")
-
     def connections(
         self, name: Optional[str] = None
     ) -> Union[List[ConnectionInfo], ConnectionInfo]:
@@ -677,7 +668,6 @@ def exchanges(
         logger.debug(response)
         return [ExchangeInfo(**qi) for qi in response.json()]
 
-    @create_component.register
     def exchange_declare(self, exchange: ExchangeSpec):
         endpoint = f"exchanges/{exchange.vhost}/{exchange.name}/"
         response = self.put(
@@ -691,10 +681,6 @@ def exchange_delete(self, vhost: str, name: str, if_unused: bool = False):
         response = self.delete(endpoint, params={"if_unused": if_unused})
         logger.debug(response)
 
-    @delete_component.register
-    def _delete_exchange(self, exchange: ExchangeSpec, **kwargs):
-        self.exchange_delete(vhost=exchange.vhost, name=exchange.name, **kwargs)
-
     def policies(
         self, vhost: Optional[str] = None, name: Optional[str] = None
     ) -> Union[List[PolicyInfo], PolicyInfo]:
@@ -711,7 +697,6 @@ def policies(
         logger.debug(response)
         return [PolicyInfo(**p) for p in response.json()]
 
-    @create_component.register
     def set_policy(self, policy: PolicySpec):
         endpoint = f"policies/{policy.vhost}/{policy.name}/"
         response = self.put(
@@ -727,10 +712,6 @@ def clear_policy(self, vhost: str, name: str):
         response = self.delete(endpoint)
         logger.debug(response)
 
-    @delete_component.register
-    def _delete_policy(self, policy: PolicySpec):
-        self.clear_policy(vhost=policy.vhost, name=policy.name)
-
     def queues(
         self, vhost: Optional[str] = None, name: Optional[str] = None
     ) -> Union[List[QueueInfo], QueueInfo]:
@@ -747,7 +728,6 @@ def queues(
         logger.debug(response)
         return [QueueInfo(**qi) for qi in response.json()]
 
-    @create_component.register
     def queue_declare(self, queue: QueueSpec):
         endpoint = f"queues/{queue.vhost}/{queue.name}"
         response = self.put(
@@ -764,14 +744,6 @@ def queue_delete(
         )
         logger.debug(response)
 
-    @delete_component.register
-    def _delete_queue(
-        self, queue: QueueSpec, if_unused: bool = False, if_empty: bool = False
-    ):
-        self.queue_delete(
-            vhost=queue.vhost, name=queue.name, if_unused=if_unused, if_empty=if_empty
-        )
-
     def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]:
         endpoint = "users"
         if name:
@@ -781,7 +753,6 @@ def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]:
         response = self.get(endpoint)
         return [UserInfo(**u) for u in response.json()]
 
-    @create_component.register
     def add_user(self, user: UserSpec):
         endpoint = f"users/{user.name}/"
         response = self.put(
@@ -793,7 +764,3 @@ def delete_user(self, name: str):
         endpoint = f"users/{name}/"
         response = self.delete(endpoint)
         logger.debug(response)
-
-    @delete_component.register
-    def _delete_user(self, user: UserSpec):
-        self.delete_user(name=user.name)
diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py
index ccde31a..cfb23fd 100644
--- a/tests/util/test_rabbitmq.py
+++ b/tests/util/test_rabbitmq.py
@@ -109,27 +109,23 @@ def queue_spec():
 def test_api_queue_declare(requests_mock, rmqapi, queue_spec):
     requests_mock.put("/api/queues/zocalo/foo")
     rmqapi.queue_declare(queue=queue_spec)
-    rmqapi.create_component(queue_spec)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "PUT"
-        assert history.url.endswith("/api/queues/zocalo/foo")
-        assert history.json() == {
-            "auto_delete": True,
-            "arguments": queue_spec.arguments,
-        }
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith("/api/queues/zocalo/foo")
+    assert history.json() == {
+        "auto_delete": True,
+        "arguments": queue_spec.arguments,
+    }
 
 
 def test_api_queue_delete(requests_mock, rmqapi, queue_spec):
     requests_mock.delete("/api/queues/zocalo/foo")
     rmqapi.queue_delete(vhost="zocalo", name="foo", if_unused=True, if_empty=True)
-    rmqapi.delete_component(queue_spec, if_unused=True, if_empty=True)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "DELETE"
-        assert history.url.endswith(
-            "/api/queues/zocalo/foo?if_unused=True&if_empty=True"
-        )
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/queues/zocalo/foo?if_unused=True&if_empty=True")
 
 
 def test_api_nodes(requests_mock, rmqapi):
@@ -215,26 +211,24 @@ def exchange_spec(name):
 def test_api_exchange_declare(name, requests_mock, rmqapi):
     requests_mock.put(f"/api/exchanges/zocalo/{name}/")
     rmqapi.exchange_declare(exchange=exchange_spec(name))
-    rmqapi.create_component(exchange_spec(name))
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "PUT"
-        assert history.url.endswith(f"/api/exchanges/zocalo/{name}/")
-        assert history.json() == {
-            "type": "fanout",
-            "auto_delete": True,
-            "durable": True,
-        }
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/exchanges/zocalo/{name}/")
+    assert history.json() == {
+        "type": "fanout",
+        "auto_delete": True,
+        "durable": True,
+    }
 
 
 def test_api_exchange_delete(requests_mock, rmqapi):
     requests_mock.delete("/api/exchanges/zocalo/foo")
     rmqapi.exchange_delete(vhost="zocalo", name="foo", if_unused=True)
-    rmqapi.delete_component(exchange_spec("foo"), if_unused=True)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "DELETE"
-        assert history.url.endswith("/api/exchanges/zocalo/foo?if_unused=True")
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/exchanges/zocalo/foo?if_unused=True")
 
 
 def test_api_connections(requests_mock, rmqapi):
@@ -298,26 +292,24 @@ def user_spec():
 def test_api_add_user(requests_mock, rmqapi, user_spec):
     requests_mock.put(f"/api/users/{user_spec.name}/")
     rmqapi.add_user(user=user_spec)
-    rmqapi.create_component(user_spec)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "PUT"
-        assert history.url.endswith(f"/api/users/{user_spec.name}/")
-        assert history.json() == {
-            "password_hash": "guest",
-            "hashing_algorithm": "rabbit_password_hashing_sha256",
-            "tags": "administrator",
-        }
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/users/{user_spec.name}/")
+    assert history.json() == {
+        "password_hash": "guest",
+        "hashing_algorithm": "rabbit_password_hashing_sha256",
+        "tags": "administrator",
+    }
 
 
 def test_api_delete_user(requests_mock, rmqapi, user_spec):
     requests_mock.delete("/api/users/guest/")
     rmqapi.delete_user(name="guest")
-    rmqapi.delete_component(user_spec)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "DELETE"
-        assert history.url.endswith("/api/users/guest/")
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/users/guest/")
 
 
 def test_api_policies(requests_mock, rmqapi):
@@ -359,23 +351,21 @@ def policy_spec():
 def test_api_set_policy(requests_mock, rmqapi, policy_spec):
     requests_mock.put(f"/api/policies/foo/{policy_spec.name}/")
     rmqapi.set_policy(policy=policy_spec)
-    rmqapi.create_component(policy_spec)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "PUT"
-        assert history.url.endswith(f"/api/policies/foo/{policy_spec.name}/")
-        assert history.json() == {
-            "pattern": "^amq.",
-            "apply-to": "queues",
-            "definition": {"delivery-limit": 5},
-        }
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/policies/foo/{policy_spec.name}/")
+    assert history.json() == {
+        "pattern": "^amq.",
+        "apply-to": "queues",
+        "definition": {"delivery-limit": 5},
+    }
 
 
 def test_api_clear_policy(requests_mock, rmqapi, policy_spec):
     requests_mock.delete("/api/policies/foo/bar/")
     rmqapi.clear_policy(vhost="foo", name="bar")
-    rmqapi.delete_component(policy_spec)
-    assert requests_mock.call_count == 2
-    for history in requests_mock.request_history:
-        assert history.method == "DELETE"
-        assert history.url.endswith("/api/policies/foo/bar/")
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/policies/foo/bar/")

From e5171604b5cc08d4464838787bc2012d958e41af Mon Sep 17 00:00:00 2001
From: Richard Gildea
Date: Wed, 3 Nov 2021 15:03:08 +0000
Subject: [PATCH 29/29] Remove logging of response to silence lgtm warnings

---
 src/zocalo/util/rabbitmq.py | 33 ++++++++------------------------
 1 file changed, 8 insertions(+), 25 deletions(-)

diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py
index b58b53d..d132aac 100644
--- a/src/zocalo/util/rabbitmq.py
+++ b/src/zocalo/util/rabbitmq.py
@@ -638,7 +638,6 @@ def connections(
             response = self.get(endpoint)
             return ConnectionInfo(**response.json())
         response = self.get(endpoint)
-        logger.debug(response)
         return [ConnectionInfo(**qi) for qi in response.json()]
 
     def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]:
@@ -649,7 +648,6 @@ def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]:
             response = self.get(endpoint)
             return NodeInfo(**response.json())
         response = self.get(endpoint)
-        logger.debug(response)
         return [NodeInfo(**qi) for qi in response.json()]
 
     def exchanges(
@@ -665,21 +663,18 @@ def exchanges(
         elif name is not None:
             raise ValueError("name can not be set without vhost")
         response = self.get(endpoint)
-        logger.debug(response)
         return [ExchangeInfo(**qi) for qi in response.json()]
 
     def exchange_declare(self, exchange: ExchangeSpec):
         endpoint = f"exchanges/{exchange.vhost}/{exchange.name}/"
-        response = self.put(
+        self.put(
             endpoint,
             json=exchange.dict(exclude_defaults=True, exclude={"name", "vhost"}),
         )
-        logger.debug(response)
 
     def exchange_delete(self, vhost: str, name: str, if_unused: bool = False):
         endpoint = f"exchanges/{vhost}/{name}"
-        response = self.delete(endpoint, params={"if_unused": if_unused})
-        logger.debug(response)
+        self.delete(endpoint, params={"if_unused": if_unused})
 
     def policies(
         self, vhost: Optional[str] = None, name: Optional[str] = None
@@ -694,23 +689,20 @@ def policies(
         elif name is not None:
             raise ValueError("name can not be set without vhost")
         response = self.get(endpoint)
-        logger.debug(response)
         return [PolicyInfo(**p) for p in response.json()]
 
     def set_policy(self, policy: PolicySpec):
         endpoint = f"policies/{policy.vhost}/{policy.name}/"
-        response = self.put(
+        self.put(
             endpoint,
             json=policy.dict(
                 exclude_defaults=True, exclude={"name", "vhost"}, by_alias=True
             ),
         )
-        logger.debug(response)
 
     def clear_policy(self, vhost: str, name: str):
         endpoint = f"policies/{vhost}/{name}/"
-        response = self.delete(endpoint)
-        logger.debug(response)
+        self.delete(endpoint)
 
     def queues(
         self, vhost: Optional[str] = None, name: Optional[str] = None
@@ -725,24 +717,19 @@ def queues(
         elif name is not None:
             raise ValueError("name can not be set without vhost")
         response = self.get(endpoint)
-        logger.debug(response)
         return [QueueInfo(**qi) for qi in response.json()]
 
     def queue_declare(self, queue: QueueSpec):
         endpoint = f"queues/{queue.vhost}/{queue.name}"
-        response = self.put(
+        self.put(
             endpoint, json=queue.dict(exclude_defaults=True, exclude={"name", "vhost"})
         )
-        logger.debug(response)
 
     def queue_delete(
         self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False
    ):
         endpoint = f"queues/{vhost}/{name}"
-        response = self.delete(
-            endpoint, params={"if_unused": if_unused, "if_empty": if_empty}
-        )
-        logger.debug(response)
+        self.delete(endpoint, params={"if_unused": if_unused, "if_empty": if_empty})
 
     def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]:
         endpoint = "users"
@@ -755,12 +742,8 @@ def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]:
 
     def add_user(self, user: UserSpec):
         endpoint = f"users/{user.name}/"
-        response = self.put(
-            endpoint, json=user.dict(exclude_defaults=True, exclude={"name"})
-        )
-        logger.debug(response)
+        self.put(endpoint, json=user.dict(exclude_defaults=True, exclude={"name"}))
 
     def delete_user(self, name: str):
         endpoint = f"users/{name}/"
-        response = self.delete(endpoint)
-        logger.debug(response)
+        self.delete(endpoint)
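
A closing note on the Python-version constraint behind PATCH 28, with a
minimal, hedged sketch: functools.singledispatchmethod only exists on
Python 3.8 and newer, so a generic create_component()/delete_component()
dispatching on the spec type cannot be used while older interpreters are
still supported, and the patch falls back to explicit per-type methods.
The class and spec names below are illustrative stand-ins, not the zocalo
implementation; only the decorator behaviour is the point.

    # Sketch: the reverted dispatch-by-type pattern, available on Python 3.8+ only.
    import sys

    if sys.version_info >= (3, 8):
        from dataclasses import dataclass
        from functools import singledispatchmethod

        @dataclass
        class QueueSpec:
            name: str
            vhost: str

        class Api:
            @singledispatchmethod
            def create_component(self, component):
                raise NotImplementedError(f"Component {component} not recognised")

            @create_component.register
            def _(self, component: QueueSpec):
                # Dispatch picked this overload because the argument is a QueueSpec.
                print(f"PUT queues/{component.vhost}/{component.name}")

        Api().create_component(QueueSpec(name="foo", vhost="zocalo"))
    else:
        # On older Pythons the import above would fail with ImportError, hence
        # the explicit queue_declare()/exchange_declare()/... methods instead.
        pass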