diff --git a/checks.d/elastic.py b/checks.d/elastic.py index 3824a54a18..c523b6134b 100644 --- a/checks.d/elastic.py +++ b/checks.d/elastic.py @@ -34,7 +34,7 @@ class ESCheck(AgentCheck): DEFAULT_TIMEOUT = 5 - METRICS = { # Metrics that are common to all Elasticsearch versions + STATS_METRICS = { # Metrics that are common to all Elasticsearch versions "elasticsearch.docs.count": ("gauge", "indices.docs.count"), "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"), "elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"), @@ -115,6 +115,9 @@ class ESCheck(AgentCheck): "jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"), "jvm.threads.count": ("gauge", "jvm.threads.count"), "jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"), + } + + CLUSTER_HEALTH_METRICS = { "elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"), "elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"), "elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"), @@ -176,7 +179,7 @@ def check(self, instance): # Check ES version for this instance and define parameters # (URLs and metrics) accordingly version = self._get_es_version() - self._define_params(version) + self._define_params(version, self.curr_config.is_external) # Load stats data. stats_url = urlparse.urljoin(self.curr_config.url, self.STATS_URL) @@ -212,16 +215,21 @@ def _get_es_version(self): self.log.debug("Elasticsearch version is %s" % version) return version - def _define_params(self, version): + def _define_params(self, version, is_external): """ Define the set of URLs and METRICS to use depending on the running ES version. """ if version >= [0,90,10]: # ES versions 0.90.10 and above self.HEALTH_URL = "/_cluster/health?pretty=true" - self.STATS_URL = "/_nodes/_local/stats?all=true" self.NODES_URL = "/_nodes?network=true" + # For "external" clusters, we want to collect from all nodes. + if is_external: + self.STATS_URL = "/_nodes/stats?all=true" + else: + self.STATS_URL = "/_nodes/_local/stats?all=true" + additional_metrics = { "jvm.gc.collectors.young.count": ("gauge", "jvm.gc.collectors.young.collection_count"), "jvm.gc.collectors.young.collection_time": ("gauge", "jvm.gc.collectors.young.collection_time_in_millis", lambda v: float(v)/1000), @@ -242,7 +250,7 @@ def _define_params(self, version): "jvm.gc.collection_time": ("gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v)/1000), } - self.METRICS.update(additional_metrics) + self.STATS_METRICS.update(additional_metrics) if version >= [0,90,5]: # ES versions 0.90.5 and above @@ -264,7 +272,7 @@ def _define_params(self, version): "elasticsearch.cache.filter.size": ("gauge", "indices.cache.filter_size_in_bytes"), } - self.METRICS.update(additional_metrics) + self.STATS_METRICS.update(additional_metrics) def _get_data(self, url): """ Hit a given URL and return the parsed json @@ -295,17 +303,22 @@ def _get_data(self, url): return resp.json() def _process_stats_data(self, data): + is_external = self.curr_config.is_external for node_name in data['nodes']: node_data = data['nodes'][node_name] # On newer version of ES it's "host" not "hostname" node_hostname = node_data.get('hostname', node_data.get('host', None)) - should_process = self.curr_config.is_external\ - or self.should_process_node(node_name, node_hostname) + should_process = is_external \ + or self.should_process_node(node_name, node_hostname) + + # Override the metric hostname if we're hitting an external cluster. + metric_hostname = node_hostname if is_external else None + if should_process: - for metric in self.METRICS: - desc = self.METRICS[metric] + for metric in self.STATS_METRICS: + desc = self.STATS_METRICS[metric] self._process_metric(node_data, metric, *desc, - tags=self.curr_config.tags) + tags=self.curr_config.tags, hostname=metric_hostname) def should_process_node(self, node_name, node_hostname): """ The node stats API will return stats for every node so we @@ -373,7 +386,8 @@ def _host_matches_node(self, primary_addrs): # Check the interface addresses against the primary address return primary_addrs in ips - def _process_metric(self, data, metric, xtype, path, xform=None, tags=None): + def _process_metric(self, data, metric, xtype, path, xform=None, + tags=None, hostname=None): """data: dictionary containing all the stats metric: datadog metric path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue @@ -391,9 +405,9 @@ def _process_metric(self, data, metric, xtype, path, xform=None, tags=None): if value is not None: if xform: value = xform(value) if xtype == "gauge": - self.gauge(metric, value, tags=tags) + self.gauge(metric, value, tags=tags, hostname=hostname) else: - self.rate(metric, value, tags=tags) + self.rate(metric, value, tags=tags, hostname=hostname) else: self._metric_not_found(metric, path) @@ -409,9 +423,9 @@ def _process_health_data(self, data): event = self._create_event(data['status']) self.event(event) - for metric in self.METRICS: + for metric in self.CLUSTER_HEALTH_METRICS: # metric description - desc = self.METRICS[metric] + desc = self.CLUSTER_HEALTH_METRICS[metric] self._process_metric(data, metric, *desc, tags=self.curr_config.tags) # Process the service check diff --git a/tests/test_elastic.py b/tests/test_elastic.py index 3626032fca..8baa011144 100644 --- a/tests/test_elastic.py +++ b/tests/test_elastic.py @@ -1,4 +1,5 @@ # stdlib +import socket import unittest # 3p @@ -36,11 +37,15 @@ def test_bad_config(self): def test_check(self): agentConfig = { 'version': '0.1', - 'api_key': 'toto' + 'api_key': 'toto', + 'hostname': 'agent-es-test' } conf = { - 'instances': [{'url': 'http://localhost:%s' % PORT}] + 'instances': [ + {'url': 'http://localhost:%s' % PORT}, + {'url': 'http://localhost:%s' % PORT, 'is_external': True} + ] } # Initialize the check from checks.d @@ -48,6 +53,7 @@ def test_check(self): self.check.check(conf['instances'][0]) r = self.check.get_metrics() + self.check.get_events() self.assertTrue(type(r) == type([])) self.assertTrue(len(r) > 0) @@ -86,4 +92,13 @@ def test_check(self): self.check.cluster_status[conf['instances'][0].get('url')] = "red" self.check.check(conf['instances'][0]) events = self.check.get_events() + self.check.get_metrics() self.assertEquals(len(events),1,events) + + # Check an "external" cluster + self.check.check(conf['instances'][1]) + r = self.check.get_metrics() + expected_hostname = socket.gethostname() + for m in r: + if m[0] not in self.check.CLUSTER_HEALTH_METRICS: + self.assertEquals(m[3]['hostname'], expected_hostname)