Merge pull request #1875 from DataDog/etienne/revert-es-shard-level-stats

Etienne/revert es shard level stats
yannmh committed Aug 27, 2015
2 parents 08a4fba + 25dfab0 commit 7b9532c
Showing 4 changed files with 116 additions and 305 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
@@ -87,7 +87,6 @@ See [#1758][]
### Changes
* [FEATURE] Consul: New check reporting cluster, service and node wide metrics and events for leader election. See [#1628][]
* [FEATURE] CouchDB: Allow blacklisting of specific databases. See [#1760][]
* [FEATURE] Elasticsearch: Support shard level metrics. See [#1752][]
* [FEATURE] etcd: SSL support. See [#1745][] (Thanks [@KnownSubset][])
* [FEATURE] Flare: Add JMXFetch-specific information. See [#1726][]
* [FEATURE] Flare: Log permissions on collected files. See [#1767][]
@@ -2043,4 +2042,4 @@ If you use ganglia, you want this version.
[@ulich]: https://github.com/ulich
[@walkeran]: https://github.com/walkeran
[@warnerpr-cyan]: https://github.com/warnerpr-cyan
[@yyamano]: https://github.com/yyamano
[@yyamano]: https://github.com/yyamano
178 changes: 46 additions & 132 deletions checks.d/elastic.py
@@ -18,7 +18,6 @@ class NodeNotFound(Exception):

ESInstanceConfig = namedtuple(
'ESInstanceConfig', [
'shard_level_metrics',
'pshard_stats',
'cluster_stats',
'password',
@@ -33,57 +32,47 @@ class NodeNotFound(Exception):
class ESCheck(AgentCheck):
SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
SERVICE_CHECK_SHARD_STATE = 'elasticsearch.shard.state'

DEFAULT_TIMEOUT = 5

# Pre aggregated metrics concerning only primary shards
# Those dicts are populated in the ESCheck constructor
PRIMARY_SHARD_METRICS = {}
PRIMARY_SHARD_METRICS_POST_1_0 = {}

# Same kind of dicts for SHARD_LEVEL_METRICS
SHARD_LEVEL_METRICS = {}
SHARD_LEVEL_METRICS_POST_1_0 = {}

# Shard-specific metrics
SHARD_LEVEL_METRICS_SUFFIX = {
".docs.count": ("gauge", "docs.count"),
".docs.deleted": ("gauge", "docs.deleted"),
".store.size": ("gauge", "store.size_in_bytes"),
".indexing.index.total": ("gauge", "indexing.index_total"),
".indexing.index.time": ("gauge", "indexing.index_time_in_millis", lambda v: float(v)/1000),
".indexing.index.current": ("gauge", "indexing.index_current"),
".indexing.delete.total": ("gauge", "indexing.delete_total"),
".indexing.delete.time": ("gauge", "indexing.delete_time_in_millis", lambda v: float(v)/1000),
".indexing.delete.current": ("gauge", "indexing.delete_current"),
".get.total": ("gauge", "get.total"),
".get.time": ("gauge", "get.time_in_millis", lambda v: float(v)/1000),
".get.current": ("gauge", "get.current"),
".get.exists.total": ("gauge", "get.exists_total"),
".get.exists.time": ("gauge", "get.exists_time_in_millis", lambda v: float(v)/1000),
".get.missing.total": ("gauge", "get.missing_total"),
".get.missing.time": ("gauge", "get.missing_time_in_millis", lambda v: float(v)/1000),
".search.query.total": ("gauge", "search.query_total"),
".search.query.time": ("gauge", "search.query_time_in_millis", lambda v: float(v)/1000),
".search.query.current": ("gauge", "search.query_current"),
".search.fetch.total": ("gauge", "search.fetch_total"),
".search.fetch.time": ("gauge", "search.fetch_time_in_millis", lambda v: float(v)/1000),
".search.fetch.current": ("gauge", "search.fetch_current")
# Clusterwise metrics, pre aggregated on ES, compatible with all ES versions
PRIMARY_SHARD_METRICS = {
"elasticsearch.primaries.docs.count": ("gauge", "_all.primaries.docs.count"),
"elasticsearch.primaries.docs.deleted": ("gauge", "_all.primaries.docs.deleted"),
"elasticsearch.primaries.store.size": ("gauge", "_all.primaries.store.size_in_bytes"),
"elasticsearch.primaries.indexing.index.total": ("gauge", "_all.primaries.indexing.index_total"),
"elasticsearch.primaries.indexing.index.time": ("gauge", "_all.primaries.indexing.index_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.indexing.index.current": ("gauge", "_all.primaries.indexing.index_current"),
"elasticsearch.primaries.indexing.delete.total": ("gauge", "_all.primaries.indexing.delete_total"),
"elasticsearch.primaries.indexing.delete.time": ("gauge", "_all.primaries.indexing.delete_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.indexing.delete.current": ("gauge", "_all.primaries.indexing.delete_current"),
"elasticsearch.primaries.get.total": ("gauge", "_all.primaries.get.total"),
"elasticsearch.primaries.get.time": ("gauge", "_all.primaries.get.time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.get.current": ("gauge", "_all.primaries.get.current"),
"elasticsearch.primaries.get.exists.total": ("gauge", "_all.primaries.get.exists_total"),
"elasticsearch.primaries.get.exists.time": ("gauge", "_all.primaries.get.exists_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.get.missing.total": ("gauge", "_all.primaries.get.missing_total"),
"elasticsearch.primaries.get.missing.time": ("gauge", "_all.primaries.get.missing_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.search.query.total": ("gauge", "_all.primaries.search.query_total"),
"elasticsearch.primaries.search.query.time": ("gauge", "_all.primaries.search.query_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.search.query.current": ("gauge", "_all.primaries.search.query_current"),
"elasticsearch.primaries.search.fetch.total": ("gauge", "_all.primaries.search.fetch_total"),
"elasticsearch.primaries.search.fetch.time": ("gauge", "_all.primaries.search.fetch_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.search.fetch.current": ("gauge", "_all.primaries.search.fetch_current")
}

SHARD_LEVEL_METRICS_POST_1_0_SUFFIX = {
".merges.current": ("gauge", "merges.current"),
".merges.current.docs": ("gauge", "merges.current_docs"),
".merges.current.size": ("gauge", "merges.current_size_in_bytes"),
".merges.total": ("gauge", "merges.total"),
".merges.total.time": ("gauge", "merges.total_time_in_millis", lambda v: float(v)/1000),
".merges.total.docs": ("gauge", "merges.total_docs"),
".merges.total.size": ("gauge", "merges.total_size_in_bytes"),
".refresh.total": ("gauge", "refresh.total"),
".refresh.total.time": ("gauge", "refresh.total_time_in_millis", lambda v: float(v)/1000),
".flush.total": ("gauge", "flush.total"),
".flush.total.time": ("gauge", "flush.total_time_in_millis", lambda v: float(v)/1000)
PRIMARY_SHARD_METRICS_POST_1_0 = {
"elasticsearch.primaries.merges.current": ("gauge", "_all.primaries.merges.current"),
"elasticsearch.primaries.merges.current.docs": ("gauge", "_all.primaries.merges.current_docs"),
"elasticsearch.primaries.merges.current.size": ("gauge", "_all.primaries.merges.current_size_in_bytes"),
"elasticsearch.primaries.merges.total": ("gauge", "_all.primaries.merges.total"),
"elasticsearch.primaries.merges.total.time": ("gauge", "_all.primaries.merges.total_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.merges.total.docs": ("gauge", "_all.primaries.merges.total_docs"),
"elasticsearch.primaries.merges.total.size": ("gauge", "_all.primaries.merges.total_size_in_bytes"),
"elasticsearch.primaries.refresh.total": ("gauge", "_all.primaries.refresh.total"),
"elasticsearch.primaries.refresh.total.time": ("gauge", "_all.primaries.refresh.total_time_in_millis", lambda v: float(v)/1000),
"elasticsearch.primaries.flush.total": ("gauge", "_all.primaries.flush.total"),
"elasticsearch.primaries.flush.total.time": ("gauge", "_all.primaries.flush.total_time_in_millis", lambda v: float(v)/1000)
}

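Each value tuple above maps a Datadog metric name to a submission type, a dotted path into the /_stats payload, and an optional transform (for example, converting milliseconds to seconds). Below is a minimal sketch of how such a descriptor is resolved; the helper and the sample payload are illustrative only, and the check itself performs this traversal in its _process_metric method.

```python
# Illustrative only: a stripped-down resolver for the descriptor tuples above.
# The sample payload is a trimmed, hypothetical /_stats fragment.
def resolve_path(data, path, xform=None):
    value = data
    for key in path.split('.'):
        value = value.get(key) if isinstance(value, dict) else None
    if value is not None and xform is not None:
        value = xform(value)
    return value

sample_stats = {
    "_all": {
        "primaries": {
            "docs": {"count": 12345},
            "indexing": {"index_time_in_millis": 4200},
        }
    }
}

mtype, path, xform = ("gauge", "_all.primaries.indexing.index_time_in_millis",
                      lambda v: float(v) / 1000)
print(resolve_path(sample_stats, path, xform))  # -> 4.2
```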
STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
@@ -224,18 +213,6 @@ class ESCheck(AgentCheck):
SOURCE_TYPE_NAME = 'elasticsearch'

def __init__(self, name, init_config, agentConfig, instances=None):

# Let's construct the PRIMARY_SHARD_METRICS and SHARD_LEVEL_METRICS dicts
for k, v in self.SHARD_LEVEL_METRICS_SUFFIX.iteritems():
val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
self.PRIMARY_SHARD_METRICS['elasticsearch.primaries{0}'.format(k)] = val
self.SHARD_LEVEL_METRICS['elasticsearch.shard{0}'.format(k)] = v

for k, v in self.SHARD_LEVEL_METRICS_POST_1_0_SUFFIX.iteritems():
val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
self.PRIMARY_SHARD_METRICS_POST_1_0['elasticsearch.primaries{0}'.format(k)] = val
self.SHARD_LEVEL_METRICS_POST_1_0['elasticsearch.shard{0}'.format(k)] = v

AgentCheck.__init__(self, name, init_config, agentConfig, instances)

# Host status needs to persist across all checks
@@ -247,7 +224,6 @@ def get_instance_config(self, instance):
raise Exception("An url must be specified in the instance")

pshard_stats = _is_affirmative(instance.get('pshard_stats', False))
shard_level_metrics = _is_affirmative(instance.get('shard_level_metrics', False))

cluster_stats = _is_affirmative(instance.get('cluster_stats', False))
if 'is_external' in instance:
@@ -273,7 +249,6 @@ def get_instance_config(self, instance):
timeout = instance.get('timeout') or self.DEFAULT_TIMEOUT

config = ESInstanceConfig(
shard_level_metrics=shard_level_metrics,
pshard_stats=pshard_stats,
cluster_stats=cluster_stats,
password=instance.get('password'),
@@ -293,21 +268,14 @@ def check(self, instance):
version = self._get_es_version(config)

health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, stats_metrics, \
pshard_stats_metrics, shard_level_metrics = self._define_params(version,
config.cluster_stats)
pshard_stats_metrics = self._define_params(version, config.cluster_stats)

# Load primary shards data
# Load clusterwise data
if config.pshard_stats:
pshard_stats_url = urlparse.urljoin(config.url, pshard_stats_url)
pshard_stats_data = self._get_data(pshard_stats_url, config)
self._process_pshard_stats_data(pshard_stats_data, config, pshard_stats_metrics)

# Load shard-level metrics
if config.shard_level_metrics:
shard_level_url = urlparse.urljoin(config.url, pshard_stats_url) + '?level=shards'
shard_level_data = self._get_data(shard_level_url, config)
self._process_shard_level_data(shard_level_data, config, shard_level_metrics)

# Load stats data.
stats_url = urlparse.urljoin(config.url, stats_url)
stats_data = self._get_data(stats_url, config)
@@ -384,21 +352,23 @@ def _define_params(self, version, cluster_stats):

if version >= [0, 90, 5]:
# ES versions 0.90.5 and above
stats_metrics.update(self.ADDITIONAL_METRICS_POST_0_90_5)
additional_metrics = self.ADDITIONAL_METRICS_POST_0_90_5
else:
# ES version 0.90.4 and below
stats_metrics.update(self.ADDITIONAL_METRICS_PRE_0_90_5)
additional_metrics = self.ADDITIONAL_METRICS_PRE_0_90_5

stats_metrics.update(additional_metrics)

# Version specific stats metrics about the primary shards
pshard_stats_metrics = dict(self.PRIMARY_SHARD_METRICS)
shard_level_metrics = dict(self.SHARD_LEVEL_METRICS)

if version >= [1, 0, 0]:
pshard_stats_metrics.update(self.PRIMARY_SHARD_METRICS_POST_1_0)
shard_level_metrics.update(self.SHARD_LEVEL_METRICS_POST_1_0)
additional_metrics = self.PRIMARY_SHARD_METRICS_POST_1_0

pshard_stats_metrics.update(additional_metrics)

return health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, \
stats_metrics, pshard_stats_metrics, shard_level_metrics
stats_metrics, pshard_stats_metrics

def _get_data(self, url, config, send_sc=True):
""" Hit a given URL and return the parsed json
@@ -466,62 +436,6 @@ def _process_pshard_stats_data(self, data, config, pshard_stats_metrics):
for metric, desc in pshard_stats_metrics.iteritems():
self._process_metric(data, metric, *desc, tags=config.tags)

def _process_shard_level_data(self, data, config, shard_level_metrics):
for index_name, index_data in data['indices'].iteritems():
for i_str, pshard_and_replicas in index_data['shards'].iteritems():
# Do we have more than one replica for this shard ?
count_replicas = len(pshard_and_replicas) > 2
replica_number = 0

for shard in pshard_and_replicas:
# Let's get the host tag
node = shard['routing']['node']

# Let's compute our shard name
shard_role = "replica"
if shard['routing']['primary']:
shard_name = 'P' + i_str
shard_role = "primary"
elif count_replicas:
replica_number += 1
shard_name = 'R{0}_{1}'.format(i_str, replica_number)
else:
shard_name = 'R' + i_str

# Let's add some interesting tags that will enable us to
# slice and dice as we wish in DatadogHQ
es_shard_tags = ["es_node:{0}".format(node),
"es_shard:{0}".format(shard_name),
"es_index:{0}".format(index_name),
"es_role:{0}".format(shard_role),
"shard_specific"]

tags = config.tags + es_shard_tags

# Let's send a good old service check
if shard['routing']['state'] == 'STARTED':
state = AgentCheck.OK
sc_msg = "Shard is running"
elif shard['routing']['state'] == 'INITIALIZING':
state = AgentCheck.WARNING
sc_msg = ("Shard is currently being initialized. It should turn green "
"(or red...) soon")
else:
state = AgentCheck.CRITICAL
sc_msg = "Shard is unassigned. Do you have enough nodes in your cluster ? "

self.service_check(
self.SERVICE_CHECK_SHARD_STATE,
state,
message=sc_msg,
tags=config.service_check_tags + es_shard_tags
)

# And let's finally send our metric !
for metric, desc in shard_level_metrics.iteritems():
self._process_metric(shard, metric, *desc, tags=config.tags +
es_shard_tags)

def _process_metric(self, data, metric, xtype, path, xform=None,
tags=None, hostname=None):
"""data: dictionary containing all the stats
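After the revert, the check queries only the pre-aggregated index-stats endpoint when pshard_stats is enabled; the deleted branch of check() above hit the same endpoint with ?level=shards appended. Here is a rough sketch of the two URLs, assuming the stats endpoint is /_stats and fetching with requests directly instead of the check's _get_data helper (which also handles auth, timeouts and service checks):

```python
# Sketch only: relation between the kept and the reverted endpoints.
# base_url stands in for the instance "url" from elastic.yaml; the exact
# pshard_stats_url constant is defined elsewhere in the check.
import urlparse  # Python 2 module, as used by the check

import requests

base_url = 'http://localhost:9200'
pshard_stats_url = '/_stats'  # assumed index-stats endpoint

# Clusterwise, pre-aggregated primary-shard stats (pshard_stats: true)
primaries = requests.get(urlparse.urljoin(base_url, pshard_stats_url),
                         timeout=5).json()

# The per-shard breakdown the reverted shard_level_metrics option used to fetch
shard_level = requests.get(
    urlparse.urljoin(base_url, pshard_stats_url) + '?level=shards',
    timeout=5).json()
```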
16 changes: 0 additions & 16 deletions conf.d/elastic.yaml.example
@@ -23,27 +23,11 @@ instances:
# documents in your indexes WITHOUT counting duplicates due to the existence
# of replica shards in your ES cluster
#
# The "shard_level_metrics" flag enables metrics and service checks on a per-
# shard basis (all the information is fetched under the /_stats?level=shards
# endpoint). The metrics and service check sent for each shard are named as
# such: elasticsearch.shard.metric.name .
# The shard name is computed according to elasticsearch's documentation. Each
# primary shard is named Pi (ex: P0, P1, P2...) and every replica shards will
# be named Ri (R0, R1, R2). In case the number of replicas is superior to 1,
# we stick to the following convention for shard names : Rx_y where x is the
# number of the associated primary shard while y is the replica number.
#
# Please note that shard-level metrics will get the following extra tags:
# es_node:<node_name>, es_shard:<shard_name>, es_index:<index_name> and
# es_role:(primary|replica). They will also carry a "shard_specific" tag. It
# should enable you to slice and dice as you please in your DatadogHQ.
#
- url: http://localhost:9200
# username: username
# password: password
# cluster_stats: false
# pshard_stats: false
# shard_level_metrics: true
# tags:
# - 'tag1:key1'
# - 'tag2:key2'
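For reference, the shard naming convention described in the comments removed above (and implemented by the deleted _process_shard_level_data method) boils down to the following; the helper below is an illustrative reconstruction, not part of the check:

```python
# Primaries are named "P<i>", replicas "R<i>", or "R<i>_<n>" when the index
# carries more than one replica per primary shard.
def shard_name(shard_number, is_primary, replica_count, replica_index=0):
    if is_primary:
        return 'P{0}'.format(shard_number)
    if replica_count > 1:
        return 'R{0}_{1}'.format(shard_number, replica_index)
    return 'R{0}'.format(shard_number)

# shard_name(0, True, 2)     -> 'P0'
# shard_name(0, False, 2, 1) -> 'R0_1'
# shard_name(3, False, 1)    -> 'R3'
```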