Skip to content

Commit

Permalink
Make histogram aggregates/%iles configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoCavaille committed Jan 21, 2015
1 parent 9517ec2 commit e84a996
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 43 deletions.
85 changes: 65 additions & 20 deletions aggregator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from time import time

from checks.metric_types import MetricTypes
from config import get_histogram_aggregates, get_histogram_percentiles

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,7 +40,7 @@ def flush(self, timestamp, interval):
class Gauge(Metric):
""" A metric that tracks a value at particular points in time. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = None
Expand Down Expand Up @@ -100,7 +102,7 @@ def flush(self, timestamp, interval):
class Count(Metric):
""" A metric that tracks a count. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = None
Expand Down Expand Up @@ -132,7 +134,7 @@ def flush(self, timestamp, interval):

class MonotonicCount(Metric):

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -180,7 +182,7 @@ def flush(self, timestamp, interval):
class Counter(Metric):
""" A metric that tracks a counter value. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = 0
Expand Down Expand Up @@ -209,16 +211,23 @@ def flush(self, timestamp, interval):
finally:
self.value = 0

DEFAULT_HISTOGRAM_AGGREGATES = ['max', 'median', 'avg', 'count']
DEFAULT_HISTOGRAM_PERCENTILES = [0.95]

class Histogram(Metric):
""" A metric to track the distribution of a set of values. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.count = 0
self.samples = []
self.percentiles = [0.95]
self.aggregates = extra_config['aggregates'] if\
extra_config is not None and extra_config.get('aggregates') is not None\
else DEFAULT_HISTOGRAM_AGGREGATES
self.percentiles = extra_config['percentiles'] if\
extra_config is not None and extra_config.get('percentiles') is not None\
else DEFAULT_HISTOGRAM_PERCENTILES
self.tags = tags
self.hostname = hostname
self.device_name = device_name
Expand All @@ -241,12 +250,18 @@ def flush(self, ts, interval):
med = self.samples[int(round(length/2 - 1))]
avg = sum(self.samples) / float(length)

