Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: followup for 1408 (#1411)
Browse files Browse the repository at this point in the history
Queue metrics up for a single emitter thread, to avoid adding every
metric call to Twisted's thread pool.

Closes #1408
  • Loading branch information
pjenvey authored Jul 7, 2020
1 parent 4a31b1e commit 022070d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
49 changes: 41 additions & 8 deletions autopush/metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Metrics interface and implementations"""
import time
import threading
from typing import ( # noqa
TYPE_CHECKING,
Any,
Expand All @@ -7,7 +9,7 @@
)

from twisted.internet import reactor

from twisted.logger import Logger
import markus

from autopush import logging
Expand All @@ -16,6 +18,9 @@
from autopush.config import AutopushConfig # noqa


log = Logger()


class IMetrics(object):
"""Metrics interface
Expand Down Expand Up @@ -67,7 +72,7 @@ def make_tags(base=None, **kwargs):
class TaggedMetrics(IMetrics):
"""DataDog like tagged Metric backend"""
def __init__(self, hostname, statsd_host=None, statsd_port=None,
namespace="autopush"):
namespace="autopush", flush_interval=10):
markus.configure(
backends=[{
'class': 'markus.backends.datadog.DatadogMetrics',
Expand All @@ -79,36 +84,64 @@ def __init__(self, hostname, statsd_host=None, statsd_port=None,
self._host = hostname
self._namespace = namespace

self._metrics = []
self._flush_interval = flush_interval
self._thread = None
self._lock = threading.RLock()

self.start()

def _prefix_name(self, name):
    """Return the metric name unchanged.

    NOTE(review): no namespace prefix is applied here; presumably the
    configured markus backend handles namespacing — confirm.
    """
    return name

def start(self):
pass

def flush_thread():
while True:
try:
self._flush()
except Exception:
log.failure("Error flushing metrics")
time.sleep(self._flush_interval)
self._thread = thread = threading.Thread(target=flush_thread)
thread.daemon = True
thread.start()

def _flush(self):
with self._lock:
metrics = self._metrics
self._metrics = []
for (fn, name, kwargs) in metrics:
fn(name, **kwargs)

def _make_tags(self, tags):
if tags is None:
tags = []
tags.append('host:%s' % self._host)
return tags

def _queue_metric(self, fn, name, **kwargs):
with self._lock:
self._metrics.append((fn, name, kwargs))

def increment(self, name, count=1, tags=None, **kwargs):
reactor.callInThread(
self._queue_metric(
self._client.incr,
self._prefix_name(name),
count,
value=count,
tags=self._make_tags(tags)
)

def gauge(self, name, count, tags=None, **kwargs):
reactor.callInThread(
self._queue_metric(
self._client.gauge,
self._prefix_name(name),
count,
value=count,
tags=self._make_tags(tags)
)

def timing(self, name, duration, tags=None, **kwargs):
reactor.callInThread(
self._queue_metric(
self._client.timing,
self._prefix_name(name),
value=duration,
Expand Down
4 changes: 3 additions & 1 deletion autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,8 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):
# Piggy back a check for stored source metrics
self.conn.db.metrics = TaggedMetrics(
namespace="testpush",
hostname="localhost")
hostname="localhost",
flush_interval=1)
self.conn.db.metrics._client = Mock()

client = Client("ws://localhost:{}/".format(self.connection_port))
Expand Down Expand Up @@ -584,6 +585,7 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):
yield client.ack(chan, result["version"])

assert self.logs.logged_ci(lambda ci: 'message_size' in ci)
time.sleep(1.5)
inc_call = self.conn.db.metrics._client.incr.call_args_list[5]
assert inc_call[1]['tags'] == ['source:Stored', 'host:localhost']
yield self.shut_down(client)
Expand Down

0 comments on commit 022070d

Please sign in to comment.