diff --git a/aggregator.py b/aggregator.py index 06e98d49e1..0d32e81d83 100644 --- a/aggregator.py +++ b/aggregator.py @@ -416,13 +416,13 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None, def event(self, title, text, date_happened=None, alert_type=None, aggregation_key=None, source_type_name=None, priority=None, tags=None, hostname=None): event = { - 'title': title, - 'text': text, + 'msg_title': title, + 'msg_text': text, } if date_happened is not None: - event['date_happened'] = date_happened + event['timestamp'] = date_happened else: - event['date_happened'] = int(time()) + event['timestamp'] = int(time()) if alert_type is not None: event['alert_type'] = alert_type if aggregation_key is not None: diff --git a/dogstatsd.py b/dogstatsd.py index 9dd835792c..f6d4830dee 100755 --- a/dogstatsd.py +++ b/dogstatsd.py @@ -21,6 +21,7 @@ import sys from time import time import threading +import math from urllib import urlencode # project @@ -28,7 +29,7 @@ from checks.check_status import DogstatsdStatus from config import get_config from daemon import Daemon, AgentSupervisor -from util import json, PidFile, get_hostname, plural +from util import json, PidFile, get_hostname, plural, get_uuid, chunks log = logging.getLogger('dogstatsd') @@ -40,6 +41,7 @@ FLUSH_LOGGING_PERIOD = 70 FLUSH_LOGGING_INITIAL = 10 FLUSH_LOGGING_COUNT = 5 +EVENT_CHUNK_SIZE = 50 def serialize_metrics(metrics): return json.dumps({"series" : metrics}) @@ -53,7 +55,7 @@ class Reporter(threading.Thread): server. 
""" - def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_watchdog=False): + def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_watchdog=False, event_chunk_size=None): threading.Thread.__init__(self) self.interval = int(interval) self.finished = threading.Event() @@ -68,6 +70,7 @@ def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_wat self.api_key = api_key self.api_host = api_host + self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE self.http_conn_cls = http_client.HTTPSConnection @@ -175,19 +178,28 @@ def submit_events(self, events): headers = {'Content-Type':'application/json'} method = 'POST' - params = {} - if self.api_key: - params['api_key'] = self.api_key - url = '/api/v1/events?%s' % urlencode(params) - - status = None - conn = self.http_conn_cls(self.api_host) - try: - for event in events: + events_len = len(events) + event_chunk_size = self.event_chunk_size + + for chunk in chunks(events, event_chunk_size): + payload = { + 'apiKey': self.api_key, + 'events': { + 'api': chunk + }, + 'uuid': get_uuid(), + 'internalHostname': get_hostname() + } + params = {} + if self.api_key: + params['api_key'] = self.api_key + url = '/intake?%s' % urlencode(params) + + status = None + conn = self.http_conn_cls(self.api_host) + try: start_time = time() - body = serialize_event(event) - log.debug('Sending event: %s' % body) - conn.request(method, url, body, headers) + conn.request(method, url, json.dumps(payload), headers) response = conn.getresponse() status = response.status @@ -195,8 +207,9 @@ def submit_events(self, events): duration = round((time() - start_time) * 1000.0, 4) log.debug("%s %s %s%s (%sms)" % ( status, method, self.api_host, url, duration)) - finally: - conn.close() + + finally: + conn.close() class Server(object): """ @@ -336,6 +349,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False): non_local_traffic = c['non_local_traffic'] forward_to_host = 
c.get('statsd_forward_host') forward_to_port = c.get('statsd_forward_port') + event_chunk_size = c.get('event_chunk_size') target = c['dd_url'] if use_forwarder: @@ -350,7 +364,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False): aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None)) # Start the reporting thread. - reporter = Reporter(interval, aggregator, target, api_key, use_watchdog) + reporter = Reporter(interval, aggregator, target, api_key, use_watchdog, event_chunk_size) # Start the server on an IPv4 stack # Default to loopback diff --git a/tests/test_bucket_aggregator.py b/tests/test_bucket_aggregator.py index c0f1be24e0..b642fb5369 100644 --- a/tests/test_bucket_aggregator.py +++ b/tests/test_bucket_aggregator.py @@ -21,7 +21,7 @@ def sort_by(m): @staticmethod def sort_events(metrics): def sort_by(m): - return (m['title'], m['text'], ','.join(m.get('tags', None) or [])) + return (m['msg_title'], m['msg_text'], ','.join(m.get('tags', None) or [])) return sorted(metrics, key=sort_by) def sleep_for_interval_length(self, interval=None): @@ -883,19 +883,19 @@ def test_event_tags(self): assert True else: assert False, "event['tags'] shouldn't be defined when no tags aren't explicited in the packet" - nt.assert_equal(first['title'], 'title1') - nt.assert_equal(first['text'], 'text') + nt.assert_equal(first['msg_title'], 'title1') + nt.assert_equal(first['msg_text'], 'text') - nt.assert_equal(second['title'], 'title2') - nt.assert_equal(second['text'], 'text') + nt.assert_equal(second['msg_title'], 'title2') + nt.assert_equal(second['msg_text'], 'text') nt.assert_equal(second['tags'], sorted(['t1'])) - nt.assert_equal(third['title'], 'title3') - nt.assert_equal(third['text'], 'text') + nt.assert_equal(third['msg_title'], 'title3') + nt.assert_equal(third['msg_text'], 'text') nt.assert_equal(third['tags'], sorted(['t1', 't2:v2', 't3', 't4'])) - 
nt.assert_equal(fourth['title'], 'title4') - nt.assert_equal(fourth['text'], 'text') + nt.assert_equal(fourth['msg_title'], 'title4') + nt.assert_equal(fourth['msg_text'], 'text') nt.assert_equal(fourth['aggregation_key'], 'key') nt.assert_equal(fourth['priority'], 'normal') nt.assert_equal(fourth['tags'], sorted(['t1', 't2'])) @@ -913,11 +913,11 @@ def test_event_title(self): assert len(events) == 5 first, second, third, fourth, fifth = events - nt.assert_equal(first['title'], '') - nt.assert_equal(second['title'], u'2intitulé') - nt.assert_equal(third['title'], '3title content') - nt.assert_equal(fourth['title'], '4title|content') - nt.assert_equal(fifth['title'], '5title\\ntitle') + nt.assert_equal(first['msg_title'], '') + nt.assert_equal(second['msg_title'], u'2intitulé') + nt.assert_equal(third['msg_title'], '3title content') + nt.assert_equal(fourth['msg_title'], '4title|content') + nt.assert_equal(fifth['msg_title'], '5title\\ntitle') def test_event_text(self): stats = MetricsBucketAggregator('myhost', interval=self.interval) @@ -931,10 +931,10 @@ def test_event_text(self): assert len(events) == 4 first, second, third, fourth = events - nt.assert_equal(first['text'], '') - nt.assert_equal(second['text'], 'text|content') - nt.assert_equal(third['text'], 'First line\nSecond line') - nt.assert_equal(fourth['text'], u'♬ †øU †øU ¥ºu T0µ ♪') + nt.assert_equal(first['msg_text'], '') + nt.assert_equal(second['msg_text'], 'text|content') + nt.assert_equal(third['msg_text'], 'First line\nSecond line') + nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪') def test_recent_point_threshold(self): ag_interval = 1 diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py index cb30dcfe53..d5e8ea40d5 100644 --- a/tests/test_dogstatsd.py +++ b/tests/test_dogstatsd.py @@ -19,7 +19,7 @@ def sort_by(m): @staticmethod def sort_events(metrics): def sort_by(m): - return (m['title'], m['text'], ','.join(m.get('tags', None) or [])) + return (m['msg_title'], 
m['msg_text'], ','.join(m.get('tags', None) or [])) return sorted(metrics, key=sort_by) @staticmethod @@ -466,19 +466,19 @@ def test_event_tags(self): assert True else: assert False, "event['tags'] shouldn't be defined when no tags aren't explicited in the packet" - nt.assert_equal(first['title'], 'title1') - nt.assert_equal(first['text'], 'text') + nt.assert_equal(first['msg_title'], 'title1') + nt.assert_equal(first['msg_text'], 'text') - nt.assert_equal(second['title'], 'title2') - nt.assert_equal(second['text'], 'text') + nt.assert_equal(second['msg_title'], 'title2') + nt.assert_equal(second['msg_text'], 'text') nt.assert_equal(second['tags'], sorted(['t1'])) - nt.assert_equal(third['title'], 'title3') - nt.assert_equal(third['text'], 'text') + nt.assert_equal(third['msg_title'], 'title3') + nt.assert_equal(third['msg_text'], 'text') nt.assert_equal(third['tags'], sorted(['t1', 't2:v2', 't3', 't4'])) - nt.assert_equal(fourth['title'], 'title4') - nt.assert_equal(fourth['text'], 'text') + nt.assert_equal(fourth['msg_title'], 'title4') + nt.assert_equal(fourth['msg_text'], 'text') nt.assert_equal(fourth['aggregation_key'], 'key') nt.assert_equal(fourth['priority'], 'normal') nt.assert_equal(fourth['tags'], sorted(['t1', 't2'])) @@ -496,11 +496,11 @@ def test_event_title(self): assert len(events) == 5 first, second, third, fourth, fifth = events - nt.assert_equal(first['title'], '') - nt.assert_equal(second['title'], u'2intitulé') - nt.assert_equal(third['title'], '3title content') - nt.assert_equal(fourth['title'], '4title|content') - nt.assert_equal(fifth['title'], '5title\\ntitle') + nt.assert_equal(first['msg_title'], '') + nt.assert_equal(second['msg_title'], u'2intitulé') + nt.assert_equal(third['msg_title'], '3title content') + nt.assert_equal(fourth['msg_title'], '4title|content') + nt.assert_equal(fifth['msg_title'], '5title\\ntitle') def test_event_text(self): stats = MetricsAggregator('myhost') @@ -514,10 +514,10 @@ def test_event_text(self): assert 
def chunks(iterable, chunk_size):
    """Generate lists of at most `chunk_size` elements from `iterable`.

    Elements are consumed lazily and in order. Every yielded list holds
    exactly `chunk_size` elements, except possibly the last one, which
    holds whatever is left over. An empty `iterable` yields nothing.

    `iterable` may be any iterable (list, tuple, generator, iterator...).
    `chunk_size` must be an integer >= 1; otherwise a ValueError is raised.
    Because this is a generator, that error surfaces when iteration
    starts, not when `chunks()` is called.
    """
    # Function-scope import keeps this helper self-contained.
    from itertools import islice

    # A size below 1 can never make progress: without this guard the
    # generator would yield empty lists forever and hang its consumer.
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1, got %r" % (chunk_size,))

    # islice drives the iterator through the iterator protocol, so this
    # works on both Python 2 and Python 3 (unlike calling `.next()`).
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            # Source exhausted; never emit a trailing empty chunk.
            return
        yield chunk