From ccc9227ac7211a7902958115a47b2d67b20ac55c Mon Sep 17 00:00:00 2001
From: Etienne LAFARGE
Date: Thu, 27 Aug 2015 14:52:58 -0400
Subject: [PATCH 1/3] Revert "Merge pull request #1752 from
 DataDog/etienne/es-shard-level-metrics"

This reverts commit b19f2515fa2ac542c29464732e3b6d5b5ba9b652, reversing
changes made to 3e6c0b10e264044aec932c3b40e5ab6b1144f1f3.

This reverts BOTH the addition of shard-level metrics to Elasticsearch AND
the addition of PShard-related statistics. PShard statistics will be
re-added for the 5.5 release; shard-level statistics will be discussed for
5.6.0 because, right now, they obviously create way too many contexts in
our backend.
---
 checks.d/elastic.py                      | 164 +-----------------
 conf.d/elastic.yaml.example              |  27 +--
 tests/checks/integration/test_elastic.py | 201 ++---------------------
 3 files changed, 25 insertions(+), 367 deletions(-)

diff --git a/checks.d/elastic.py b/checks.d/elastic.py
index e792ef382e..881b7472bf 100644
--- a/checks.d/elastic.py
+++ b/checks.d/elastic.py
@@ -18,8 +18,6 @@ class NodeNotFound(Exception):
 ESInstanceConfig = namedtuple(
     'ESInstanceConfig', [
-        'shard_level_metrics',
-        'pshard_stats',
         'cluster_stats',
         'password',
         'service_check_tags',
@@ -33,59 +31,9 @@ class NodeNotFound(Exception):
 class ESCheck(AgentCheck):
     SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
     SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
-    SERVICE_CHECK_SHARD_STATE = 'elasticsearch.shard.state'
 
     DEFAULT_TIMEOUT = 5
 
-    # Pre aggregated metrics concerning only primary shards
-    # Those dicts are populated in the ESCheck constructor
-    PRIMARY_SHARD_METRICS = {}
-    PRIMARY_SHARD_METRICS_POST_1_0 = {}
-
-    # Same kind of dicts for SHARD_LEVEL_METRICS
-    SHARD_LEVEL_METRICS = {}
-    SHARD_LEVEL_METRICS_POST_1_0 = {}
-
-    # Shard-specific metrics
-    SHARD_LEVEL_METRICS_SUFFIX = {
-        ".docs.count": ("gauge", "docs.count"),
-        ".docs.deleted": ("gauge", "docs.deleted"),
-        ".store.size": ("gauge", "store.size_in_bytes"),
-        ".indexing.index.total": ("gauge", "indexing.index_total"),
-        ".indexing.index.time": ("gauge", "indexing.index_time_in_millis", lambda v: float(v)/1000),
-        ".indexing.index.current": ("gauge", "indexing.index_current"),
-        ".indexing.delete.total": ("gauge", "indexing.delete_total"),
-        ".indexing.delete.time": ("gauge", "indexing.delete_time_in_millis", lambda v: float(v)/1000),
-        ".indexing.delete.current": ("gauge", "indexing.delete_current"),
-        ".get.total": ("gauge", "get.total"),
-        ".get.time": ("gauge", "get.time_in_millis", lambda v: float(v)/1000),
-        ".get.current": ("gauge", "get.current"),
-        ".get.exists.total": ("gauge", "get.exists_total"),
-        ".get.exists.time": ("gauge", "get.exists_time_in_millis", lambda v: float(v)/1000),
-        ".get.missing.total": ("gauge", "get.missing_total"),
-        ".get.missing.time": ("gauge", "get.missing_time_in_millis", lambda v: float(v)/1000),
-        ".search.query.total": ("gauge", "search.query_total"),
-        ".search.query.time": ("gauge", "search.query_time_in_millis", lambda v: float(v)/1000),
-        ".search.query.current": ("gauge", "search.query_current"),
-        ".search.fetch.total": ("gauge", "search.fetch_total"),
-        ".search.fetch.time": ("gauge", "search.fetch_time_in_millis", lambda v: float(v)/1000),
-        ".search.fetch.current": ("gauge", "search.fetch_current")
-    }
-
-    SHARD_LEVEL_METRICS_POST_1_0_SUFFIX = {
-        ".merges.current": ("gauge", "merges.current"),
-        ".merges.current.docs": ("gauge", "merges.current_docs"),
-        ".merges.current.size": ("gauge", "merges.current_size_in_bytes"),
-        ".merges.total": ("gauge", "merges.total"),
-        ".merges.total.time": ("gauge", "merges.total_time_in_millis", lambda v: float(v)/1000),
-        ".merges.total.docs": ("gauge", "merges.total_docs"),
-        ".merges.total.size": ("gauge", "merges.total_size_in_bytes"),
-        ".refresh.total": ("gauge", "refresh.total"),
-        ".refresh.total.time": ("gauge", "refresh.total_time_in_millis", lambda v: float(v)/1000),
-        ".flush.total": ("gauge", "flush.total"),
-        ".flush.total.time": ("gauge", "flush.total_time_in_millis", lambda v: float(v)/1000)
-    }
-
     STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
         "elasticsearch.docs.count": ("gauge", "indices.docs.count"),
         "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
@@ -224,18 +172,6 @@ class ESCheck(AgentCheck):
     SOURCE_TYPE_NAME = 'elasticsearch'
 
     def __init__(self, name, init_config, agentConfig, instances=None):
-
-        # Let's construct the PRIMARY_SHARD_METRICS and SHARD_LEVEL_METRICS dicts
-        for k, v in self.SHARD_LEVEL_METRICS_SUFFIX.iteritems():
-            val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
-            self.PRIMARY_SHARD_METRICS['elasticsearch.primaries{0}'.format(k)] = val
-            self.SHARD_LEVEL_METRICS['elasticsearch.shard{0}'.format(k)] = v
-
-        for k, v in self.SHARD_LEVEL_METRICS_POST_1_0_SUFFIX.iteritems():
-            val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
-            self.PRIMARY_SHARD_METRICS_POST_1_0['elasticsearch.primaries{0}'.format(k)] = val
-            self.SHARD_LEVEL_METRICS_POST_1_0['elasticsearch.shard{0}'.format(k)] = v
-
         AgentCheck.__init__(self, name, init_config, agentConfig, instances)
 
         # Host status needs to persist across all checks
@@ -246,9 +182,6 @@ def get_instance_config(self, instance):
         if url is None:
             raise Exception("An url must be specified in the instance")
 
-        pshard_stats = _is_affirmative(instance.get('pshard_stats', False))
-        shard_level_metrics = _is_affirmative(instance.get('shard_level_metrics', False))
-
         cluster_stats = _is_affirmative(instance.get('cluster_stats', False))
         if 'is_external' in instance:
             cluster_stats = _is_affirmative(instance.get('is_external', False))
@@ -273,8 +206,6 @@ def get_instance_config(self, instance):
         timeout = instance.get('timeout') or self.DEFAULT_TIMEOUT
 
         config = ESInstanceConfig(
-            shard_level_metrics=shard_level_metrics,
-            pshard_stats=pshard_stats,
            cluster_stats=cluster_stats,
             password=instance.get('password'),
             service_check_tags=service_check_tags,
@@ -292,21 +223,8 @@ def check(self, instance):
         # (URLs and metrics) accordingly
         version = self._get_es_version(config)
 
-        health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, stats_metrics, \
-            pshard_stats_metrics, shard_level_metrics = self._define_params(version,
-                                                                            config.cluster_stats)
-
-        # Load primary shards data
-        if config.pshard_stats:
-            pshard_stats_url = urlparse.urljoin(config.url, pshard_stats_url)
-            pshard_stats_data = self._get_data(pshard_stats_url, config)
-            self._process_pshard_stats_data(pshard_stats_data, config, pshard_stats_metrics)
-
-        # Load shard-level metrics
-        if config.shard_level_metrics:
-            shard_level_url = urlparse.urljoin(config.url, pshard_stats_url) + '?level=shards'
-            shard_level_data = self._get_data(shard_level_url, config)
-            self._process_shard_level_data(shard_level_data, config, shard_level_metrics)
+        health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
+            = self._define_params(version, config.cluster_stats)
 
         # Load stats data.
         stats_url = urlparse.urljoin(config.url, stats_url)
@@ -352,9 +270,6 @@ def _define_params(self, version, cluster_stats):
         """ Define the set of URLs and METRICS to use depending on the running
        ES version.
        """
-
-        pshard_stats_url = "/_stats"
-
         if version >= [0, 90, 10]:
             # ES versions 0.90.10 and above
             health_url = "/_cluster/health?pretty=true"
@@ -384,21 +299,14 @@ def _define_params(self, version, cluster_stats):
 
         if version >= [0, 90, 5]:
             # ES versions 0.90.5 and above
-            stats_metrics.update(self.ADDITIONAL_METRICS_POST_0_90_5)
+            additional_metrics = self.ADDITIONAL_METRICS_POST_0_90_5
         else:
             # ES version 0.90.4 and below
-            stats_metrics.update(self.ADDITIONAL_METRICS_PRE_0_90_5)
-
-        # Version specific stats metrics about the primary shards
-        pshard_stats_metrics = dict(self.PRIMARY_SHARD_METRICS)
-        shard_level_metrics = dict(self.SHARD_LEVEL_METRICS)
+            additional_metrics = self.ADDITIONAL_METRICS_PRE_0_90_5
 
-        if version >= [1, 0, 0]:
-            pshard_stats_metrics.update(self.PRIMARY_SHARD_METRICS_POST_1_0)
-            shard_level_metrics.update(self.SHARD_LEVEL_METRICS_POST_1_0)
+        stats_metrics.update(additional_metrics)
 
-        return health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, \
-            stats_metrics, pshard_stats_metrics, shard_level_metrics
+        return health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics
 
     def _get_data(self, url, config, send_sc=True):
         """ Hit a given URL and return the parsed json
@@ -462,66 +370,6 @@ def _process_stats_data(self, nodes_url, data, stats_metrics, config):
                     node_data, metric, *desc,
                     tags=config.tags, hostname=metric_hostname)
 
-    def _process_pshard_stats_data(self, data, config, pshard_stats_metrics):
-        for metric, desc in pshard_stats_metrics.iteritems():
-            self._process_metric(data, metric, *desc, tags=config.tags)
-
-    def _process_shard_level_data(self, data, config, shard_level_metrics):
-        for index_name, index_data in data['indices'].iteritems():
-            for i_str, pshard_and_replicas in index_data['shards'].iteritems():
-                # Do we have more than one replica for this shard ?
-                count_replicas = len(pshard_and_replicas) > 2
-                replica_number = 0
-
-                for shard in pshard_and_replicas:
-                    # Let's get the host tag
-                    node = shard['routing']['node']
-
-                    # Let's compute our shard name
-                    shard_role = "replica"
-                    if shard['routing']['primary']:
-                        shard_name = 'P' + i_str
-                        shard_role = "primary"
-                    elif count_replicas:
-                        replica_number += 1
-                        shard_name = 'R{0}_{1}'.format(i_str, replica_number)
-                    else:
-                        shard_name = 'R' + i_str
-
-                    # Let's add some interesting tags that will enable us to
-                    # slice and dice as we wish in DatadogHQ
-                    es_shard_tags = ["es_node:{0}".format(node),
-                                     "es_shard:{0}".format(shard_name),
-                                     "es_index:{0}".format(index_name),
-                                     "es_role:{0}".format(shard_role),
-                                     "shard_specific"]
-
-                    tags = config.tags + es_shard_tags
-
-                    # Let's send a good old service check
-                    if shard['routing']['state'] == 'STARTED':
-                        state = AgentCheck.OK
-                        sc_msg = "Shard is running"
-                    elif shard['routing']['state'] == 'INITIALIZING':
-                        state = AgentCheck.WARNING
-                        sc_msg = ("Shard is currently being initialized. It should turn green "
-                                  "(or red...) soon")
-                    else:
-                        state = AgentCheck.CRITICAL
-                        sc_msg = "Shard is unassigned. Do you have enough nodes in your cluster ? "
-
-                    self.service_check(
-                        self.SERVICE_CHECK_SHARD_STATE,
-                        state,
-                        message=sc_msg,
-                        tags=config.service_check_tags + es_shard_tags
-                    )
-
-                    # And let's finally send our metric !
-                    for metric, desc in shard_level_metrics.iteritems():
-                        self._process_metric(shard, metric, *desc, tags=config.tags +
-                                             es_shard_tags)
-
     def _process_metric(self, data, metric, xtype, path, xform=None,
                         tags=None, hostname=None):
         """data: dictionary containing all the stats
diff --git a/conf.d/elastic.yaml.example b/conf.d/elastic.yaml.example
index 6cf426f660..f77ba2fdd2 100644
--- a/conf.d/elastic.yaml.example
+++ b/conf.d/elastic.yaml.example
@@ -15,35 +15,10 @@ instances:
   # This parameter was also called `is_external` and you can still use it but it
   # will be removed in version 6.
   #
-  # If you enable the "pshard_stats" flag, statistics over primary shards
-  # will be collected by the check and sent to the backend with the
-  # 'elasticsearch.primary' prefix. It is particularly useful if you want to
-  # get certain metrics without taking replicas into account. For instance,
-  # 'elasticsearch.primaries.docs.count` will give you the total number of
-  # documents in your indexes WITHOUT counting duplicates due to the existence
-  # of replica shards in your ES cluster
-  #
-  # The "shard_level_metrics" flag enables metrics and service checks on a per-
-  # shard basis (all the information is fetched under the /_stats?level=shards
-  # endpoint). The metrics and service check sent for each shard are named as
-  # such: elasticsearch.shard.metric.name .
-  # The shard name is computed according to elasticsearch's documentation. Each
-  # primary shard is named Pi (ex: P0, P1, P2...) and every replica shards will
-  # be named Ri (R0, R1, R2). In case the number of replicas is superior to 1,
-  # we stick to the following convention for shard names : Rx_y where x is the
-  # number of the associated primary shard while y is the replica number.
-  #
-  # Plase note that shard-level metrics will get the following extra tags:
-  # es_node:, es_shard:, es_index: and
-  # es_role:(primary|replica). They will also carry a "shard_specific" tag. It
-  # should enable you to slice and dice as you please in your DatadogHQ.
-  #
   - url: http://localhost:9200
   #   username: username
   #   password: password
-  #   cluster_stats: false
-  #   pshard_stats: false
-  #   shard_level_metrics: true
+  #   is_external: false
   #   tags:
   #     - 'tag1:key1'
   #     - 'tag2:key2'
diff --git a/tests/checks/integration/test_elastic.py b/tests/checks/integration/test_elastic.py
index 6b05ae7e65..b0a7869968 100644
--- a/tests/checks/integration/test_elastic.py
+++ b/tests/checks/integration/test_elastic.py
@@ -10,54 +10,6 @@
 from config import get_version
 from tests.checks.common import AgentCheckTest, load_check
 
-# Primary shard level metrics, these dicts are populated on the go when needed
-PRIMARY_SHARD_METRICS = {}
-PRIMARY_SHARD_METRICS_POST_1_0 = {}
-
-# Shard level metrics (also populated on the fly)
-SHARD_LEVEL_METRICS = {}
-SHARD_LEVEL_METRICS_POST_1_0 = {}
-
-# Shard-specific metrics
-SHARD_LEVEL_METRICS_SUFFIX = {
-    ".docs.count": ("gauge", "docs.count"),
-    ".docs.deleted": ("gauge", "docs.deleted"),
-    ".store.size": ("gauge", "store.size_in_bytes"),
-    ".indexing.index.total": ("gauge", "indexing.index_total"),
-    ".indexing.index.time": ("gauge", "indexing.index_time_in_millis", lambda v: float(v)/1000),
-    ".indexing.index.current": ("gauge", "indexing.index_current"),
-    ".indexing.delete.total": ("gauge", "indexing.delete_total"),
-    ".indexing.delete.time": ("gauge", "indexing.delete_time_in_millis", lambda v: float(v)/1000),
-    ".indexing.delete.current": ("gauge", "indexing.delete_current"),
-    ".get.total": ("gauge", "get.total"),
-    ".get.time": ("gauge", "get.time_in_millis", lambda v: float(v)/1000),
-    ".get.current": ("gauge", "get.current"),
-    ".get.exists.total": ("gauge", "get.exists_total"),
-    ".get.exists.time": ("gauge", "get.exists_time_in_millis", lambda v: float(v)/1000),
-    ".get.missing.total": ("gauge", "get.missing_total"),
-    ".get.missing.time": ("gauge", "get.missing_time_in_millis", lambda v: float(v)/1000),
-    ".search.query.total": ("gauge", "search.query_total"),
-    ".search.query.time": ("gauge", "search.query_time_in_millis", lambda v: float(v)/1000),
-    ".search.query.current": ("gauge", "search.query_current"),
-    ".search.fetch.total": ("gauge", "search.fetch_total"),
-    ".search.fetch.time": ("gauge", "search.fetch_time_in_millis", lambda v: float(v)/1000),
-    ".search.fetch.current": ("gauge", "search.fetch_current")
-}
-
-SHARD_LEVEL_METRICS_POST_1_0_SUFFIX = {
-    ".merges.current": ("gauge", "merges.current"),
-    ".merges.current.docs": ("gauge", "merges.current_docs"),
-    ".merges.current.size": ("gauge", "merges.current_size_in_bytes"),
-    ".merges.total": ("gauge", "merges.total"),
-    ".merges.total.time": ("gauge", "merges.total_time_in_millis", lambda v: float(v)/1000),
-    ".merges.total.docs": ("gauge", "merges.total_docs"),
-    ".merges.total.size": ("gauge", "merges.total_size_in_bytes"),
-    ".refresh.total": ("gauge", "refresh.total"),
-    ".refresh.total.time": ("gauge", "refresh.total_time_in_millis", lambda v: float(v)/1000),
-    ".flush.total": ("gauge", "flush.total"),
-    ".flush.total.time": ("gauge", "flush.total_time_in_millis", lambda v: float(v)/1000)
-}
-
 STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
     "elasticsearch.docs.count": ("gauge", "indices.docs.count"),
     "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
@@ -193,6 +145,7 @@
     "elasticsearch.pending_tasks_priority_urgent": ("gauge", "pending_tasks_priority_urgent")
 }
 
+
 def get_es_version():
     version = os.environ.get("FLAVOR_VERSION")
     if version is None:
@@ -204,47 +157,10 @@ def get_es_version():
 class TestElastic(AgentCheckTest):
     CHECK_NAME = "elastic"
 
-    def _get_default_expected_metrics(self, config):
-        """ A small helper method returning the list of configuration-agnostic
-        metrics sent by the check. Since every coverage report needs to have
-        these metrics tested, it's good to be able to fetch them in one call."""
-        expected_metrics = STATS_METRICS
-        CLUSTER_HEALTH_METRICS.update(CLUSTER_PENDING_TASKS)
-        expected_metrics.update(CLUSTER_HEALTH_METRICS)
-
-        instance_config = self.check.get_instance_config(config['instances'][0])
-        es_version = self.check._get_es_version(instance_config)
-
-        if es_version >= [0, 90, 5]:
-            expected_metrics.update(ADDITIONAL_METRICS_POST_0_90_5)
-            if es_version >= [0, 90, 10]:
-                expected_metrics.update(JVM_METRICS_POST_0_90_10)
-            else:
-                expected_metrics.update(JVM_METRICS_PRE_0_90_10)
-        else:
-            expected_metrics.update(ADDITIONAL_METRICS_PRE_0_90_5)
-            expected_metrics.update(JVM_METRICS_PRE_0_90_10)
-
-        return expected_metrics
-
-    def assert_default_metrics(self, config):
-        """ Another helper function checking for the presence of the default
-        metrics defined above. It's a minimalistic version, doesn't include
-        context checks, count checks etc... That's why it's not used in
-        test_check which needs more advanced tests. It's very convenient for
-        tests whose aim is to test the presence of configuration specific
-        metrics."""
-        for m_name, desc in self._get_default_expected_metrics(config).iteritems():
-            if desc[0] == "gauge":
-                self.assertMetric(m_name)
-        self.assertServiceCheck('elasticsearch.can_connect')
-        self.assertServiceCheck('elasticsearch.cluster_health')
-        self.assertServiceMetadata(['version'])
-
     def test_check(self):
         conf_hostname = "foo"
         port = 9200
-        bad_port = 9405
+        bad_port = 9205
         agent_config = {
             "hostname": conf_hostname, "version": get_version(),
            "api_key": "bar"
@@ -268,17 +184,31 @@ def test_check(self):
 
         default_tags = ["url:http://localhost:{0}".format(port)]
 
-        instance_config = self.check.get_instance_config(config['instances'][0])
+        expected_metrics = STATS_METRICS
+        CLUSTER_HEALTH_METRICS.update(CLUSTER_PENDING_TASKS)
+        expected_metrics.update(CLUSTER_HEALTH_METRICS)
 
+        instance_config = self.check.get_instance_config(config['instances'][0])
         es_version = self.check._get_es_version(instance_config)
+        self.assertEquals(es_version, get_es_version())
 
+        if es_version >= [0, 90, 5]:
+            expected_metrics.update(ADDITIONAL_METRICS_POST_0_90_5)
+            if es_version >= [0, 90, 10]:
+                expected_metrics.update(JVM_METRICS_POST_0_90_10)
+            else:
+                expected_metrics.update(JVM_METRICS_PRE_0_90_10)
+        else:
+            expected_metrics.update(ADDITIONAL_METRICS_PRE_0_90_5)
+            expected_metrics.update(JVM_METRICS_PRE_0_90_10)
+
         contexts = [
             (conf_hostname, default_tags + tags),
             (socket.gethostname(), default_tags)
         ]
 
-        for m_name, desc in self._get_default_expected_metrics(config).iteritems():
+        for m_name, desc in expected_metrics.iteritems():
             for hostname, m_tags in contexts:
                 if (m_name in CLUSTER_HEALTH_METRICS
                         and hostname == socket.gethostname()):
@@ -377,8 +307,6 @@ def test_health_event(self):
         # Set number of replicas to 0 for all indices
         requests.put('http://localhost:9200/_settings',
                      data='{"index": {"number_of_replicas": 0}}')
-        # Let's wait a bit
-        requests.post('http://localhost:9200/_flush?wait_for_ongoing')
 
         # Now shards should be green
         self.run_check(config)
@@ -393,96 +321,3 @@ def test_health_event(self):
             tags=['host:localhost', 'port:9200'],
             count=1
         )
-
-    def test_pshard_metrics(self):
-        """ Tests that the pshard related metrics are forwarded """
-
-        config = {'instances': [
-            {'url': 'http://localhost:9200', 'pshard_stats': True, 'hostname': 'foo'}
-        ]}
-        # Cleaning up everything won't hurt.
-        requests.delete('http://localhost:9200/_all/')
-
-        requests.put('http://localhost:9200/testindex/',
-                     data='{"settings": {"index": {"number_of_replicas": 1,'
-                     '"number_of_shards":5}}}')
-        requests.put('http://localhost:9200/testindex/testtype/2',
-                     data='{"name": "Jane Doe", "age": 27}')
-        requests.put('http://localhost:9200/testindex/testtype/1',
-                     data='{"name": "John Doe", "age": 42}')
-
-        # Let's wait for elasticsearch to process what we sent him
-        requests.post('http://localhost:9200/_flush?wait_for_ongoing')
-
-        self.run_check(config)
-
-        # Let's populate the dicts of expected metrics
-        for k, v in SHARD_LEVEL_METRICS_SUFFIX.iteritems():
-            val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
-            PRIMARY_SHARD_METRICS['elasticsearch.primaries{0}'.format(k)] = val
-
-        for k, v in SHARD_LEVEL_METRICS_POST_1_0_SUFFIX.iteritems():
-            val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
-            PRIMARY_SHARD_METRICS_POST_1_0['elasticsearch.primaries{0}'.format(k)] = val
-
-        pshard_stats_metrics = dict(PRIMARY_SHARD_METRICS)
-        if get_es_version() >= [1, 0, 0]:
-            pshard_stats_metrics.update(PRIMARY_SHARD_METRICS_POST_1_0)
-
-        for m_name, desc in pshard_stats_metrics.iteritems():
-            if desc[0] == "gauge":
-                self.assertMetric(m_name, count=1)
-
-        # Our pshard metrics are getting sent, let's check that they're accurate
-        # Note: please make sure you don't install Maven on the CI for future
-        # elastic search CI integrations. It would make the line below fail :/
-        self.assertMetric('elasticsearch.primaries.docs.count', value=2)
-
-        # And let's add the configuration-agnostic metrics to our coverage report
-        self.assert_default_metrics(config)
-        # This event will be sent because replication doesn't work on one node :)
-        self.assertEvent('yellow', exact_match=False)
-        self.coverage_report()
-
-    def test_shard_level_metrics_and_service_checks(self):
-        """ Tests that when the option is set to true, the shard-level metrics
-        are sent as well as the service checks """
-        config = {'instances': [
-            {'url': 'http://localhost:9200', 'shard_level_metrics': True, 'hostname': 'foo'}
-        ]}
-
-        requests.delete('http://localhost:9200/_all/')
-
-        requests.put('http://localhost:9200/testindex/',
-                     data='{"settings": {"index": {"number_of_replicas": 0,'
-                     '"number_of_shards":3}}}')
-        requests.put('http://localhost:9200/testindex/testtype/2',
-                     data='{"name": "Jane Doe", "age": 27}')
-
-        requests.post('http://localhost:9200/_flush?wait_for_ongoing')
-
-        self.run_check(config)
-
-        for k, v in SHARD_LEVEL_METRICS_SUFFIX.iteritems():
-            SHARD_LEVEL_METRICS['elasticsearch.shard{0}'.format(k)] = v
-
-        for k, v in SHARD_LEVEL_METRICS_POST_1_0_SUFFIX.iteritems():
-            SHARD_LEVEL_METRICS_POST_1_0['elasticsearch.shard{0}'.format(k)] = v
-
-        shard_level_metrics = dict(SHARD_LEVEL_METRICS)
-        if get_es_version() >= [1, 0, 0]:
-            shard_level_metrics.update(SHARD_LEVEL_METRICS_POST_1_0)
-
-        # Check for all the metrics
-        for m_name, desc in shard_level_metrics.iteritems():
-            if desc[0] == "gauge":
-                self.assertMetric(m_name, count=3)
-
-        # And for service checks as well
-        self.assertServiceCheckOK(
-            'elasticsearch.shard.state',
-            count=3
-        )
-
-        self.assert_default_metrics(config)
-        self.coverage_report()

From 538faadebccc6ae1fbab5dd7c1b397594a88fb1a Mon Sep 17 00:00:00 2001
From: Etienne LAFARGE
Date: Thu, 27 Aug 2015 14:59:16 -0400
Subject: [PATCH 2/3] [Changelog] Removed Elasticsearch changes

The new JVM metrics and PShard-level metrics will be re-added to the
changelog once the corresponding changes have been cherry-picked.
---
 CHANGELOG.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c646e390d7..938a75fa76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -87,7 +87,6 @@ See [#1758][]
 ### Changes
 * [FEATURE] Consul: New check reporting cluster, service and node wide metrics and events for leader election. See [#1628][]
 * [FEATURE] CouchDB: Allow blacklisting of specific databases. See [#1760][]
-* [FEATURE] Elasticsearch: Support shard level metrics. See [#1752][]
 * [FEATURE] etcd: SSL support. See [#1745][] (Thanks [@KnownSubset][])
 * [FEATURE] Flare: Add JMXFetch-specific information. See [#1726][]
 * [FEATURE] Flare: Log permissions on collected files. See [#1767][]
@@ -2043,4 +2042,4 @@ If you use ganglia, you want this version.
 [@ulich]: https://github.com/ulich
 [@walkeran]: https://github.com/walkeran
 [@warnerpr-cyan]: https://github.com/warnerpr-cyan
-[@yyamano]: https://github.com/yyamano
\ No newline at end of file
+[@yyamano]: https://github.com/yyamano

From 25dfab03f14e64f313bab71bae01ed586e7e37d7 Mon Sep 17 00:00:00 2001
From: Etienne LAFARGE
Date: Tue, 23 Jun 2015 17:05:44 -0400
Subject: [PATCH 3/3] [elasticsearch] Add PShard statistics to ES check

Added statistics over primary shards only to our check. They are
retrieved under the `_stats` endpoint and are aggregated metrics on the
primary shards, which means they don't take metrics from replica shards
into account. For instance, `primaries.docs.count` will contain the
total number of documents in the cluster without replicas; computing
the replica-inclusive count would be as simple as summing the
"standard" document count metric over all nodes in the cluster.

A test for those new metrics in particular has been added.
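For illustration, an editor's sketch (not part of the original patch) of the
two aggregations the `_stats` endpoint exposes side by side; the URL and the
use of `requests` are assumptions mirroring the integration tests:

    # Hypothetical illustration, assuming a node on http://localhost:9200.
    import requests

    stats = requests.get('http://localhost:9200/_stats').json()

    # Primary shards only: no replica-induced double counting.
    primary_docs = stats['_all']['primaries']['docs']['count']

    # Primaries plus replicas, i.e. roughly what summing the "standard"
    # per-node document count over the whole cluster would give you.
    total_docs = stats['_all']['total']['docs']['count']

    print(primary_docs, total_docs)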
---
 checks.d/elastic.py                      | 72 ++++++++++++++++++++-
 conf.d/elastic.yaml.example              | 11 +++-
 tests/checks/integration/test_elastic.py | 81 +++++++++++++++++++++++-
 3 files changed, 159 insertions(+), 5 deletions(-)

diff --git a/checks.d/elastic.py b/checks.d/elastic.py
index 881b7472bf..c2fda307ec 100644
--- a/checks.d/elastic.py
+++ b/checks.d/elastic.py
@@ -18,6 +18,7 @@ class NodeNotFound(Exception):
 ESInstanceConfig = namedtuple(
     'ESInstanceConfig', [
+        'pshard_stats',
         'cluster_stats',
         'password',
         'service_check_tags',
@@ -34,6 +35,46 @@ class ESCheck(AgentCheck):
 
     DEFAULT_TIMEOUT = 5
 
+    # Clusterwise metrics, pre aggregated on ES, compatible with all ES versions
+    PRIMARY_SHARD_METRICS = {
+        "elasticsearch.primaries.docs.count": ("gauge", "_all.primaries.docs.count"),
+        "elasticsearch.primaries.docs.deleted": ("gauge", "_all.primaries.docs.deleted"),
+        "elasticsearch.primaries.store.size": ("gauge", "_all.primaries.store.size_in_bytes"),
+        "elasticsearch.primaries.indexing.index.total": ("gauge", "_all.primaries.indexing.index_total"),
+        "elasticsearch.primaries.indexing.index.time": ("gauge", "_all.primaries.indexing.index_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.indexing.index.current": ("gauge", "_all.primaries.indexing.index_current"),
+        "elasticsearch.primaries.indexing.delete.total": ("gauge", "_all.primaries.indexing.delete_total"),
+        "elasticsearch.primaries.indexing.delete.time": ("gauge", "_all.primaries.indexing.delete_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.indexing.delete.current": ("gauge", "_all.primaries.indexing.delete_current"),
+        "elasticsearch.primaries.get.total": ("gauge", "_all.primaries.get.total"),
+        "elasticsearch.primaries.get.time": ("gauge", "_all.primaries.get.time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.get.current": ("gauge", "_all.primaries.get.current"),
+        "elasticsearch.primaries.get.exists.total": ("gauge", "_all.primaries.get.exists_total"),
+        "elasticsearch.primaries.get.exists.time": ("gauge", "_all.primaries.get.exists_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.get.missing.total": ("gauge", "_all.primaries.get.missing_total"),
+        "elasticsearch.primaries.get.missing.time": ("gauge", "_all.primaries.get.missing_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.search.query.total": ("gauge", "_all.primaries.search.query_total"),
+        "elasticsearch.primaries.search.query.time": ("gauge", "_all.primaries.search.query_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.search.query.current": ("gauge", "_all.primaries.search.query_current"),
+        "elasticsearch.primaries.search.fetch.total": ("gauge", "_all.primaries.search.fetch_total"),
+        "elasticsearch.primaries.search.fetch.time": ("gauge", "_all.primaries.search.fetch_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.search.fetch.current": ("gauge", "_all.primaries.search.fetch_current")
+    }
+
+    PRIMARY_SHARD_METRICS_POST_1_0 = {
+        "elasticsearch.primaries.merges.current": ("gauge", "_all.primaries.merges.current"),
+        "elasticsearch.primaries.merges.current.docs": ("gauge", "_all.primaries.merges.current_docs"),
+        "elasticsearch.primaries.merges.current.size": ("gauge", "_all.primaries.merges.current_size_in_bytes"),
+        "elasticsearch.primaries.merges.total": ("gauge", "_all.primaries.merges.total"),
+        "elasticsearch.primaries.merges.total.time": ("gauge", "_all.primaries.merges.total_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.merges.total.docs": ("gauge", "_all.primaries.merges.total_docs"),
+        "elasticsearch.primaries.merges.total.size": ("gauge", "_all.primaries.merges.total_size_in_bytes"),
+        "elasticsearch.primaries.refresh.total": ("gauge", "_all.primaries.refresh.total"),
+        "elasticsearch.primaries.refresh.total.time": ("gauge", "_all.primaries.refresh.total_time_in_millis", lambda v: float(v)/1000),
+        "elasticsearch.primaries.flush.total": ("gauge", "_all.primaries.flush.total"),
+        "elasticsearch.primaries.flush.total.time": ("gauge", "_all.primaries.flush.total_time_in_millis", lambda v: float(v)/1000)
+    }
+
     STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
         "elasticsearch.docs.count": ("gauge", "indices.docs.count"),
         "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
@@ -182,6 +223,8 @@ def get_instance_config(self, instance):
         if url is None:
             raise Exception("An url must be specified in the instance")
 
+        pshard_stats = _is_affirmative(instance.get('pshard_stats', False))
+
         cluster_stats = _is_affirmative(instance.get('cluster_stats', False))
         if 'is_external' in instance:
             cluster_stats = _is_affirmative(instance.get('is_external', False))
@@ -206,6 +249,7 @@ def get_instance_config(self, instance):
         timeout = instance.get('timeout') or self.DEFAULT_TIMEOUT
 
         config = ESInstanceConfig(
+            pshard_stats=pshard_stats,
             cluster_stats=cluster_stats,
             password=instance.get('password'),
             service_check_tags=service_check_tags,
@@ -223,8 +267,14 @@ def check(self, instance):
         # (URLs and metrics) accordingly
         version = self._get_es_version(config)
 
-        health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
-            = self._define_params(version, config.cluster_stats)
+        health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, stats_metrics, \
+            pshard_stats_metrics = self._define_params(version, config.cluster_stats)
+
+        # Load clusterwise data
+        if config.pshard_stats:
+            pshard_stats_url = urlparse.urljoin(config.url, pshard_stats_url)
+            pshard_stats_data = self._get_data(pshard_stats_url, config)
+            self._process_pshard_stats_data(pshard_stats_data, config, pshard_stats_metrics)
 
         # Load stats data.
         stats_url = urlparse.urljoin(config.url, stats_url)
@@ -270,6 +320,9 @@ def _define_params(self, version, cluster_stats):
         """ Define the set of URLs and METRICS to use depending on the running
        ES version.
""" + + pshard_stats_url = "/_stats" + if version >= [0, 90, 10]: # ES versions 0.90.10 and above health_url = "/_cluster/health?pretty=true" @@ -306,7 +359,16 @@ def _define_params(self, version, cluster_stats): stats_metrics.update(additional_metrics) - return health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics + # Version specific stats metrics about the primary shards + pshard_stats_metrics = dict(self.PRIMARY_SHARD_METRICS) + + if version >= [1, 0, 0]: + additional_metrics = self.PRIMARY_SHARD_METRICS_POST_1_0 + + pshard_stats_metrics.update(additional_metrics) + + return health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, \ + stats_metrics, pshard_stats_metrics def _get_data(self, url, config, send_sc=True): """ Hit a given URL and return the parsed json @@ -370,6 +432,10 @@ def _process_stats_data(self, nodes_url, data, stats_metrics, config): node_data, metric, *desc, tags=config.tags, hostname=metric_hostname) + def _process_pshard_stats_data(self, data, config, pshard_stats_metrics): + for metric, desc in pshard_stats_metrics.iteritems(): + self._process_metric(data, metric, *desc, tags=config.tags) + def _process_metric(self, data, metric, xtype, path, xform=None, tags=None, hostname=None): """data: dictionary containing all the stats diff --git a/conf.d/elastic.yaml.example b/conf.d/elastic.yaml.example index f77ba2fdd2..9ba6af218b 100644 --- a/conf.d/elastic.yaml.example +++ b/conf.d/elastic.yaml.example @@ -15,10 +15,19 @@ instances: # This parameter was also called `is_external` and you can still use it but it # will be removed in version 6. # + # If you enable the "pshard_stats" flag, statistics over primary shards + # will be collected by the check and sent to the backend with the + # 'elasticsearch.primary' prefix. It is particularly useful if you want to + # get certain metrics without taking replicas into account. 
For instance, + # 'elasticsearch.primaries.docs.count` will give you the total number of + # documents in your indexes WITHOUT counting duplicates due to the existence + # of replica shards in your ES cluster + # - url: http://localhost:9200 # username: username # password: password - # is_external: false + # cluster_stats: false + # pshard_stats: false # tags: # - 'tag1:key1' # - 'tag2:key2' diff --git a/tests/checks/integration/test_elastic.py b/tests/checks/integration/test_elastic.py index b0a7869968..fe419249aa 100644 --- a/tests/checks/integration/test_elastic.py +++ b/tests/checks/integration/test_elastic.py @@ -1,5 +1,6 @@ # stdlib import os +import time import socket # 3p @@ -10,6 +11,46 @@ from config import get_version from tests.checks.common import AgentCheckTest, load_check +# Clusterwise metrics, pre aggregated on ES, compatible with all ES versions +PRIMARY_SHARD_METRICS = { + "elasticsearch.primaries.docs.count": ("gauge", "_all.primaries.docs.count"), + "elasticsearch.primaries.docs.deleted": ("gauge", "_all.primaries.docs.deleted"), + "elasticsearch.primaries.store.size": ("gauge", "_all.primaries.store.size_in_bytes"), + "elasticsearch.primaries.indexing.index.total": ("gauge", "_all.primaries.indexing.index_total"), + "elasticsearch.primaries.indexing.index.time": ("gauge", "_all.primaries.indexing.index_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.indexing.index.current": ("gauge", "_all.primaries.indexing.index_current"), + "elasticsearch.primaries.indexing.delete.total": ("gauge", "_all.primaries.indexing.delete_total"), + "elasticsearch.primaries.indexing.delete.time": ("gauge", "_all.primaries.indexing.delete_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.indexing.delete.current": ("gauge", "_all.primaries.indexing.delete_current"), + "elasticsearch.primaries.get.total": ("gauge", "_all.primaries.get.total"), + "elasticsearch.primaries.get.time": ("gauge", "_all.primaries.get.time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.get.current": ("gauge", "_all.primaries.get.current"), + "elasticsearch.primaries.get.exists.total": ("gauge", "_all.primaries.get.exists_total"), + "elasticsearch.primaries.get.exists.time": ("gauge", "_all.primaries.get.exists_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.get.missing.total": ("gauge", "_all.primaries.get.missing_total"), + "elasticsearch.primaries.get.missing.time": ("gauge", "_all.primaries.get.missing_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.search.query.total": ("gauge", "_all.primaries.search.query_total"), + "elasticsearch.primaries.search.query.time": ("gauge", "_all.primaries.search.query_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.search.query.current": ("gauge", "_all.primaries.search.query_current"), + "elasticsearch.primaries.search.fetch.total": ("gauge", "_all.primaries.search.fetch_total"), + "elasticsearch.primaries.search.fetch.time": ("gauge", "_all.primaries.search.fetch_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.search.fetch.current": ("gauge", "_all.primaries.search.fetch_current") +} + +PRIMARY_SHARD_METRICS_POST_1_0 = { + "elasticsearch.primaries.merges.current": ("gauge", "_all.primaries.merges.current"), + "elasticsearch.primaries.merges.current.docs": ("gauge", "_all.primaries.merges.current_docs"), + "elasticsearch.primaries.merges.current.size": ("gauge", "_all.primaries.merges.current_size_in_bytes"), + 
"elasticsearch.primaries.merges.total": ("gauge", "_all.primaries.merges.total"), + "elasticsearch.primaries.merges.total.time": ("gauge", "_all.primaries.merges.total_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.merges.total.docs": ("gauge", "_all.primaries.merges.total_docs"), + "elasticsearch.primaries.merges.total.size": ("gauge", "_all.primaries.merges.total_size_in_bytes"), + "elasticsearch.primaries.refresh.total": ("gauge", "_all.primaries.refresh.total"), + "elasticsearch.primaries.refresh.total.time": ("gauge", "_all.primaries.refresh.total_time_in_millis", lambda v: float(v)/1000), + "elasticsearch.primaries.flush.total": ("gauge", "_all.primaries.flush.total"), + "elasticsearch.primaries.flush.total.time": ("gauge", "_all.primaries.flush.total_time_in_millis", lambda v: float(v)/1000) +} + STATS_METRICS = { # Metrics that are common to all Elasticsearch versions "elasticsearch.docs.count": ("gauge", "indices.docs.count"), "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"), @@ -160,7 +201,7 @@ class TestElastic(AgentCheckTest): def test_check(self): conf_hostname = "foo" port = 9200 - bad_port = 9205 + bad_port = 9405 agent_config = { "hostname": conf_hostname, "version": get_version(), "api_key": "bar" @@ -307,6 +348,7 @@ def test_health_event(self): # Set number of replicas to 0 for all indices requests.put('http://localhost:9200/_settings', data='{"index": {"number_of_replicas": 0}}') + time.sleep(5) # Now shards should be green self.run_check(config) @@ -321,3 +363,40 @@ def test_health_event(self): tags=['host:localhost', 'port:9200'], count=1 ) + + def test_pshard_metrics(self): + """ Tests that the pshard related metrics are forwarded and that the + document count for primary indexes is twice smaller as the global + document count when "number_of_replicas" is set to 1 """ + elastic_latency = 10 + + config = {'instances': [ + {'url': 'http://localhost:9200', 'pshard_stats': True} + ]} + # Cleaning up everything won't hurt. + req = requests.get('http://localhost:9200/_cat/indices?v') + indices_info = req.text.split('\n')[1::-1] + for index_info in indices_info: + index_name = index_info.split()[1] + requests.delete('http://localhost:9200/' + index_name) + + requests.put('http://localhost:9200/_settings', data='{"index": {"number_of_replicas": 1}}') + requests.put('http://localhost:9200/testindex/testtype/2', data='{"name": "Jane Doe", "age": 27}') + requests.put('http://localhost:9200/testindex/testtype/1', data='{"name": "John Doe", "age": 42}') + + time.sleep(elastic_latency) + + self.run_check(config) + + pshard_stats_metrics = dict(PRIMARY_SHARD_METRICS) + if get_es_version() >= [1, 0, 0]: + pshard_stats_metrics.update(PRIMARY_SHARD_METRICS_POST_1_0) + + for m_name, desc in pshard_stats_metrics.iteritems(): + if desc[0] == "gauge": + self.assertMetric(m_name, count=1) + + # Our pshard metrics are getting sent, let's check that they're accurate + # Note: please make sure you don't install Maven on the CI for future + # elastic search CI integrations. It would make the line below fail :/ + self.assertMetric('elasticsearch.primaries.docs.count', value=2)