Skip to content

Commit

Permalink
Add pgbouncer check and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DorianZaccaria committed Feb 24, 2015
1 parent 414c027 commit f569f5d
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 1 deletion.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ env:
- TRAVIS_FLAVOR=fluentd
- TRAVIS_FLAVOR=rabbitmq
- TRAVIS_FLAVOR=etcd
- TRAVIS_FLAVOR=pgbouncer

# Override travis defaults with empty jobs
before_install: echo "OVERRIDING TRAVIS STEPS"
Expand Down
1 change: 1 addition & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require './ci/memcache'
require './ci/mongo'
require './ci/mysql'
require './ci/nginx'
require './ci/pgbouncer'
require './ci/postgres'
require './ci/rabbitmq'
require './ci/redis'
Expand Down
167 changes: 167 additions & 0 deletions checks.d/pgbouncer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Pgbouncer check
Collects metrics from the pgbouncer database.
"""
from checks import AgentCheck, CheckException
from collections import OrderedDict

import psycopg2 as pg

class ShouldRestartException(Exception): pass

class PgBouncer(AgentCheck):
"""Collects metrics from pgbouncer
"""
SOURCE_TYPE_NAME = 'pgbouncer'
RATE = AgentCheck.rate
GAUGE = AgentCheck.gauge

STATS_METRICS = {
'descriptors': [
('database', 'db'),
],
'metrics': OrderedDict([
('total_requests', ('pgbouncer.stats.requests_per_second', RATE)),
('total_received', ('pgbouncer.stats.bytes_received_per_second', RATE)),
('total_sent', ('pgbouncer.stats.bytes_sent_per_second', RATE)),
('total_query_time', ('pgbouncer.stats.total_query_time', GAUGE)),
('avg_req', ('pgbouncer.stats.avg_req', GAUGE)),
('avg_recv', ('pgbouncer.stats.avg_recv', GAUGE)),
('avg_sent', ('pgbouncer.stats.avg_sent', GAUGE)),
('avg_query', ('pgbouncer.stats.avg_query', GAUGE)),
]),
'query': """SHOW STATS""",
}

POOLS_METRICS = {
'descriptors': [
('database', 'db'),
('user', 'user'),
],
'metrics': OrderedDict([
('cl_active', ('pgbouncer.pools.cl_active', GAUGE)),
('cl_waiting', ('pgbouncer.pools.cl_waiting', GAUGE)),
('sv_active', ('pgbouncer.pools.sv_active', GAUGE)),
('sv_idle', ('pgbouncer.pools.sv_idle', GAUGE)),
('sv_used', ('pgbouncer.pools.sv_used', GAUGE)),
('sv_tested', ('pgbouncer.pools.sv_tested', GAUGE)),
('sv_login', ('pgbouncer.pools.sv_login', GAUGE)),
('maxwait', ('pgbouncer.pools.maxwait', GAUGE)),
]),
'query': """SHOW POOLS""",
}

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
self.dbs = {}

def _collect_stats(self, key, db, instance_tags):
"""Query pgbouncer for various metrics
"""

metric_scope = (self.STATS_METRICS, self.POOLS_METRICS)

try:
cursor = db.cursor()
for scope in metric_scope:

cols = scope['metrics'].keys()

try:
query = scope['query']
self.log.debug("Running query: %s" % query)
cursor.execute(query)

results = cursor.fetchall()
except pg.Error, e:
self.log.warning("Not all metrics may be available: %s" % str(e))
continue

for row in results:
if row[0] == 'pgbouncer':
continue

desc = scope['descriptors']
assert len(row) == len(cols) + len(desc)

tags = [t for t in instance_tags]
tags += ["%s:%s" % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]

values = zip([scope['metrics'][c] for c in cols], row[len(desc):])

[v[0][1](self, v[0][0], v[1], tags=tags) for v in values]

if not results:
self.warning('No results were found for query: "%s"' % query)

cursor.close()
except pg.Error, e:
self.log.error("Connection error: %s" % str(e))
raise ShouldRestartException

def _get_connection(self, key, host, port, user, password, dbname, use_cached=True):
"Get and memoize connections to instances"
if key in self.dbs and use_cached:
return self.dbs[key]

elif host != "" and user != "":
try:
service_check_tags = [
"host:%s" % host,
"port:%s" % port
]
if dbname:
service_check_tags.append("db:%s" % dbname)

if host == 'localhost' and password == '':
# Use ident method
connection = pg.connect("user=%s dbname=%s" % (user, dbname))
elif port != '':
connection = pg.connect(host=host, port=port, user=user,
password=password, database=dbname)
else:
connection = pg.connect(host=host, user=user, password=password,
database=dbname)
status = AgentCheck.OK
self.service_check('pgbouncer.can_connect', status, tags=service_check_tags)
self.log.debug('pgbouncer status: %s' % status)

except Exception:
status = AgentCheck.CRITICAL
self.service_check('pgbouncer.can_connect', status, tags=service_check_tags)
self.log.debug('pgbouncer status: %s' % status)
raise
else:
if not host:
raise CheckException("Please specify a PgBouncer host to connect to.")
elif not user:
raise CheckException("Please specify a user to connect to PgBouncer as.")

connection.set_isolation_level(pg.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
# connection.autocommit = True

self.dbs[key] = connection
return connection

def check(self, instance):
host = instance.get('host', '')
port = instance.get('port', '')
user = instance.get('username', '')
password = instance.get('password', '')
tags = instance.get('tags', [])
dbname = 'pgbouncer'

key = '%s:%s:%s' % (host, port, dbname)

if tags is None:
tags = []
else:
tags = list(set(tags))

try:
db = self._get_connection(key, host, port, user, password, dbname)
self._collect_stats(key, db, tags)
except ShouldRestartException:
self.log.info("Resetting the connection")
db = self._get_connection(key, host, port, user, password, dbname, use_cached=False)
self._collect_stats(key, db, tags)
77 changes: 77 additions & 0 deletions ci/pgbouncer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
require './ci/common'
require './ci/postgres'

def pgb_rootdir
"#{ENV['INTEGRATIONS_DIR']}/pgbouncer"
end


namespace :ci do
namespace :pgbouncer do |flavor|
task :before_install => ['ci:common:before_install']

task :install do
Rake::Task['ci:postgres:install'].invoke
unless Dir.exist? File.expand_path(pgb_rootdir)
sh %(curl -s -L\
-o $VOLATILE_DIR/pgbouncer-1.5.4.tar.gz\
http://pgfoundry.org/frs/download.php/3393/pgbouncer-1.5.4.tar.gz)
sh %(mkdir -p $VOLATILE_DIR/pgbouncer)
sh %(tar xzf $VOLATILE_DIR/pgbouncer-1.5.4.tar.gz\
-C $VOLATILE_DIR/pgbouncer --strip-components=1)
sh %(mkdir -p #{pgb_rootdir})
sh %(cd $VOLATILE_DIR/pgbouncer\
&& ./configure --prefix=#{pgb_rootdir}\
&& make\
&& cp pgbouncer #{pgb_rootdir})
end
end

task :before_script do
Rake::Task['ci:postgres:before_script'].invoke
sh %(cp $TRAVIS_BUILD_DIR/ci/resources/pgbouncer/pgbouncer.ini\
#{pgb_rootdir}/pgbouncer.ini)
sh %(cp $TRAVIS_BUILD_DIR/ci/resources/pgbouncer/users.txt\
#{pgb_rootdir}/users.txt)
sh %(#{pgb_rootdir}/pgbouncer -d #{pgb_rootdir}/pgbouncer.ini)
sh %(PGPASSWORD=datadog #{pg_rootdir}/bin/psql\
-p 15433 -U datadog -w\
-c "SELECT * FROM persons"\
datadog_test)
sleep_for 5
end

task :script do
this_provides = [
'pgbouncer'
]
Rake::Task['ci:common:run_tests'].invoke(this_provides)
end

task :cleanup do
sh %(rm -rf $VOLATILE_DIR/pgbouncer*)
sh %(killall pgbouncer)
Rake::Task['ci:postgres:cleanup'].invoke
end

task :execute do
exception = nil
begin
%w(before_install install before_script script).each do |t|
Rake::Task["#{flavor.scope.path}:#{t}"].invoke
end
rescue => e
exception = e
puts "Failed task: #{e.class} #{e.message}".red
end
if ENV['SKIP_CLEANUP']
puts 'Skipping cleanup, disposable environments are great'.yellow
else
puts 'Cleaning up'
Rake::Task["#{flavor.scope.path}:cleanup"].invoke
end
fail exception if exception
end

end
end
11 changes: 11 additions & 0 deletions ci/resources/pgbouncer/pgbouncer.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[databases]
datadog_test = host=127.0.0.1 port=15432 dbname=datadog_test

[pgbouncer]
listen_port = 15433
listen_addr = *
auth_type = md5
auth_file = embedded/pgbouncer/users.txt
admin_users = datadog
logfile = pgbouncer.log
pidfile = pgbouncer.pid
1 change: 1 addition & 0 deletions ci/resources/pgbouncer/users.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"datadog" "datadog"
10 changes: 10 additions & 0 deletions conf.d/pgbouncer.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
init_config:

instances:
# - host: localhost
# port: 15433
# username: my_username
# password: my_password
# tags:
# - optional_tag1
# - optional_tag2
3 changes: 2 additions & 1 deletion source-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ httplib2==0.9
kafka-python==0.9.0-9bed11db98387c0d9e456528130b330631dc50af
ntplib==0.3.2
pg8000==1.9.6
psycopg2==2.6
PyMySQL==0.6.1
python-memcached==1.53
pyyaml==3.11
Expand All @@ -12,4 +13,4 @@ requests==2.3.0
simplejson==3.3.3
snakebite==1.3.9
backports.ssl_match_hostname==3.4.0.2
tornado==3.2.2
tornado==3.2.2
66 changes: 66 additions & 0 deletions tests/test_pgbouncer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import unittest
from tests.common import load_check

from nose.plugins.attrib import attr

import time
from pprint import pprint

@attr(requires='pgbouncer')
class TestPgbouncer(unittest.TestCase):

def test_checks(self):

config = {
'instances': [
{
'host': 'localhost',
'port': 15433,
'username': 'datadog',
'password': 'datadog'
}
]
}
agentConfig = {
'version': '0.1',
'api_key': 'toto'
}

self.check = load_check('pgbouncer', config, agentConfig)

self.check.run()
metrics = self.check.get_metrics()
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.cl_active']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.cl_waiting']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.sv_active']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.sv_idle']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.sv_used']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.sv_tested']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.sv_login']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.pools.maxwait']) >= 1, pprint(metrics))

self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.total_query_time']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.avg_req']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.avg_recv']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.avg_sent']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.avg_query']) >= 1, pprint(metrics))
# Rate metrics, need 2 collection rounds
time.sleep(1)
self.check.run()
metrics = self.check.get_metrics()
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.requests_per_second']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.bytes_received_per_second']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'pgbouncer.stats.bytes_sent_per_second']) >= 1, pprint(metrics))

# Service checks
service_checks = self.check.get_service_checks()
service_checks_count = len(service_checks)
self.assertTrue(type(service_checks) == type([]))
self.assertTrue(service_checks_count > 0)
self.assertEquals(len([sc for sc in service_checks if sc['check'] == "pgbouncer.can_connect"]), 1, service_checks)
# Assert that all service checks have the proper tags: host, port and db
self.assertEquals(len([sc for sc in service_checks if "host:localhost" in sc['tags']]), service_checks_count, service_checks)
self.assertEquals(len([sc for sc in service_checks if "port:%s" % config['instances'][0]['port'] in sc['tags']]), service_checks_count, service_checks)

if __name__ == '__main__':
unittest.main()

0 comments on commit f569f5d

Please sign in to comment.