Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rabbitmq] Improve test suite and fix tagging #1472

Merged
merged 2 commits into from
Mar 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 61 additions & 42 deletions checks.d/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
NODE_TYPE = 'nodes'
MAX_DETAILED_QUEUES = 200
MAX_DETAILED_NODES = 100
ALERT_THRESHOLD = 0.9 # Post an event in the stream when the number of queues or nodes to collect is above 90% of the limit:
# Post an event in the stream when the number of queues or nodes to
# collect is above 90% of the limit:
ALERT_THRESHOLD = 0.9
QUEUE_ATTRIBUTES = [
# Path, Name
('active_consumers', 'active_consumers'),
Expand Down Expand Up @@ -65,13 +67,13 @@

TAGS_MAP = {
QUEUE_TYPE: {
'node':'node',
'name':'queue',
'vhost':'vhost',
'policy':'policy',
},
'node': 'node',
'name': 'queue',
'vhost': 'vhost',
'policy': 'policy',
},
NODE_TYPE: {
'name':'node',
'name': 'node',
}
}

Expand All @@ -82,6 +84,7 @@


class RabbitMQ(AgentCheck):

"""This check is for gathering statistics from the RabbitMQ
Management Plugin (http://www.rabbitmq.com/management.html)
"""
Expand Down Expand Up @@ -123,19 +126,21 @@ def _get_config(self, instance):
for object_type, filters in specified.iteritems():
for filter_type, filter_objects in filters.iteritems():
if type(filter_objects) != list:
raise TypeError("{0} / {0}_regexes parameter must be a list".format(object_type))
raise TypeError(
"{0} / {0}_regexes parameter must be a list".format(object_type))

auth = (username, password)

return base_url, max_detailed, specified, auth


def check(self, instance):
base_url, max_detailed, specified, auth = self._get_config(instance)

# Generate metrics from the status API.
self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE], auth=auth)
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE], auth=auth)
self.get_stats(instance, base_url, QUEUE_TYPE, max_detailed[
QUEUE_TYPE], specified[QUEUE_TYPE], auth=auth)
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[
NODE_TYPE], specified[NODE_TYPE], auth=auth)

# Generate a service check from the aliveness API.
vhosts = instance.get('vhosts')
Expand All @@ -147,12 +152,13 @@ def _get_data(self, url, auth=None):
r.raise_for_status()
data = r.json()
except requests.exceptions.HTTPError as e:
raise Exception('Cannot open RabbitMQ API url: %s %s' % (url, str(e)))
raise Exception(
'Cannot open RabbitMQ API url: %s %s' % (url, str(e)))
except ValueError, e:
raise Exception('Cannot parse JSON response from API url: %s %s' % (url, str(e)))
raise Exception(
'Cannot parse JSON response from API url: %s %s' % (url, str(e)))
return data


def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth=None):
"""
instance: the check instance
Expand All @@ -162,8 +168,11 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth
filters: explicit or regexes filters of specified queues or nodes (specified in the yaml file)
"""

data = self._get_data(urlparse.urljoin(base_url, object_type), auth=auth)
explicit_filters = list(filters['explicit']) # Make a copy of this list as we will remove items from it at each iteration
data = self._get_data(
urlparse.urljoin(base_url, object_type), auth=auth)
# Make a copy of this list as we will remove items from it at each
# iteration
explicit_filters = list(filters['explicit'])
regex_filters = filters['regexes']

""" data is a list of nodes or queues:
Expand All @@ -175,9 +184,11 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth
]
"""
if len(explicit_filters) > max_detailed:
raise Exception("The maximum number of %s you can specify is %d." % (object_type, max_detailed))
raise Exception(
"The maximum number of %s you can specify is %d." % (object_type, max_detailed))

if explicit_filters or regex_filters: # a list of queues/nodes is specified. We process only those
# a list of queues/nodes is specified. We process only those
if explicit_filters or regex_filters:
matching_lines = []
for data_line in data:
name = data_line.get("name")
Expand All @@ -193,10 +204,12 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth
match_found = True
break

if match_found: continue
if match_found:
continue

# Absolute names work only for queues
if object_type != QUEUE_TYPE: continue
if object_type != QUEUE_TYPE:
continue
absolute_name = '%s/%s' % (data_line.get("vhost"), name)
if absolute_name in explicit_filters:
matching_lines.append(data_line)
Expand All @@ -209,7 +222,8 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth
match_found = True
break

if match_found: continue
if match_found:
continue

data = matching_lines

