Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make histogram aggregates & percentiles configurable globally #1303

Merged
merged 4 commits into from
Jan 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ unless ENV['IS_TRAVIS']
ENV['INTEGRATIONS_DIR'] = File.join(rakefile_dir, 'embedded')
ENV['PIP_CACHE'] = File.join(rakefile_dir, '.pip-cache')
ENV['VOLATILE_DIR'] = '/tmp/dd-agent-testing'
ENV['CONCURRENCY'] = ENV['CONCURRENCY'] || '2'
ENV['CONCURRENCY'] = ENV['CONCURRENCY'] || '2'
ENV['NOSE_FILTER'] = 'not windows'
end

desc 'Setup a development environment for the Agent'
Expand Down Expand Up @@ -74,7 +75,7 @@ end

desc "Lint the code through pylint"
task "lint" do
sh %{find . -name '*.py' -type f -not -path '*venv*' | xargs --max-procs=$CONCURRENCY -n 1 pylint --rcfile=./.pylintrc}
sh %{find . -name '*.py' -type f -not -path '*venv*' -not -path '*embedded*' -exec pylint --rcfile=./.pylintrc {} \\;}
end

desc "Run the Agent locally"
Expand Down
87 changes: 67 additions & 20 deletions aggregator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from time import time

from checks.metric_types import MetricTypes
from config import get_histogram_aggregates, get_histogram_percentiles

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,7 +40,7 @@ def flush(self, timestamp, interval):
class Gauge(Metric):
""" A metric that tracks a value at particular points in time. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = None
Expand Down Expand Up @@ -100,7 +102,7 @@ def flush(self, timestamp, interval):
class Count(Metric):
""" A metric that tracks a count. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = None
Expand Down Expand Up @@ -132,7 +134,7 @@ def flush(self, timestamp, interval):

class MonotonicCount(Metric):

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -180,7 +182,7 @@ def flush(self, timestamp, interval):
class Counter(Metric):
""" A metric that tracks a counter value. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.value = 0
Expand Down Expand Up @@ -209,16 +211,23 @@ def flush(self, timestamp, interval):
finally:
self.value = 0

DEFAULT_HISTOGRAM_AGGREGATES = ['max', 'median', 'avg', 'count']
DEFAULT_HISTOGRAM_PERCENTILES = [0.95]

class Histogram(Metric):
""" A metric to track the distribution of a set of values. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.count = 0
self.samples = []
self.percentiles = [0.95]
self.aggregates = extra_config['aggregates'] if\
extra_config is not None and extra_config.get('aggregates') is not None\
else DEFAULT_HISTOGRAM_AGGREGATES
self.percentiles = extra_config['percentiles'] if\
extra_config is not None and extra_config.get('percentiles') is not None\
else DEFAULT_HISTOGRAM_PERCENTILES
self.tags = tags
self.hostname = hostname
self.device_name = device_name
Expand All @@ -236,15 +245,23 @@ def flush(self, ts, interval):
self.samples.sort()
length = len(self.samples)

min_ = self.samples[0]
max_ = self.samples[-1]
med = self.samples[int(round(length/2 - 1))]
avg = sum(self.samples) / float(length)

