Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #948 from mozilla-services/feat/943
Browse files Browse the repository at this point in the history
feat: Use modern metrics
  • Loading branch information
bbangert authored Jul 10, 2017
2 parents a938c02 + 39db9a7 commit 3e4a2a1
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 154 deletions.
16 changes: 2 additions & 14 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
attrib,
Factory
)
from boto.exception import JSONResponseError, BotoServerError
from boto.exception import JSONResponseError
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ItemNotFound,
Expand Down Expand Up @@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
try:
return func(self, *args, **kwargs)
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.%s" % func.__name__)
raise
except JSONResponseError:
self.metrics.increment("error.jsonresponse.%s" % func.__name__)
raise
except BotoServerError:
self.metrics.increment("error.botoserver.%s" % func.__name__)
raise
return func(self, *args, **kwargs)
return wrapper


Expand Down Expand Up @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


Expand Down Expand Up @@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
Expand Down
1 change: 0 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):
Expand Down
10 changes: 9 additions & 1 deletion autopush/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Metrics interface and implementations"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence, Any # noqa

from twisted.internet import reactor
from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient
Expand Down Expand Up @@ -74,6 +74,14 @@ def timing(self, name, duration, **kwargs):
self._metric.timing(name, duration)


def make_tags(base=None, **kwargs):
# type: (Sequence[str], **Any) -> Sequence[str]
"""Generate a list of tag values"""
tags = list(base or [])
tags.extend('{}:{}'.format(key, val) for key, val in kwargs.iteritems())
return tags


class DatadogMetrics(object):
"""DataDog Metric backend"""
def __init__(self, api_key, app_key, hostname, flush_interval=10,
Expand Down
17 changes: 14 additions & 3 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from twisted.python.failure import Failure

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.apns2 import (
APNSClient,
APNS_MAX_CONNECTIONS,
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, ap_settings, router_conf, metrics,
self.ap_settings = ap_settings
self._config = router_conf
self.metrics = metrics
self._base_tags = []
self._base_tags = ["platform:apns"]
self.apns = dict()
for rel_channel in self._config:
self.apns[rel_channel] = self._connect(rel_channel,
Expand Down Expand Up @@ -144,8 +145,10 @@ def _route(self, notification, router_data):
apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
except (ConnectionError, AttributeError) as ex:
self.metrics.increment("updates.client.bridge.apns.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
application=rel_channel,
reason="connection_error"))
self.log.error("Connection Error sending to APNS",
log_failure=Failure(ex))
raise RouterException(
Expand All @@ -165,11 +168,19 @@ def _route(self, notification, router_data):

location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
self.metrics.increment("notification.bridge.sent",
tags=make_tags(self._base_tags,
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
33 changes: 20 additions & 13 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.auth = router_conf.get("auth")
self._base_tags = []
self._base_tags = ["platform:fcm"]
try:
self.fcm = pyfcm.FCMNotification(api_key=self.auth)
except Exception as e:
Expand Down Expand Up @@ -165,12 +166,12 @@ def _route(self, notification, router_data):
# correct encryption headers are included with the data.
if notification.data:
mdata = self.config.get('max_data', 4096)
if len(notification.data) > mdata:
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(len(notification.data) - mdata),
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
Expand Down Expand Up @@ -199,16 +200,16 @@ def _route(self, notification, router_data):
self.log.error("Authentication Error: %s" % e)
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.metrics.increment("updates.client.bridge.fcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
self.log.warn("Could not connect to FCM server: %s" % e)
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled FCM Error: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.fcm.attempted",
self._base_tags)
return self._process_reply(result, notification, router_data,
ttl=router_ttl)

Expand All @@ -229,20 +230,22 @@ def _process_reply(self, reply, notification, router_data, ttl):
new_id = result.get('registration_id')
self.log.debug("FCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.fcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
if reply.get('failure'):
self.metrics.increment("updates.client.bridge.fcm.failed",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
reason = result.get('error', "Unreported")
err = self.reasonTable.get(reason)
if err.get("crit", False):
self.log.critical(
err['msg'],
nlen=len(notification.data),
nlen=notification.data_length,
regid=router_data["token"],
senderid=self.senderID,
ttl=notification.ttl,
Expand All @@ -263,8 +266,12 @@ def _process_reply(self, reply, notification, router_data, ttl):
response_body=err['msg'],
router_data={},
)
self.metrics.increment("updates.client.bridge.fcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
41 changes: 25 additions & 16 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.gcm[sid] = gcmclient.GCM(auth)
except:
raise IOError("GCM Bridge not initiated in main")
self._base_tags = []
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -82,12 +83,12 @@ def _route(self, notification, uaid_data):
# correct encryption headers are included with the data.
if notification.data:
mdata = self.config.get('max_data', 4096)
if len(notification.data) > mdata:
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(len(notification.data) - mdata),
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
Expand Down Expand Up @@ -122,15 +123,15 @@ def _route(self, notification, uaid_data):
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
self.metrics.increment("updates.client.bridge.gcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.gcm.attempted",
self._base_tags)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)

Expand All @@ -148,16 +149,18 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
for old_id, new_id in reply.canonical.items():
self.log.debug("GCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.gcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
# naks:
# uninstall:
for reg_id in reply.not_registered:
self.metrics.increment("updates.client.bridge.gcm.failed.unreg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="unregistered"))
self.log.debug("GCM no longer registered: %s" % reg_id)
return RouterResponse(
status_code=410,
Expand All @@ -167,8 +170,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# for reg_id, err_code in reply.failed.items():
if len(reply.failed.items()) > 0:
self.metrics.increment("updates.client.bridge.gcm.failed.failure",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
self.log.debug("GCM failures: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM unable to deliver", status_code=410,
Expand All @@ -178,17 +182,22 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# retries:
if reply.needs_retry():
self.metrics.increment("updates.client.bridge.gcm.failed.retry",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="retry"))
self.log.warn("GCM retry requested: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM failure to deliver, retry",
status_code=503,
response_body="Please try request later.",
log_exception=False)

self.metrics.increment("updates.client.bridge.gcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
Loading

0 comments on commit 3e4a2a1

Please sign in to comment.