Skip to content

New zocalo.util.rabbitmq.RabbitMQAPI #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ History
Unreleased
----------
* Add command line tools for handling dead-letter messages
* ``zocalo.dlq_check`` checks dead-letter queues for messages
* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory
specified in the Zocalo configuration
* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and
* ``zocalo.dlq_check`` checks dead-letter queues for messages
* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory
specified in the Zocalo configuration
* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and
places it back on a queue
* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
argument injection. Minimum ``workflows`` version is now 2.14
* New ``zocalo.util.rabbitmq.RabbitMQAPI()`` providing a thin wrapper around the
RabbitMQ HTTP API

0.10.0 (2021-10-04)
-------------------
Expand Down
22 changes: 9 additions & 13 deletions src/zocalo/cli/dlq_check.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import argparse
import json
import urllib

import workflows.transport

import zocalo.configuration
from zocalo.util.jmxstats import JMXAPI
from zocalo.util.rabbitmq import http_api_request
from zocalo.util.rabbitmq import RabbitMQAPI

#
# zocalo.dlq_check
Expand Down Expand Up @@ -50,16 +48,14 @@ def extract_queue_name(namestring):
def check_dlq_rabbitmq(
zc: zocalo.configuration.Configuration, namespace: str = None
) -> dict:
_api_request = http_api_request(zc, "/queues")
with urllib.request.urlopen(_api_request) as response:
reply = response.read()
queue_info = json.loads(reply)
dlq_info = {}
for q in queue_info:
if q["name"].startswith("dlq."):
if (namespace is None or q["vhost"] == namespace) and int(q["messages"]):
dlq_info[q["name"]] = int(q["messages"])
return dlq_info
rmq = RabbitMQAPI(zc)
return {
q["name"]: int(q["messages"])
for q in rmq.queues
if q["name"].startswith("dlq.")
and (namespace is None or q["vhost"] == namespace)
and int(q["messages"])
}


def run() -> None:
Expand Down
56 changes: 56 additions & 0 deletions src/zocalo/util/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import json
import urllib
import urllib.request
from typing import Any, Dict, List, Tuple

from workflows.transport import pika_transport

import zocalo.configuration

Expand Down Expand Up @@ -33,3 +38,54 @@ def http_api_request(
opener = urllib.request.build_opener(handler)
urllib.request.install_opener(opener)
return urllib.request.Request(f"{zc.rabbitmqapi['base_url']}{api_path}")


class RabbitMQAPI:
def __init__(self, zc: zocalo.configuration.Configuration):
self._zc = zc

@property
def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]:
# https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.9.7/deps/rabbitmq_management/priv/www/api/index.html
HEALTH_CHECKS = {
"/health/checks/alarms",
"/health/checks/local-alarms",
"/health/checks/certificate-expiration/1/months",
f"/health/checks/port-listener/{pika_transport.PikaTransport.defaults['--rabbit-port']}",
# f"/health/checks/port-listener/1234",
"/health/checks/protocol-listener/amqp",
"/health/checks/virtual-hosts",
"/health/checks/node-is-mirror-sync-critical",
"/health/checks/node-is-quorum-critical",
}

success = {}
failure = {}
for health_check in HEALTH_CHECKS:
try:
with urllib.request.urlopen(
http_api_request(self._zc, health_check)
) as response:
success[health_check] = json.loads(response.read())
except urllib.error.HTTPError as e:
failure[health_check] = str(e)
return success, failure

@property
def connections(self) -> List[Dict[str, Any]]:
with urllib.request.urlopen(
http_api_request(self._zc, "/connections")
) as response:
return json.loads(response.read())

@property
def nodes(self) -> List[Dict[str, Any]]:
# https://www.rabbitmq.com/monitoring.html#node-metrics
with urllib.request.urlopen(http_api_request(self._zc, "/nodes")) as response:
return json.loads(response.read())

@property
def queues(self) -> List[Dict[str, Any]]:
# https://www.rabbitmq.com/monitoring.html#queue-metrics
with urllib.request.urlopen(http_api_request(self._zc, "/queues")) as response:
return json.loads(response.read())
4 changes: 2 additions & 2 deletions tests/cli/test_dlq_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def test_activemq_dlq_check(mock_jmx):
assert checked == {"images": 2, "transient": 5}