metric_aggrs = [
aggregators = [
('min', min_, MetricTypes.GAUGE),
('max', max_, MetricTypes.GAUGE),
('median', med, MetricTypes.GAUGE),
('avg', avg, MetricTypes.GAUGE),
('count', self.count/interval, MetricTypes.RATE)
('count', self.count/interval, MetricTypes.RATE),
]

metric_aggrs = [
(agg_name, agg_func, m_type)
for agg_name, agg_func, m_type in aggregators
if agg_name in self.aggregates
]

metrics = [self.formatter(
Expand Down Expand Up @@ -282,7 +299,7 @@ def flush(self, ts, interval):
class Set(Metric):
""" A metric to track the number of unique elements in a set. """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -316,7 +333,7 @@ def flush(self, timestamp, interval):
class Rate(Metric):
""" Track the rate of metrics over each flush interval """

def __init__(self, formatter, name, tags, hostname, device_name):
def __init__(self, formatter, name, tags, hostname, device_name, extra_config=None):
self.formatter = formatter
self.name = name
self.tags = tags
Expand Down Expand Up @@ -372,7 +389,9 @@ class Aggregator(object):
# Types of metrics that allow strings
ALLOW_STRINGS = ['s', ]

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
self.events = []
self.total_count = 0
self.count = 0
Expand All @@ -386,6 +405,14 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, r
self.recent_point_threshold = int(recent_point_threshold)
self.num_discarded_old_points = 0

# Additional config passed when instantiating metric configs
self.metric_config = {
Histogram: {
'aggregates': histogram_aggregates,
'percentiles': histogram_percentiles
}
}

def packets_per_second(self, interval):
if interval == 0:
return 0
Expand Down Expand Up @@ -591,8 +618,18 @@ class MetricsBucketAggregator(Aggregator):
A metric aggregator class.
"""

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
super(MetricsBucketAggregator, self).__init__(hostname, interval, expiry_seconds, formatter, recent_point_threshold)
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
super(MetricsBucketAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
)
self.metric_by_bucket = {}
self.last_sample_time_by_context = {}
self.current_bucket = None
Expand Down Expand Up @@ -645,7 +682,7 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None,
if context not in metric_by_context:
metric_class = self.metric_type_to_class[mtype]
metric_by_context[context] = metric_class(self.formatter, name, tags,
hostname, device_name)
hostname, device_name, self.metric_config.get(metric_class))

metric_by_context[context].sample(value, sample_rate, timestamp)

Expand Down Expand Up @@ -719,8 +756,18 @@ class MetricsAggregator(Aggregator):
A metric aggregator class.
"""

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
super(MetricsAggregator, self).__init__(hostname, interval, expiry_seconds, formatter, recent_point_threshold)
def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
super(MetricsAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
)
self.metrics = {}
self.metric_type_to_class = {
'g': Gauge,
Expand All @@ -747,7 +794,7 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None,
if context not in self.metrics:
metric_class = self.metric_type_to_class[mtype]
self.metrics[context] = metric_class(self.formatter, name, tags,
hostname, device_name)
hostname, device_name, self.metric_config.get(metric_class))
cur_time = time()
if timestamp is not None and cur_time - int(timestamp) > self.recent_point_threshold:
log.debug("Discarding %s - ts = %s , current ts = %s " % (name, timestamp, cur_time))
Expand Down Expand Up @@ -810,18 +857,18 @@ def get_formatter(config):
formatter = api_formatter

if config['statsd_metric_namespace']:
def metric_namespace_formatter_wrapper(metric, value, timestamp, tags,
def metric_namespace_formatter_wrapper(metric, value, timestamp, tags,
hostname=None, device_name=None, metric_type=None, interval=None):
metric_prefix = config['statsd_metric_namespace']
if metric_prefix[-1] != '.':
metric_prefix += '.'

return api_formatter(metric_prefix + metric, value, timestamp, tags, hostname,
return api_formatter(metric_prefix + metric, value, timestamp, tags, hostname,
device_name, metric_type, interval)

formatter = metric_namespace_formatter_wrapper
return formatter


def api_formatter(metric, value, timestamp, tags, hostname=None, device_name=None,
metric_type=None, interval=None):
Expand Down
2 changes: 1 addition & 1 deletion checks.d/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ZKConnectionFailure(Exception):
pass


class Zookeeper(AgentCheck):
class ZookeeperCheck(AgentCheck):
version_pattern = re.compile(r'Zookeeper version: ([^.]+)\.([^.]+)\.([^-]+)', flags=re.I)

SOURCE_TYPE_NAME = 'zookeeper'
Expand Down
8 changes: 6 additions & 2 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,13 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.hostname = agentConfig.get('checksd_hostname') or get_hostname(agentConfig)
self.log = logging.getLogger('%s.%s' % (__name__, name))

self.aggregator = MetricsAggregator(self.hostname,
self.aggregator = MetricsAggregator(
self.hostname,
formatter=agent_formatter,
recent_point_threshold=agentConfig.get('recent_point_threshold', None))
recent_point_threshold=agentConfig.get('recent_point_threshold', None),
histogram_aggregates=agentConfig.get('histogram_aggregates'),
histogram_percentiles=agentConfig.get('histogram_percentiles')
)

self.events = []
self.service_checks = []
Expand Down
3 changes: 1 addition & 2 deletions ci/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
task :before_script => ['ci:common:before_script']

task :script => ['ci:common:script'] do
sh %(find . -name '*.py'\
| xargs --max-procs=0 -n 1 pylint --rcfile=./.pylintrc)
sh %(find . -name '*.py' -not -path '*venv*' -not -path '*embedded*' -exec pylint --rcfile=./.pylintrc {} \\;)
Rake::Task['ci:common:run_tests'].invoke('default')
end

Expand Down
56 changes: 56 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,55 @@ def get_default_bind_host():
return '127.0.0.1'
return 'localhost'


def get_histogram_aggregates(configstr=None):
    """Parse a comma-separated list of histogram aggregate names.

    Args:
        configstr: raw config value, e.g. ``"max, median, avg"``, or None
            when the option is absent.

    Returns:
        A list of the recognized aggregate names (input order preserved),
        or None when ``configstr`` is None or parsing fails entirely.
        Unrecognized names are logged and skipped, not fatal.
    """
    if configstr is None:
        return None

    # Only these aggregate names are understood by Histogram.flush().
    valid_values = ['min', 'max', 'median', 'avg', 'count']

    try:
        result = []
        for val in configstr.split(','):
            val = val.strip()
            if val in valid_values:
                result.append(val)
            else:
                # Best-effort: warn and drop the bad entry, keep the rest.
                log.warning("Ignored histogram aggregate {0}, invalid".format(val))
    except Exception:
        # Relies on module-level `log` defined elsewhere in this file.
        log.exception("Error when parsing histogram aggregates, skipping")
        return None

    return result

def get_histogram_percentiles(configstr=None):
    """Parse a comma-separated list of histogram percentiles.

    Args:
        configstr: raw config value, e.g. ``"0.95, 0.99"``, or None when
            the option is absent.

    Returns:
        A list of floats strictly between 0 and 1, or None when
        ``configstr`` is None or parsing fails entirely. Invalid entries
        are logged and skipped; values longer than 4 characters are
        truncated to two decimal digits (e.g. "0.999" -> 0.99).
    """
    if configstr is None:
        return None

    parsed = []
    try:
        for raw in configstr.split(','):
            try:
                raw = raw.strip()
                value = float(raw)
                if value <= 0 or value >= 1:
                    # Funnel out-of-range values into the same skip path
                    # as unparseable ones.
                    raise ValueError
                if len(raw) > 4:
                    log.warning("Histogram percentiles are rounded to 2 digits: {0} rounded"\
                        .format(value))
                # Truncate via the string to keep at most 2 decimal digits.
                parsed.append(float(raw[0:4]))
            except ValueError:
                log.warning("Bad histogram percentile value {0}, must be float in ]0;1[, skipping"\
                    .format(raw))
    except Exception:
        # Relies on module-level `log` defined elsewhere in this file.
        log.exception("Error when parsing histogram percentiles, skipping")
        return None

    return parsed

def get_config(parse_args=True, cfg_path=None, options=None):
if parse_args:
options, _ = get_parsed_args()
Expand Down Expand Up @@ -325,6 +374,13 @@ def get_config(parse_args=True, cfg_path=None, options=None):
except Exception:
pass

# Custom histogram aggregate/percentile metrics.
# BUG FIX: the option name passed to config.get() must match the one
# checked with has_option(); the original read 'histograms_aggregates' /
# 'histograms_percentiles' (extra "s"), which raised NoOptionError at
# runtime whenever the option was actually set.
if config.has_option('Main', 'histogram_aggregates'):
    agentConfig['histogram_aggregates'] = get_histogram_aggregates(
        config.get('Main', 'histogram_aggregates'))

if config.has_option('Main', 'histogram_percentiles'):
    agentConfig['histogram_percentiles'] = get_histogram_percentiles(
        config.get('Main', 'histogram_percentiles'))

# Disable Watchdog (optionally)
if config.has_option('Main', 'watchdog'):
if config.get('Main', 'watchdog').lower() in ('no', 'false'):
Expand Down
3 changes: 3 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ use_mount: no
# for instance for security reasons
# exclude_process_args: no

# histogram_aggregates: max, median, avg, count
# histogram_percentiles: 0.95

# ========================================================================== #
# DogStatsd configuration #
# ========================================================================== #
Expand Down
8 changes: 5 additions & 3 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
sleep(4)
sys.exit(0)

log.debug("Configurating dogstatsd")
log.debug("Configuring dogstatsd")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😑


port = c['dogstatsd_port']
interval = DOGSTATSD_FLUSH_INTERVAL
Expand All @@ -393,8 +393,10 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
hostname,
aggregator_interval,
recent_point_threshold=recent_point_threshold,
formatter = get_formatter(c)
)
formatter=get_formatter(c),
histogram_aggregates=c.get('histogram_aggregates'),
histogram_percentiles=c.get('histogram_percentiles'),
)

# Start the reporting thread.
reporter = Reporter(interval, aggregator, target, api_key, use_watchdog, event_chunk_size)
Expand Down
Loading