Expand All @@ -220,19 +234,20 @@ def get_stats(self, instance, base_url, object_type, max_detailed, filters, auth

if len(data) > max_detailed:
# Display a warning in the info page
self.warning("Too many queues to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support" % object_type)
self.warning(
"Too many queues to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support" % object_type)

for data_line in data[:max_detailed]:
# We truncate the list of nodes/queues if it's above the limit
self._get_metrics(data_line, object_type)


def _get_metrics(self, data, object_type):
tags = []
tag_list = TAGS_MAP[object_type]
for t in tag_list.keys():
tag = data.get(t, None)
if tag is not None:
for t in tag_list:
tag = data.get(t)
if tag:
# FIXME 6.x: remove this suffix or unify (sc doesn't have it)
tags.append('rabbitmq_%s:%s' % (tag_list[t], tag))

for attribute, metric_name in ATTRIBUTES[object_type]:
Expand All @@ -245,9 +260,11 @@ def _get_metrics(self, data, object_type):
value = root.get(keys[-1], None)
if value is not None:
try:
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], metric_name), float(value), tags=tags)
self.gauge('rabbitmq.%s.%s' % (
METRIC_SUFFIX[object_type], metric_name), float(value), tags=tags)
except ValueError:
self.log.debug("Caught ValueError for %s %s = %s with tags: %s" % (METRIC_SUFFIX[object_type], attribute, value, tags))
self.log.debug("Caught ValueError for %s %s = %s with tags: %s" % (
METRIC_SUFFIX[object_type], attribute, value, tags))

def alert(self, base_url, max_detailed, size, object_type):
key = "%s%s" % (base_url, object_type)
Expand All @@ -257,21 +274,22 @@ def alert(self, base_url, max_detailed, size, object_type):

self.already_alerted.append(key)

title = "RabbitMQ integration is approaching the limit on the number of %s that can be collected from on %s" % (object_type, self.hostname)
title = "RabbitMQ integration is approaching the limit on the number of %s that can be collected from on %s" % (
object_type, self.hostname)
msg = """%s %s are present. The limit is %s.
Please get in touch with Datadog support to increase the limit.""" % (size, object_type, max_detailed)

event = {
"timestamp": int(time.time()),
"event_type": EVENT_TYPE,
"msg_title": title,
"msg_text": msg,
"alert_type": 'warning',
"source_type_name": SOURCE_TYPE_NAME,
"host": self.hostname,
"tags": ["base_url:%s" % base_url, "host:%s" % self.hostname],
"event_object": "rabbitmq.limit.%s" % object_type,
}
"timestamp": int(time.time()),
"event_type": EVENT_TYPE,
"msg_title": title,
"msg_text": msg,
"alert_type": 'warning',
"source_type_name": SOURCE_TYPE_NAME,
"host": self.hostname,
"tags": ["base_url:%s" % base_url, "host:%s" % self.hostname],
"event_object": "rabbitmq.limit.%s" % object_type,
}

self.event(event)

Expand Down Expand Up @@ -304,7 +322,8 @@ def _check_aliveness(self, base_url, vhosts=None, auth=None):
except Exception as e:
# Either we got a bad status code or unparseable JSON.
status = AgentCheck.CRITICAL
self.warning('Error when checking aliveness for vhost %s: %s'\
% (vhost, str(e)))
self.warning('Error when checking aliveness for vhost %s: %s'
% (vhost, str(e)))

self.service_check('rabbitmq.aliveness', status, tags, message=message)
self.service_check(
'rabbitmq.aliveness', status, tags, message=message)
7 changes: 4 additions & 3 deletions ci/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ def rabbitmq_rootdir
sleep_for 5
sh %(#{rabbitmq_rootdir}/sbin/rabbitmq-plugins enable rabbitmq_management)
sh %(#{rabbitmq_rootdir}/sbin/rabbitmq-plugins enable rabbitmq_management)
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` declare queue name=test1)
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` declare queue name=test5)
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` declare queue name=tralala)
%w(test1 test5 tralala).each do |q|
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` declare queue name=#{q})
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` publish exchange=amq.default routing_key=#{q} payload="hello, world")
end
sh %(python `find #{rabbitmq_rootdir} -name rabbitmqadmin` list queues)
end

Expand Down
28 changes: 28 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,34 @@ def assertServiceCheck(self, service_check_name, status=None, tags=None, count=N
sc['tested'] = True
log.debug("FOUND !")

def assertServiceCheckOK(self, service_check_name, tags=None, count=None, at_least=1):
    """Convenience wrapper: assert that a service check was reported with OK status.

    Delegates to ``assertServiceCheck`` with ``status=AgentCheck.OK``; all
    other filtering arguments are forwarded unchanged.
    """
    self.assertServiceCheck(
        service_check_name,
        status=AgentCheck.OK,
        tags=tags,
        count=count,
        at_least=at_least,
    )

def assertServiceCheckWarning(self, service_check_name, tags=None, count=None, at_least=1):
    """Convenience wrapper: assert that a service check was reported with WARNING status.

    Delegates to ``assertServiceCheck`` with ``status=AgentCheck.WARNING``;
    all other filtering arguments are forwarded unchanged.
    """
    self.assertServiceCheck(
        service_check_name,
        status=AgentCheck.WARNING,
        tags=tags,
        count=count,
        at_least=at_least,
    )

def assertServiceCheckCritical(self, service_check_name, tags=None, count=None, at_least=1):
    """Convenience wrapper: assert that a service check was reported with CRITICAL status.

    Delegates to ``assertServiceCheck`` with ``status=AgentCheck.CRITICAL``;
    all other filtering arguments are forwarded unchanged.
    """
    self.assertServiceCheck(
        service_check_name,
        status=AgentCheck.CRITICAL,
        tags=tags,
        count=count,
        at_least=at_least,
    )

def assertServiceCheckUnknown(self, service_check_name, tags=None, count=None, at_least=1):
    """Convenience wrapper: assert that a service check was reported with UNKNOWN status.

    Delegates to ``assertServiceCheck`` with ``status=AgentCheck.UNKNOWN``;
    all other filtering arguments are forwarded unchanged.
    """
    self.assertServiceCheck(
        service_check_name,
        status=AgentCheck.UNKNOWN,
        tags=tags,
        count=count,
        at_least=at_least,
    )

def assertIn(self, first, second):
    """Assert that *first* is a member of *second*, with a descriptive failure message."""
    message = "{0} not in {1}".format(first, second)
    self.assertTrue(first in second, message)

Expand Down
68 changes: 35 additions & 33 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
]
}

