diff --git a/autopush/metrics.py b/autopush/metrics.py index ef630228..d5094ca9 100644 --- a/autopush/metrics.py +++ b/autopush/metrics.py @@ -1,4 +1,6 @@ """Metrics interface and implementations""" +import time +import threading from typing import ( # noqa TYPE_CHECKING, Any, @@ -7,7 +9,7 @@ ) from twisted.internet import reactor - +from twisted.logger import Logger import markus from autopush import logging @@ -16,6 +18,9 @@ from autopush.config import AutopushConfig # noqa +log = Logger() + + class IMetrics(object): """Metrics interface @@ -67,7 +72,7 @@ def make_tags(base=None, **kwargs): class TaggedMetrics(IMetrics): """DataDog like tagged Metric backend""" def __init__(self, hostname, statsd_host=None, statsd_port=None, - namespace="autopush"): + namespace="autopush", flush_interval=10): markus.configure( backends=[{ 'class': 'markus.backends.datadog.DatadogMetrics', @@ -79,11 +84,35 @@ def __init__(self, hostname, statsd_host=None, statsd_port=None, self._host = hostname self._namespace = namespace + self._metrics = [] + self._flush_interval = flush_interval + self._thread = None + self._lock = threading.RLock() + + self.start() + def _prefix_name(self, name): return name def start(self): - pass + + def flush_thread(): + while True: + try: + self._flush() + except Exception: + log.failure("Error flushing metrics") + time.sleep(self._flush_interval) + self._thread = thread = threading.Thread(target=flush_thread) + thread.daemon = True + thread.start() + + def _flush(self): + with self._lock: + metrics = self._metrics + self._metrics = [] + for (fn, name, kwargs) in metrics: + fn(name, **kwargs) def _make_tags(self, tags): if tags is None: @@ -91,24 +120,28 @@ def _make_tags(self, tags): tags.append('host:%s' % self._host) return tags + def _queue_metric(self, fn, name, **kwargs): + with self._lock: + self._metrics.append((fn, name, kwargs)) + def increment(self, name, count=1, tags=None, **kwargs): - reactor.callInThread( + self._queue_metric( self._client.incr, self._prefix_name(name), - count, + value=count, tags=self._make_tags(tags) ) def gauge(self, name, count, tags=None, **kwargs): - reactor.callInThread( + self._queue_metric( self._client.gauge, self._prefix_name(name), - count, + value=count, tags=self._make_tags(tags) ) def timing(self, name, duration, tags=None, **kwargs): - reactor.callInThread( + self._queue_metric( self._client.timing, self._prefix_name(name), value=duration, diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index 280455e4..af4b56b2 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -555,7 +555,8 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog): # Piggy back a check for stored source metrics self.conn.db.metrics = TaggedMetrics( namespace="testpush", - hostname="localhost") + hostname="localhost", + flush_interval=1) self.conn.db.metrics._client = Mock() client = Client("ws://localhost:{}/".format(self.connection_port)) @@ -584,6 +585,7 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog): yield client.ack(chan, result["version"]) assert self.logs.logged_ci(lambda ci: 'message_size' in ci) + time.sleep(1.5) inc_call = self.conn.db.metrics._client.incr.call_args_list[5] assert inc_call[1]['tags'] == ['source:Stored', 'host:localhost'] yield self.shut_down(client)