Skip to content

Commit

Permalink
[metadata] revisit AgentCheck service_metadata
Browse files Browse the repository at this point in the history
`AgentCheck.service_metadata(metadata_name, value)` replaces `AgentCheck.svc_metadata(metadata_dict)` method.
Under the hood, metadata are stored in a list, and rolled-up to a dictionnary for every instance.
Usage:
```
self.service_metadata('foo', "bar")
```
  • Loading branch information
yannmh committed Jun 4, 2015
1 parent 0ce747c commit ef87bfa
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 53 deletions.
8 changes: 1 addition & 7 deletions checks.d/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,6 @@ def check(self, instance):
health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
= self._define_params(version, config.is_external)

# Collect metadata
self._collect_metadata(version)

# Load stats data.
stats_url = urlparse.urljoin(config.url, stats_url)
stats_data = self._get_data(stats_url, config)
Expand Down Expand Up @@ -264,6 +261,7 @@ def _get_es_version(self, config):
)
version = [1, 0, 0]

self.service_metadata('version', version)
self.log.debug("Elasticsearch version is %s" % version)
return version

Expand Down Expand Up @@ -544,7 +542,3 @@ def _create_event(self, status, tags=None):
'event_object': hostname,
'tags': tags
}

def _collect_metadata(self, version):
metadata_dict = {'version': version}
self.svc_metadata(metadata_dict)
9 changes: 4 additions & 5 deletions checks.d/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ def _connect(self, host, port, mysql_sock, user, password, defaults_file):

return db

def _collect_metadata(self, db, host):
metadata_dict = {}
metadata_dict['version'] = self._get_version(db, host)
self.svc_metadata(metadata_dict)

def _collect_metrics(self, host, db, tags, options):
cursor = db.cursor()
cursor.execute("SHOW /*!50002 GLOBAL */ STATUS;")
Expand Down Expand Up @@ -196,6 +191,9 @@ def _collect_metrics(self, host, db, tags, options):
"SHOW SLAVE STATUS", db, tags=tags
)

def _collect_metadata(self, db, host):
self._get_version(db, host)

def _rate_or_gauge_statuses(self, statuses, dbResults, tags):
for status, metric in statuses.iteritems():
metric_name, metric_type = metric
Expand Down Expand Up @@ -248,6 +246,7 @@ def _get_version(self, db, host):
version = result[0].split('-')
version = version[0].split('.')
self.mysql_version[host] = version
self.service_metadata('version', ".".join(version))
return version

def _collect_scalar(self, key, dict):
Expand Down
8 changes: 1 addition & 7 deletions checks.d/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def _get_version(self, key, db):
version = result[0]
self.versions[key] = version

self.service_metadata('version', self.versions[key])
return self.versions[key]

def _is_above(self, key, db, version_to_compare):
Expand Down Expand Up @@ -351,11 +352,6 @@ def _get_replication_metrics(self, key, db):
metrics = self.replication_metrics.get(key)
return metrics

def _collect_metadata(self, key, db):
metadata_dict = {}
metadata_dict['version'] = self._get_version(key, db)
self.svc_metadata(metadata_dict)