@mock.patch("zocalo.cli.dlq_check.urllib.request.urlopen")
@mock.patch("zocalo.cli.dlq_check.http_api_request")
@mock.patch("zocalo.util.rabbitmq.urllib.request.urlopen")
@mock.patch("zocalo.util.rabbitmq.http_api_request")
def test_activemq_dlq_rabbitmq_check(mock_api, mock_url):
cfg = Configuration({})
_mock = mock.MagicMock()
Expand Down
110 changes: 107 additions & 3 deletions tests/util/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,117 @@
import json
import urllib.request

import pytest

import zocalo.configuration
from zocalo.util.rabbitmq import http_api_request
from zocalo.util.rabbitmq import RabbitMQAPI, http_api_request


def test_http_api_request(mocker):
@pytest.fixture
def zocalo_configuration(mocker):
zc = mocker.MagicMock(zocalo.configuration.Configuration)
zc.rabbitmqapi = {
"base_url": "http://rabbitmq.burrow.com:12345/api",
"username": "carrots",
"password": "carrots",
}
request = http_api_request(zc, api_path="/queues")
return zc


def test_http_api_request(zocalo_configuration):
request = http_api_request(zocalo_configuration, api_path="/queues")
assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues"


def test_api_health_checks(mocker, zocalo_configuration):
mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request")
mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen")
mock_api.return_value = ""
mock_url.return_value = mocker.MagicMock()
mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(
{"status": "ok"}
)
rmq = RabbitMQAPI(zocalo_configuration)
success, failures = rmq.health_checks
assert not failures
assert success
for k, v in success.items():
assert k.startswith("/health/checks/")
assert v == {"status": "ok"}


def test_api_health_checks_failures(mocker, zocalo_configuration):
mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request")
mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen")
mock_api.return_value = ""
mock_url.return_value = mocker.MagicMock()
mock_url.return_value.__enter__.return_value.read.side_effect = (
urllib.error.HTTPError(
"http://foo.com", 503, "Service Unavailable", mocker.Mock(), mocker.Mock()
)
)
rmq = RabbitMQAPI(zocalo_configuration)
success, failures = rmq.health_checks
assert failures
assert not success
for k, v in success.items():
assert k.startswith("/health/checks/")
assert v == "HTTP Error 503: Service Unavailable"


def test_api_queues(mocker, zocalo_configuration):
queues = [
{
"consumers": 0,
"memory": 110112,
"message_stats": {
"deliver_get": 33,
"deliver_get_details": {"rate": 0},
"publish": 22,
"publish_details": {"rate": 0},
},
"messages": 0,
"messages_ready": 0,
"messages_unacknowledged": 0,
"name": "foo",
"vhost": "zocalo",
},
]

mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request")
mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen")
mock_api.return_value = ""
mock_url.return_value = mocker.MagicMock()
mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(queues)
rmq = RabbitMQAPI(zocalo_configuration)
assert rmq.queues == queues


def test_api_nodes(mocker, zocalo_configuration):
nodes = {
"name": "rabbit@pooter123",
"mem_limit": 80861855744,
"mem_alarm": False,
"mem_used": 143544320,
"disk_free_limit": 50000000,
"disk_free_alarm": False,
"disk_free": 875837644800,
"fd_total": 32768,
"fd_used": 56,
"io_file_handle_open_attempt_count": 647,
"sockets_total": 29401,
"sockets_used": 0,
"gc_num": 153378077,
"gc_bytes_reclaimed": 7998215046336,
"proc_total": 1048576,
"proc_used": 590,
"run_queue": 1,
}

mock_api = mocker.patch("zocalo.util.rabbitmq.http_api_request")
mock_url = mocker.patch("zocalo.util.rabbitmq.urllib.request.urlopen")
mock_api.return_value = ""
mock_url.return_value = mocker.MagicMock()
mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(nodes)
rmq = RabbitMQAPI(zocalo_configuration)
assert rmq.queues == nodes