Skip to content

Commit

Permalink
Merge pull request #1747 from DataDog/yann/rebased-supervisor-regexp
Browse files Browse the repository at this point in the history
[supervisord] selct processes by regexp name match
  • Loading branch information
yannmh committed Jul 15, 2015
2 parents b19f251 + 2590549 commit 4251314
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 72 deletions.
69 changes: 42 additions & 27 deletions checks.d/supervisord.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# stdlib
from collections import defaultdict
import itertools
import re
import socket
import time
import xmlrpclib
Expand Down Expand Up @@ -53,45 +55,35 @@ def check(self, instance):
supe = self._connect(instance)
count_by_status = defaultdict(int)

# Grab process information
# Gather all process information
try:
proc_names = instance.get('proc_names')
if proc_names:
if not isinstance(proc_names, list) or not len(proc_names):
raise Exception("Empty or invalid proc_names.")
processes = []
for proc_name in proc_names:
try:
processes.append(supe.getProcessInfo(proc_name))
except xmlrpclib.Fault, e:
if e.faultCode == 10: # bad process name
self.warning('Process not found: %s' % proc_name)
else:
raise Exception('An error occurred while reading'
'process %s information: %s %s'
% (proc_name, e.faultCode, e.faultString))
else:
processes = supe.getAllProcessInfo()
processes = supe.getAllProcessInfo()
except xmlrpclib.Fault, error:
raise Exception(
'An error occurred while reading process information: %s %s'
% (error.faultCode, error.faultString)
)
except socket.error, e:
host = instance.get('host', DEFAULT_HOST)
port = instance.get('port', DEFAULT_PORT)
sock = instance.get('socket')
if sock is None:
msg = 'Cannot connect to http://%s:%s. ' \
'Make sure that supervisor is running and XML-RPC ' \
'inet interface is enabled.' % (host, port)
'Make sure supervisor is running and XML-RPC ' \
'inet interface is enabled.' % (host, port)
else:
msg = 'Cannot connect to %s. Make sure that supervisor ' \
'is running and that the socket file' \
' has the right permissions.' % sock
msg = 'Cannot connect to %s. Make sure sure supervisor ' \
'is running and socket is enabled and socket file' \
' has the right permissions.' % sock

self.service_check(SERVER_SERVICE_CHECK, AgentCheck.CRITICAL,
tags=server_service_check_tags,
message=msg)

raise Exception(msg)

except xmlrpclib.ProtocolError, e:
if e.errcode == 401: # authorization error
if e.errcode == 401: # authorization error
msg = 'Username or password to %s are incorrect.' % server_name
else:
msg = "An error occurred while connecting to %s: "\
Expand All @@ -102,13 +94,36 @@ def check(self, instance):
message=msg)
raise Exception(msg)


# If we're here, we were able to connect to the server
self.service_check(SERVER_SERVICE_CHECK, AgentCheck.OK,
tags=server_service_check_tags)
tags=server_service_check_tags)

# Filter monitored processes on configuration directives
proc_regex = instance.get('proc_regex', [])
if not isinstance(proc_regex, list):
raise Exception("Empty or invalid proc_regex.")

proc_names = instance.get('proc_names', [])
if not isinstance(proc_names, list):
raise Exception("Empty or invalid proc_names.")

# Collect information on each monitored process
monitored_processes = []

# monitor all processes if no filters were specified
if len(proc_regex) == 0 and len(proc_names) == 0:
monitored_processes = processes

for pattern, process in itertools.product(proc_regex, processes):
if re.match(pattern, process['name']) and process not in monitored_processes:
monitored_processes.append(process)

for process in processes:
if process['name'] in proc_names and process not in monitored_processes:
monitored_processes.append(process)

# Report service checks and uptime for each process
for proc in processes:
for proc in monitored_processes:
proc_name = proc['name']
tags = ['%s:%s' % (SERVER_TAG, server_name),
'%s:%s' % (PROCESS_TAG, proc_name)]
Expand Down
6 changes: 0 additions & 6 deletions ci/resources/supervisord/supervisord.yaml