def _collect_stats(self, key, db, instance_tags, relations, custom_metrics):
"""Query pg_stat_* for various metrics
If relations is not an empty list, gather per-relation metrics
Expand Down Expand Up @@ -591,12 +587,10 @@ def check(self, instance):
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)
self._collect_stats(key, db, tags, relations, custom_metrics)
self._collect_metadata(key, db)
except ShouldRestartException:
self.log.info("Resetting the connection")
db = self.get_connection(key, host, port, user, password, dbname, use_cached=False)
self._collect_stats(key, db, tags, relations, custom_metrics)
self._collect_metadata(key, db)

if db is not None:
service_check_tags = self._get_service_check_tags(host, port, dbname)
Expand Down
4 changes: 1 addition & 3 deletions checks.d/redisdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,5 @@ def check(self, instance):
self._check_slowlog(instance, custom_tags)

def _collect_metadata(self, info):
metadata_dict = {}
if info and 'redis_version' in info:
metadata_dict['version'] = info['redis_version']
self.svc_metadata(metadata_dict)
self.service_metadata('version', info['redis_version'])
62 changes: 40 additions & 22 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class UnknownValue(CheckException): pass
# and not this class. This class will be removed in a future version
# of the agent.
#==============================================================================


class Check(object):
"""
(Abstract) class for all checks with the ability to:
Expand All @@ -57,16 +59,14 @@ class Check(object):
* only log error messages once (instead of each time they occur)
"""


def __init__(self, logger):
# where to store samples, indexed by metric_name
# metric_name: {("sorted", "tags"): [(ts, value), (ts, value)],
# tuple(tags) are stored as a key since lists are not hashable
# None: [(ts, value), (ts, value)]}
# untagged values are indexed by None
self._sample_store = {}
self._counters = {} # metric_name: bool
self._counters = {} # metric_name: bool
self.logger = logger
try:
self.logger.addFilter(LaconicFilter())
Expand Down Expand Up @@ -316,7 +316,8 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.warnings = []
self.library_versions = None
self.last_collection_time = defaultdict(int)
self.service_metadata = []
self._instance_metadata = []
self.svc_metadata = []

def instance_count(self):
""" Return the number of instances that are configured for this check. """
Expand Down Expand Up @@ -475,13 +476,17 @@ def service_check(self, check_name, status, tags=None, timestamp=None,
hostname, check_run_id, message)
)

def svc_metadata(self, metadata):
def service_metadata(self, meta_name, value):
"""
Save metadata.
:param metadata: The service metadata dictionary
:param meta_name: metadata key name
:type meta_name: string
:param value: metadata value
:type value: string
"""
self.service_metadata.append(metadata)
self._instance_metadata.append((meta_name, str(value)))

def has_events(self):
"""
Expand Down Expand Up @@ -524,6 +529,13 @@ def get_service_checks(self):
self.service_checks = []
return service_checks

def _roll_up_instance_metadata(self):
"""
Concatenate and flush instance metadata.
"""
self.svc_metadata.append(dict((k, v) for (k, v) in self._instance_metadata))
self._instance_metadata = []

def get_service_metadata(self):
"""
Return a list of the metadata dictionaries saved by the check -if any-
Expand All @@ -532,8 +544,10 @@ def get_service_metadata(self):
@return the list of metadata saved by this check
@rtype list of metadata dicts
"""
service_metadata = self.service_metadata
self.service_metadata = []
if self._instance_metadata:
self._roll_up_instance_metadata()
service_metadata = self.svc_metadata
self.svc_metadata = []
return service_metadata

def has_warnings(self):
Expand Down Expand Up @@ -619,14 +633,18 @@ def run(self):
if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME:
try:
before = AgentCheck._collect_internal_stats()
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
except Exception: # It's fine if we can't collect stats for the run, just log and proceed
self.log.debug("Failed to collect Agent Stats before check {0}".format(self.name))

instance_statuses = []
for i, instance in enumerate(self.instances):
try:
min_collection_interval = instance.get('min_collection_interval',
self.init_config.get('min_collection_interval', self.DEFAULT_MIN_COLLECTION_INTERVAL))
min_collection_interval = instance.get(
'min_collection_interval', self.init_config.get(
'min_collection_interval',
self.DEFAULT_MIN_COLLECTION_INTERVAL
)
)
now = time.time()
if now - self.last_collection_time[i] < min_collection_interval:
self.log.debug("Not running instance #{0} of check {1} as it ran less than {2}s ago".format(i, self.name, min_collection_interval))
Expand All @@ -644,24 +662,24 @@ def run(self):
instance_check_stats = {'run_time': timeit.default_timer() - check_start_time}

