Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Use modern metrics
Browse files Browse the repository at this point in the history
Convert all tags to lists via make_tags

Closes #943, #950
  • Loading branch information
jrconlin committed Jul 7, 2017
1 parent 40f15e0 commit 688e63a
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 148 deletions.
16 changes: 2 additions & 14 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
attrib,
Factory
)
from boto.exception import JSONResponseError, BotoServerError
from boto.exception import JSONResponseError
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ItemNotFound,
Expand Down Expand Up @@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
try:
return func(self, *args, **kwargs)
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.%s" % func.__name__)
raise
except JSONResponseError:
self.metrics.increment("error.jsonresponse.%s" % func.__name__)
raise
except BotoServerError:
self.metrics.increment("error.botoserver.%s" % func.__name__)
raise
return func(self, *args, **kwargs)
return wrapper


Expand Down Expand Up @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


Expand Down Expand Up @@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
Expand Down
1 change: 0 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):
Expand Down
10 changes: 9 additions & 1 deletion autopush/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Metrics interface and implementations"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence, Any # noqa

from twisted.internet import reactor
from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient
Expand Down Expand Up @@ -74,6 +74,14 @@ def timing(self, name, duration, **kwargs):
self._metric.timing(name, duration)


def make_tags(base=(), **kwargs):
    # type: (Sequence[str], **Any) -> Sequence[str]
    """Generate a list of tag values.

    Copies ``base`` into a new list and appends one ``"key:value"``
    string for each keyword argument. ``base`` itself is never modified.

    :param base: Existing tag strings to start from (defaults to none).
    :param kwargs: Extra tags; each is rendered as ``"<key>:<value>"``.
    :returns: A new list holding the base tags followed by the extra tags.

    """
    tags = list(base)
    # ``items()`` is available on both Python 2 and Python 3, whereas
    # ``iteritems()`` only exists on Python 2.
    tags.extend('{}:{}'.format(key, val) for key, val in kwargs.items())
    return tags