This file was deleted.

10 changes: 4 additions & 6 deletions ci/supervisord.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ def supervisor_rootdir

task before_script: ['ci:common:before_script'] do
sh %(mkdir -p $VOLATILE_DIR/supervisor)
%w(supervisord.conf supervisord.yaml).each do |conf|
sh %(cp $TRAVIS_BUILD_DIR/ci/resources/supervisord/#{conf}\
$VOLATILE_DIR/supervisor/)
sh %(sed -i -- 's/VOLATILE_DIR/#{ENV['VOLATILE_DIR'].gsub '/', '\/'}/g'\
$VOLATILE_DIR/supervisor/#{conf})
end
sh %(cp $TRAVIS_BUILD_DIR/ci/resources/supervisord/supervisord.conf\
$VOLATILE_DIR/supervisor/)
sh %(sed -i -- 's/VOLATILE_DIR/#{ENV['VOLATILE_DIR'].gsub '/', '\/'}/g'\
$VOLATILE_DIR/supervisor/supervisord.conf)

3.times do |i|
sh %(cp $TRAVIS_BUILD_DIR/ci/resources/supervisord/program_#{i}.sh\
Expand Down
2 changes: 2 additions & 0 deletions conf.d/supervisord.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ instances:
# port: 9001 # Optional. Defaults to 9001. The port number.
# user: user # Optional. Required only if a username is configured.
# pass: pass # Optional. Required only if a password is configured.
# proc_regex: # Optional. Regex pattern[s] matching the names of processes to monitor
# - 'myprocess-\d\d$'
# proc_names: # Optional. The process to monitor within this supervisord instance.
# - apache2 # If not specified, the check will monitor all processes.
# - webapp
Expand Down
105 changes: 73 additions & 32 deletions tests/checks/integration/test_supervisord.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,89 @@
# stdlib
import os
from time import sleep
import unittest

# 3p
from nose.plugins.attrib import attr

# project
from tests.checks.common import get_check
from checks import AgentCheck
from tests.checks.common import AgentCheckTest

PROCESSES = ["program_0", "program_1", "program_2"]
STATUSES = ["down", "up", "unknown"]


@attr(requires='supervisord')
class TestSupervisordCheck(unittest.TestCase):
class TestSupervisordCheck(AgentCheckTest):
CHECK_NAME = 'supervisord'

SUPERVISORD_CONFIG = [{
'name': "travis",
'socket': "unix://{0}//supervisor.sock".format(os.environ['VOLATILE_DIR']),
}]

def test_travis_supervisord(self):
"""Integration test for supervisord check. Using a supervisord on Travis."""
BAD_SUPERVISORD_CONFIG = [{
'name': "travis",
'socket': "unix:///wrong/path/supervisor.sock",
'host': "http://127.0.0.1",
}]

# Load yaml config
config_str = open(os.environ['VOLATILE_DIR'] + '/supervisor/supervisord.yaml', 'r').read()
self.assertTrue(config_str is not None and len(config_str) > 0, msg=config_str)
# Supervisord should run 3 programs for 10, 20 and 30 seconds
# respectively.
# The following dictionnary shows the processes by state for each iteration.
PROCESSES_BY_STATE_BY_ITERATION = map(
lambda x: dict(up=PROCESSES[x:], down=PROCESSES[:x], unknown=[]),
range(4)
)

# init the check and get the instances
check, instances = get_check('supervisord', config_str)
self.assertTrue(check is not None, msg=check)
self.assertEquals(len(instances), 1)
def test_check(self):
"""
Run Supervisord check and assess coverage
"""
config = {'instances': self.SUPERVISORD_CONFIG}
instance_tags = ["supervisord_server:travis"]

# Supervisord should run 3 programs for 30, 60 and 90 seconds
# respectively. The tests below will ensure that the process count
# metric is reported correctly after (roughly) 10, 40, 70 and 100 seconds
for i in range(4):
try:
# Run the check
check.check(instances[0])
except Exception, e:
# Make sure that it ran successfully
self.assertTrue(False, msg=str(e))
else:
up, down = 0, 0
for name, timestamp, value, meta in check.get_metrics():
if name == 'supervisord.process.count':
if 'status:up' in meta['tags']:
up = value
elif 'status:down' in meta['tags']:
down = value
self.assertEquals(up, 3 - i)
self.assertEquals(down, i)
sleep(10)
# Run the check
self.run_check(config)

# Check metrics and service checks scoped by process
for proc in PROCESSES:
process_tags = instance_tags + ["supervisord_process:{0}".format(proc)]
process_status = AgentCheck.OK if proc in \
self.PROCESSES_BY_STATE_BY_ITERATION[i]['up'] else AgentCheck.CRITICAL

self.assertMetric("supervisord.process.uptime", tags=process_tags, count=1)
self.assertServiceCheck("supervisord.process.status", status=process_status,
tags=process_tags, count=1)
# Check instance metrics
for status in STATUSES:
status_tags = instance_tags + ["status:{0}".format(status)]
count_processes = len(self.PROCESSES_BY_STATE_BY_ITERATION[i][status])
self.assertMetric("supervisord.process.count", value=count_processes,
tags=status_tags, count=1)

# Check service checks
self.assertServiceCheck("supervisord.can_connect", status=AgentCheck.OK,
tags=instance_tags, count=1)

# Raises when COVERAGE=true and coverage < 100%
self.coverage_report()

# Sleep 10s to give enough time to processes to terminate
sleep(10)

def test_connection_falure(self):
"""
Service check reports connection failure
"""
config = {'instances': self.BAD_SUPERVISORD_CONFIG}
instance_tags = ["supervisord_server:travis"]

self.assertRaises(
Exception,
lambda: self.run_check(config)
)
self.assertServiceCheck("supervisord.can_connect", status=AgentCheck.CRITICAL,
tags=instance_tags, count=1)
self.coverage_report()
43 changes: 42 additions & 1 deletion tests/checks/mock/test_supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class TestSupervisordCheck(unittest.TestCase):
'host': 'invalid_host',
'port': 9009
}],
'error_message': """Cannot connect to http://invalid_host:9009. Make sure that supervisor is running and XML-RPC inet interface is enabled."""
'error_message': """Cannot connect to http://invalid_host:9009. Make sure supervisor is running and XML-RPC inet interface is enabled."""
}, {
'yaml': """
init_config:
Expand Down Expand Up @@ -187,6 +187,47 @@ class TestSupervisordCheck(unittest.TestCase):
'check': 'supervisord.process.status'
}]
}
}, {
'yaml': """
init_config:
instances:
- name: server0
host: localhost
port: 9001
proc_regex:
- '^mysq.$'
- invalid_process""",
'expected_instances': [{
'name': 'server0',
'host': 'localhost',
'port': 9001,
'proc_regex': ['^mysq.$', 'invalid_process']
}],
'expected_metrics': {
'server0': [
('supervisord.process.count', 1,
{'type': 'gauge', 'tags': ['supervisord_server:server0', 'status:up']}),
('supervisord.process.count', 0,
{'type': 'gauge', 'tags': ['supervisord_server:server0', 'status:down']}),
('supervisord.process.count', 0,
{'type': 'gauge', 'tags': ['supervisord_server:server0', 'status:unknown']}),
('supervisord.process.uptime', 125, {'type': 'gauge',
'tags': ['supervisord_server:server0',
'supervisord_process:mysql']})
]
},
'expected_service_checks': {
'server0': [{
'status': AgentCheck.OK,
'tags': ['supervisord_server:server0'],
'check': 'supervisord.can_connect',
}, {
'status': AgentCheck.OK,
'tags': ['supervisord_server:server0', 'supervisord_process:mysql'],
'check': 'supervisord.process.status'
}]
}
}]

def setUp(self):
Expand Down

0 comments on commit 4251314

Please sign in to comment.