metric_aggrs = [
aggregators = [
('min', min_, MetricTypes.GAUGE),
('max', max_, MetricTypes.GAUGE),
('median', med, MetricTypes.GAUGE),
('avg', avg, MetricTypes.GAUGE),
('count', self.count/interval, MetricTypes.RATE)
('count', self.count/interval, MetricTypes.RATE),
]

metric_aggrs = [
(agg_name, agg_func, m_type)
for agg_name, agg_func, m_type in aggregators
if agg_name in self.aggregates
]

metrics = [self.formatter(
Expand Down Expand Up @@ -284,7 +299,7 @@ def flush(self, ts, interval):
class Set(Metric):
""" A metric to track the number of unique elements in a set. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -318,7 +333,7 @@ def flush(self, timestamp, interval):
class Rate(Metric):
""" Track the rate of metrics over each flush interval """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -374,7 +389,9 @@ class Aggregator(object):
# Types of metrics that allow strings
ALLOW_STRINGS = ['s', ]

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
self.events = []
self.total_count = 0
self.count = 0
Expand All @@ -388,6 +405,14 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, r
self.recent_point_threshold = int(recent_point_threshold)
self.num_discarded_old_points = 0

# Additional config passed when instantiating metric configs
self.metric_config = {
Histogram: {
'aggregates': histogram_aggregates,
'percentiles': histogram_percentiles
}
}

def packets_per_second(self, interval):
if interval == 0:
return 0
Expand Down Expand Up @@ -593,8 +618,18 @@ class MetricsBucketAggregator(Aggregator):
A metric aggregator class.
"""

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
super(MetricsBucketAggregator, self).__init__(hostname, interval, expiry_seconds, formatter, recent_point_threshold)
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
super(MetricsBucketAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
)
self.metric_by_bucket = {}
self.last_sample_time_by_context = {}
self.current_bucket = None
Expand Down Expand Up @@ -647,7 +682,7 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None,
if context not in metric_by_context:
metric_class = self.metric_type_to_class[mtype]
metric_by_context[context] = metric_class(self.formatter, name, tags,
hostname, device_name)
hostname, device_name, self.metric_config.get(metric_class))

metric_by_context[context].sample(value, sample_rate, timestamp)

Expand Down Expand Up @@ -721,8 +756,18 @@ class MetricsAggregator(Aggregator):
A metric aggregator class.
"""

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
super(MetricsAggregator, self).__init__(hostname, interval, expiry_seconds, formatter, recent_point_threshold)
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
super(MetricsAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
)
self.metrics = {}
self.metric_type_to_class = {
'g': Gauge,
Expand All @@ -749,7 +794,7 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None,
if context not in self.metrics:
metric_class = self.metric_type_to_class[mtype]
self.metrics[context] = metric_class(self.formatter, name, tags,
hostname, device_name)
hostname, device_name, self.metric_config.get(metric_class))
cur_time = time()
if timestamp is not None and cur_time - int(timestamp) > self.recent_point_threshold:
log.debug("Discarding %s - ts = %s , current ts = %s " % (name, timestamp, cur_time))
Expand Down Expand Up @@ -812,18 +857,18 @@ def get_formatter(config):
formatter = api_formatter

if config['statsd_metric_namespace']:
def metric_namespace_formatter_wrapper(metric, value, timestamp, tags,
def metric_namespace_formatter_wrapper(metric, value, timestamp, tags,
hostname=None, device_name=None, metric_type=None, interval=None):
metric_prefix = config['statsd_metric_namespace']
if metric_prefix[-1] != '.':
metric_prefix += '.'

return api_formatter(metric_prefix + metric, value, timestamp, tags, hostname,
return api_formatter(metric_prefix + metric, value, timestamp, tags, hostname,
device_name, metric_type, interval)

formatter = metric_namespace_formatter_wrapper
return formatter


def api_formatter(metric, value, timestamp, tags, hostname=None, device_name=None,
metric_type=None, interval=None):
Expand Down
8 changes: 6 additions & 2 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,13 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.hostname = agentConfig.get('checksd_hostname') or get_hostname(agentConfig)
self.log = logging.getLogger('%s.%s' % (__name__, name))

self.aggregator = MetricsAggregator(self.hostname,
self.aggregator = MetricsAggregator(
self.hostname,
formatter=agent_formatter,
recent_point_threshold=agentConfig.get('recent_point_threshold', None))
recent_point_threshold=agentConfig.get('recent_point_threshold', None),
histogram_aggregates=agentConfig.get('histogram_aggregates'),
histogram_percentiles=agentConfig.get('histogram_percentiles')
)

self.events = []
self.service_checks = []
Expand Down
56 changes: 56 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,55 @@ def get_default_bind_host():
return '127.0.0.1'
return 'localhost'


def get_histogram_aggregates(configstr=None):
    """Parse the ``histogram_aggregates`` option into a list of aggregate names.

    ``configstr`` is a comma-separated string such as ``"max, median, avg"``.
    Each token is stripped of surrounding whitespace and kept only when it is
    one of ``min``, ``max``, ``median``, ``avg`` or ``count``; any other
    non-empty token is reported with a warning and dropped.

    Returns ``None`` when ``configstr`` is ``None`` or cannot be split (the
    error is logged), otherwise the accepted names in order of first
    appearance. The returned list may be empty.
    """
    if configstr is None:
        return None

    valid_values = ('min', 'max', 'median', 'avg', 'count')

    # Only the split itself can fail (non-string input), so the guard is kept
    # around that single call instead of the whole parsing loop.
    try:
        vals = configstr.split(',')
    except (AttributeError, TypeError):
        log.exception("Error when parsing histogram aggregates, skipping")
        return None

    result = []
    for val in vals:
        val = val.strip()
        if not val:
            # Blank token (trailing or doubled comma): nothing worth reporting.
            continue
        if val not in valid_values:
            log.warning("Ignored histogram aggregate {0}, invalid".format(val))
            continue
        if val not in result:
            # Naming an aggregate twice must not yield a duplicate entry.
            result.append(val)

    return result

def get_histogram_percentiles(configstr=None):
    """Parse the ``histogram_percentiles`` option into a list of floats.

    ``configstr`` is a comma-separated string such as ``"0.5, 0.95"``. Every
    token must be a float strictly between 0 and 1. Values are cut to two
    decimal places (``0.999`` becomes ``0.99``) and a warning is logged when
    that changes the value. Tokens that are not numbers, are out of range, or
    are smaller than ``0.01`` are reported with a warning and skipped.

    Returns ``None`` when ``configstr`` is ``None`` or cannot be split (the
    error is logged), otherwise the accepted percentiles in order of first
    appearance. The returned list may be empty.
    """
    if configstr is None:
        return None

    # Only the split itself can fail (non-string input).
    try:
        vals = configstr.split(',')
    except (AttributeError, TypeError):
        log.exception("Error when parsing histogram percentiles, skipping")
        return None

    result = []
    for val in vals:
        val = val.strip()
        if not val:
            # Blank token (trailing or doubled comma): nothing worth reporting.
            continue

        try:
            floatval = float(val)
            # Chained comparison on purpose: it also rejects NaN, which would
            # slip through a `<= 0 or >= 1` test.
            if not 0 < floatval < 1:
                raise ValueError
            # Keep two decimals by working on the number, not on the text:
            # slicing the string breaks spellings such as "+0.95", ".5" or
            # "5e-2". The inner round() absorbs binary noise (0.29 * 100 is
            # 28.999999999999996) and the min() keeps values just below 1
            # from being pushed up to 1.0.
            hundredths = min(int(round(floatval * 100, 6)), 99)
            if hundredths < 1:
                # Below 0.01 nothing is left once cut to two digits.
                raise ValueError
        except ValueError:
            log.warning("Bad histogram percentile value {0}, must be float in ]0;1[, skipping"
                        .format(val))
            continue

        percentile = hundredths / 100.0
        if percentile != floatval:
            log.warning("Histogram percentiles are rounded to 2 digits: {0} rounded"
                        .format(floatval))
        if percentile not in result:
            # The same percentile listed twice must not be reported twice.
            result.append(percentile)

    return result

def get_config(parse_args=True, cfg_path=None, options=None):
if parse_args:
options, _ = get_parsed_args()
Expand Down Expand Up @@ -325,6 +374,13 @@ def get_config(parse_args=True, cfg_path=None, options=None):
except Exception:
pass

# Custom histogram aggregate/percentile metrics
# The option that is read must be the very one has_option() checked
# (`histogram_*`, no extra "s"); asking config.get() for a different name
# fails as soon as the option is actually present in the config file.
if config.has_option('Main', 'histogram_aggregates'):
    agentConfig['histogram_aggregates'] = get_histogram_aggregates(
        config.get('Main', 'histogram_aggregates'))

if config.has_option('Main', 'histogram_percentiles'):
    agentConfig['histogram_percentiles'] = get_histogram_percentiles(
        config.get('Main', 'histogram_percentiles'))

# Disable Watchdog (optionally)
if config.has_option('Main', 'watchdog'):
if config.get('Main', 'watchdog').lower() in ('no', 'false'):
Expand Down
3 changes: 3 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ use_mount: no
# for instance for security reasons
# exclude_process_args: no

# histogram_aggregates: max, median, avg, count
# histogram_percentiles: 0.95

# ========================================================================== #
# DogStatsd configuration #
# ========================================================================== #
Expand Down
8 changes: 5 additions & 3 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
sleep(4)
sys.exit(0)

log.debug("Configurating dogstatsd")
log.debug("Configuring dogstatsd")

port = c['dogstatsd_port']
interval = DOGSTATSD_FLUSH_INTERVAL
Expand All @@ -393,8 +393,10 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
hostname,
aggregator_interval,
recent_point_threshold=recent_point_threshold,
formatter = get_formatter(c)
)
formatter=get_formatter(c),
histogram_aggregates=c.get('histogram_aggregates'),
histogram_percentiles=c.get('histogram_percentiles'),
)

# Start the reporting thread.
reporter = Reporter(interval, aggregator, target, api_key, use_watchdog, event_chunk_size)
Expand Down
Loading

0 comments on commit e84a996

Please sign in to comment.