class DatadogMetrics(object):
"""DataDog Metric backend"""
def __init__(self, api_key, app_key, hostname, flush_interval=10,
Expand Down
17 changes: 14 additions & 3 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from twisted.python.failure import Failure

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.apns2 import (
APNSClient,
APNS_MAX_CONNECTIONS,
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, ap_settings, router_conf, metrics,
self.ap_settings = ap_settings
self._config = router_conf
self.metrics = metrics
self._base_tags = []
self._base_tags = ["platform:apns"]
self.apns = dict()
for rel_channel in self._config:
self.apns[rel_channel] = self._connect(rel_channel,
Expand Down Expand Up @@ -144,8 +145,10 @@ def _route(self, notification, router_data):
apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
except (ConnectionError, AttributeError) as ex:
self.metrics.increment("updates.client.bridge.apns.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
application=rel_channel,
reason="connection_error"))
self.log.error("Connection Error sending to APNS",
log_failure=Failure(ex))
raise RouterException(
Expand All @@ -165,11 +168,19 @@ def _route(self, notification, router_data):

location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
self.metrics.increment("notification.bridge.sent",
tags=make_tags(self._base_tags,
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags=make_tags(self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
27 changes: 17 additions & 10 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.auth = router_conf.get("auth")
self._base_tags = []
self._base_tags = ["platform:fcm"]
try:
self.fcm = pyfcm.FCMNotification(api_key=self.auth)
except Exception as e:
Expand Down Expand Up @@ -199,16 +200,16 @@ def _route(self, notification, router_data):
self.log.error("Authentication Error: %s" % e)
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.metrics.increment("updates.client.bridge.fcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
self.log.warn("Could not connect to FCM server: %s" % e)
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled FCM Error: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.fcm.attempted",
self._base_tags)
return self._process_reply(result, notification, router_data,
ttl=router_ttl)

Expand All @@ -229,14 +230,16 @@ def _process_reply(self, reply, notification, router_data, ttl):
new_id = result.get('registration_id')
self.log.debug("FCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.fcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
if reply.get('failure'):
self.metrics.increment("updates.client.bridge.fcm.failed",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
reason = result.get('error', "Unreported")
err = self.reasonTable.get(reason)
if err.get("crit", False):
Expand All @@ -263,8 +266,12 @@ def _process_reply(self, reply, notification, router_data, ttl):
response_body=err['msg'],
router_data={},
)
self.metrics.increment("updates.client.bridge.fcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
37 changes: 23 additions & 14 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.gcm[sid] = gcmclient.GCM(auth)
except:
raise IOError("GCM Bridge not initiated in main")
self._base_tags = []
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -122,15 +123,15 @@ def _route(self, notification, uaid_data):
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
self.metrics.increment("updates.client.bridge.gcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.gcm.attempted",
self._base_tags)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)

Expand All @@ -148,16 +149,18 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
for old_id, new_id in reply.canonical.items():
self.log.debug("GCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.gcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
# naks:
# uninstall:
for reg_id in reply.not_registered:
self.metrics.increment("updates.client.bridge.gcm.failed.unreg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="unregistered"))
self.log.debug("GCM no longer registered: %s" % reg_id)
return RouterResponse(
status_code=410,
Expand All @@ -167,8 +170,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# for reg_id, err_code in reply.failed.items():
if len(reply.failed.items()) > 0:
self.metrics.increment("updates.client.bridge.gcm.failed.failure",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
self.log.debug("GCM failures: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM unable to deliver", status_code=410,
Expand All @@ -178,17 +182,22 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# retries:
if reply.needs_retry():
self.metrics.increment("updates.client.bridge.gcm.failed.retry",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="retry"))
self.log.warn("GCM retry requested: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM failure to deliver, retry",
status_code=503,
response_body="Please try request later.",
log_exception=False)

self.metrics.increment("updates.client.bridge.gcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags=make_tags(self._base_tags,
destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
14 changes: 7 additions & 7 deletions autopush/router/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from twisted.web.client import FileBodyProducer

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.protocol import IgnoreBody
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa
Expand Down Expand Up @@ -61,9 +62,15 @@ def amend_endpoint_response(self, response, router_data):
"""Stubbed out for this router"""

def stored_response(self, notification):
    """Report the payload size for a stored message and build the reply.

    Emits the ``notification.message_data`` gauge with the length of the
    notification data (0 when there is none), tagged
    ``destination:Stored``, then returns a 202 ``RouterResponse``.

    """
    payload_size = len(notification.data or "")
    stored_tags = make_tags(destination='Stored')
    self.metrics.gauge("notification.message_data", payload_size,
                       tags=stored_tags)
    return RouterResponse(202, "Notification Stored")

def delivered_response(self, notification):
    """Report the payload size for a delivered message and build the reply.

    Emits the ``notification.message_data`` gauge with the length of the
    notification data (0 when there is none), tagged
    ``destination:Direct``, then returns a 200 ``RouterResponse``.

    """
    payload_size = len(notification.data or "")
    direct_tags = make_tags(destination='Direct')
    self.metrics.gauge("notification.message_data", payload_size,
                       tags=direct_tags)
    return RouterResponse(200, "Delivered")

@inlineCallbacks
Expand Down Expand Up @@ -98,7 +105,6 @@ def route_notification(self, notification, uaid_data):
# in AWS or if the connection timesout.
self.log.debug("Could not route message: {exc}", exc=exc)
if result and result.code == 200:
self.metrics.increment("router.broadcast.hit")
returnValue(self.delivered_response(notification))

# Save notification, node is not present or busy
Expand All @@ -108,7 +114,6 @@ def route_notification(self, notification, uaid_data):
try:
result = yield self._save_notification(uaid_data, notification)
if result is False:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
except JSONResponseError:
raise RouterException("Error saving to database",
Expand All @@ -128,7 +133,6 @@ def route_notification(self, notification, uaid_data):
try:
uaid_data = yield deferToThread(router.get_uaid, uaid)
except JSONResponseError:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
except ItemNotFound:
self.metrics.increment("updates.client.deleted")
Expand All @@ -141,7 +145,6 @@ def route_notification(self, notification, uaid_data):
# Verify there's a node_id in here, if not we're done
node_id = uaid_data.get("node_id")
if not node_id:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
try:
result = yield self._send_notification_check(uaid, node_id)
Expand All @@ -152,14 +155,11 @@ def route_notification(self, notification, uaid_data):
yield deferToThread(
router.clear_node,
uaid_data).addErrback(self._eat_db_err)
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))

if result.code == 200:
self.metrics.increment("router.broadcast.save_hit")
returnValue(self.delivered_response(notification))
else:
self.metrics.increment("router.broadcast.miss")
ret_val = self.stored_response(notification)
if self.udp is not None and "server" in self.conf:
# Attempt to send off the UDP wake request.
Expand Down
7 changes: 7 additions & 0 deletions autopush/router/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from twisted.web.client import FileBodyProducer

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.protocol import IgnoreBody
from autopush.router.interface import RouterResponse
from autopush.router.simple import SimpleRouter
Expand All @@ -24,6 +25,9 @@ class WebPushRouter(SimpleRouter):
"""SimpleRouter subclass to store individual messages appropriately"""

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags=make_tags(destination='Stored'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand All @@ -32,6 +36,9 @@ def delivered_response(self, notification):
logged_status=200)

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags=make_tags(destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand Down
Loading

0 comments on commit 688e63a

Please sign in to comment.