if self.has_warnings():
instance_status = check_status.InstanceStatus(i,
check_status.STATUS_WARNING,
warnings=self.get_warnings(),
instance_check_stats=instance_check_stats
instance_status = check_status.InstanceStatus(
i, check_status.STATUS_WARNING,
warnings=self.get_warnings(), instance_check_stats=instance_check_stats
)
else:
instance_status = check_status.InstanceStatus(
i,
check_status.STATUS_OK,
i, check_status.STATUS_OK,
instance_check_stats=instance_check_stats
)
except Exception, e:
self.log.exception("Check '%s' instance #%s failed" % (self.name, i))
instance_status = check_status.InstanceStatus(i,
check_status.STATUS_ERROR,
error=str(e),
tb=traceback.format_exc()
instance_status = check_status.InstanceStatus(
i, check_status.STATUS_ERROR,
error=str(e), tb=traceback.format_exc()
)
finally:
self._roll_up_instance_metadata()

instance_statuses.append(instance_status)

if self.in_developer_mode and self.name != AGENT_METRICS_CHECK_NAME:
Expand Down
2 changes: 2 additions & 0 deletions checks/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def run_single_check(check, verbose=True):
current_check_metrics = check.get_metrics()
current_check_events = check.get_events()
current_service_checks = check.get_service_checks()
current_service_metadata = check.get_service_metadata()

check_stats = check._get_internal_profiling_stats()

Expand All @@ -560,6 +561,7 @@ def run_single_check(check, verbose=True):
print "Metrics: \n{0}".format(pprint.pformat(current_check_metrics))
print "Events: \n{0}".format(pprint.pformat(current_check_events))
print "Service Checks: \n{0}".format(pprint.pformat(current_service_checks))
print "Service Metadata: \n{0}".format(pprint.pformat(current_service_metadata))

except Exception:
log.exception("Error running check %s" % check.name)
Expand Down
2 changes: 2 additions & 0 deletions tests/checks/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def run_check(self, config, agent_config=None, mocks=None, force_reload=False):
# ie the check edits the tags of the instance, problematic if
# run twice
self.check.check(copy.deepcopy(instance))
# FIXME: This should be called within the `run` method only
self.check._roll_up_instance_metadata()
except Exception, e:
# Catch error before re-raising it to be able to get service_checks
print "Exception {0} during check".format(e)
Expand Down
37 changes: 28 additions & 9 deletions tests/core/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,43 @@ class TestMetadata(unittest.TestCase):
Test the metadata collection logic.
"""
class FakeCheck(AgentCheck):
""" This check will generate metadata depending on the instance """
"""
This check will generate more or less metadata depending on the instance
"""
def check(self, instance):
metadata = instance.get('metadata')
if metadata:
self._collect_metadata()
if instance.get('metadata'):
self._collect_metadata(instance.get('more_meta'))

def _collect_metadata(self):
self.svc_metadata({'version': 1})
def _collect_metadata(self, more_meta):
self.service_metadata('foo', "bar")
if more_meta:
self.service_metadata('baz', "qux")

def test_hostname_metadata(self):
"""
Collect hostname metadata.
Collect hostname metadata
"""
c = Collector({"collect_instance_metadata": True}, None, {}, "foo")
metadata = c._get_hostname_metadata()
assert "hostname" in metadata
assert "socket-fqdn" in metadata
assert "socket-hostname" in metadata

def test_instance_metadata_rollup(self):
"""
Roll-up instance metadata
"""
config = {'instances': [{'metadata': True, 'more_meta': True}]}
instances = config.get('instances')
check = TestMetadata.FakeCheck("fake_check", config, {}, instances)
check.run()

service_metadata = check.get_service_metadata()
service_metadata_count = len(service_metadata)

self.assertEquals(service_metadata_count, 1)
self.assertEquals(service_metadata[0], {'foo': "bar", 'baz': "qux"})

def test_metadata_length(self):
"""
Fill up checks that do not generate any metadata
Expand All @@ -39,10 +57,11 @@ def test_metadata_length(self):
instances = config.get('instances')
check = TestMetadata.FakeCheck("fake_check", config, {}, instances)
check.run()

service_metadata = check.get_service_metadata()
service_metadata_count = len(service_metadata)

self.assertEquals(service_metadata_count, 3)
self.assertEquals(service_metadata[0], {'version': 1})
self.assertEquals(service_metadata[0], {'foo': "bar"})
self.assertEquals(service_metadata[1], {})
self.assertEquals(service_metadata[2], {'version': 1})
self.assertEquals(service_metadata[2], {'foo': "bar"})

0 comments on commit ef87bfa

Please sign in to comment.