# Node-level metrics expected on every check run, regardless of queue config.
COMMON_METRICS = [
    'rabbitmq.node.%s' % suffix
    for suffix in ('fd_used', 'mem_used', 'run_queue', 'sockets_used')
]


@attr(requires='rabbitmq')
class RabbitMQCheckTest(AgentCheckTest):
Expand All @@ -37,15 +44,13 @@ def test_check(self):
self.run_check(CONFIG)

# Node attributes
self.assertMetric('rabbitmq.node.fd_used')
self.assertMetric('rabbitmq.node.mem_used')
self.assertMetric('rabbitmq.node.run_queue')
self.assertMetric('rabbitmq.node.sockets_used')
self.assertMetricTagPrefix('rabbitmq.node.fd_used', 'rabbitmq_node')
for mname in COMMON_METRICS:
self.assertMetricTagPrefix(mname, 'rabbitmq_node', count=1)

# Queue attributes, should be only one queue fetched
# TODO: create a 'fake consumer' and get missing metrics
# active_consumers, acks, delivers, redelivers
Q_METRICS = [
# 'active_consumers', # no active consumers in this test..
'consumers',
'memory',
'messages',
Expand All @@ -54,26 +59,25 @@ def test_check(self):
'messages_ready.rate',
'messages_unacknowledged',
'messages_unacknowledged.rate',
# Not available right now b/c of the way we configure rabbitmq on Travis
# 'messages.ack.count',
# 'messages.ack.rate',
# 'messages.deliver.count',
# 'messages.deliver.rate',
# 'messages.deliver_get.count',
# 'messages.deliver_get.rate',
# 'messages.publish.count',
# 'messages.publish.rate',
# 'messages.redeliver.count',
# 'messages.redeliver.rate',
'messages.publish.count',
'messages.publish.rate',
]
for mname in Q_METRICS:
self.assertMetricTag('rabbitmq.queue.%s' % mname, 'rabbitmq_queue:test1', count=1)
self.assertMetricTag('rabbitmq.queue.%s' %
mname, 'rabbitmq_queue:test1', count=1)

self.assertServiceCheckOK('rabbitmq.aliveness', tags=['vhost:/'])

self.coverage_report()

def test_queue_regex(self):
self.run_check(CONFIG_REGEX)

# Node attributes
for mname in COMMON_METRICS:
self.assertMetricTagPrefix(mname, 'rabbitmq_node', count=1)

Q_METRICS = [
# 'active_consumers', # no active consumers in this test..
'consumers',
'memory',
'messages',
Expand All @@ -82,19 +86,17 @@ def test_queue_regex(self):
'messages_ready.rate',
'messages_unacknowledged',
'messages_unacknowledged.rate',
# Not available right now b/c of the way we configure rabbitmq on Travis
# 'messages.ack.count',
# 'messages.ack.rate',
# 'messages.deliver.count',
# 'messages.deliver.rate',
# 'messages.deliver_get.count',
# 'messages.deliver_get.rate',
# 'messages.publish.count',
# 'messages.publish.rate',
# 'messages.redeliver.count',
# 'messages.redeliver.rate',
'messages.publish.count',
'messages.publish.rate',
]
for mname in Q_METRICS:
self.assertMetricTag('rabbitmq.queue.%s' % mname, 'rabbitmq_queue:test1', count=1)
self.assertMetricTag('rabbitmq.queue.%s' % mname, 'rabbitmq_queue:test5', count=1)
self.assertMetricTag('rabbitmq.queue.%s' % mname, 'rabbitmq_queue:tralala', count=0)
self.assertMetricTag('rabbitmq.queue.%s' %
mname, 'rabbitmq_queue:test1', count=1)
self.assertMetricTag('rabbitmq.queue.%s' %
mname, 'rabbitmq_queue:test5', count=1)
self.assertMetricTag('rabbitmq.queue.%s' %
mname, 'rabbitmq_queue:tralala', count=0)

self.assertServiceCheckOK('rabbitmq.aliveness', tags=['vhost:/'])

self.coverage_report()