Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: Use modern metrics #948

Merged
merged 1 commit into from
Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
attrib,
Factory
)
from boto.exception import JSONResponseError, BotoServerError
from boto.exception import JSONResponseError
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ItemNotFound,
Expand Down Expand Up @@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
try:
return func(self, *args, **kwargs)
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.%s" % func.__name__)
raise
except JSONResponseError:
self.metrics.increment("error.jsonresponse.%s" % func.__name__)
raise
except BotoServerError:
self.metrics.increment("error.botoserver.%s" % func.__name__)
raise
return func(self, *args, **kwargs)
return wrapper


Expand Down Expand Up @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


Expand Down Expand Up @@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should stop using moto in the future, then we can remove this entirely since it only really applies to testing.

Expand Down
1 change: 0 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):
Expand Down
10 changes: 9 additions & 1 deletion autopush/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Metrics interface and implementations"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence, Any # noqa

from twisted.internet import reactor
from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient
Expand Down Expand Up @@ -74,6 +74,14 @@ def timing(self, name, duration, **kwargs):
self._metric.timing(name, duration)


def make_tags(base=None, **kwargs):
# type: (Sequence[str], **Any) -> Sequence[str]
"""Generate a list of tag values"""
tags = list(base or [])
tags.extend('{}:{}'.format(key, val) for key, val in kwargs.iteritems())
return tags


class DatadogMetrics(object):
"""DataDog Metric backend"""
def __init__(self, api_key, app_key, hostname, flush_interval=10,
Expand Down
17 changes: 14 additions & 3 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from twisted.python.failure import Failure

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.apns2 import (
APNSClient,
APNS_MAX_CONNECTIONS,
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, ap_settings, router_conf, metrics,
self.ap_settings = ap_settings
self._config = router_conf
self.metrics = metrics
self._base_tags = []
self._base_tags = ["platform:apns"]
self.apns = dict()
for rel_channel in self._config:
self.apns[rel_channel] = self._connect(rel_channel,
Expand Down Expand Up @@ -144,8 +145,10 @@ def _route(self, notification, router_data):
apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
except (ConnectionError, AttributeError) as ex:
self.metrics.increment("updates.client.bridge.apns.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
application=rel_channel,
reason="connection_error"))
self.log.error("Connection Error sending to APNS",
log_failure=Failure(ex))
raise RouterException(
Expand All @@ -165,11 +168,19 @@ def _route(self, notification, router_data):

location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
self.metrics.increment("notification.bridge.sent",
tags=make_tags(self._base_tags,
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
33 changes: 20 additions & 13 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.auth = router_conf.get("auth")
self._base_tags = []
self._base_tags = ["platform:fcm"]
try:
self.fcm = pyfcm.FCMNotification(api_key=self.auth)
except Exception as e:
Expand Down Expand Up @@ -165,12 +166,12 @@ def _route(self, notification, router_data):
# correct encryption headers are included with the data.
if notification.data:
mdata = self.config.get('max_data', 4096)
if len(notification.data) > mdata:
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(len(notification.data) - mdata),
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
Expand Down Expand Up @@ -199,16 +200,16 @@ def _route(self, notification, router_data):
self.log.error("Authentication Error: %s" % e)
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.metrics.increment("updates.client.bridge.fcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
self.log.warn("Could not connect to FCM server: %s" % e)
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled FCM Error: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.fcm.attempted",
self._base_tags)
return self._process_reply(result, notification, router_data,
ttl=router_ttl)

Expand All @@ -229,20 +230,22 @@ def _process_reply(self, reply, notification, router_data, ttl):
new_id = result.get('registration_id')
self.log.debug("FCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.fcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
if reply.get('failure'):
self.metrics.increment("updates.client.bridge.fcm.failed",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
reason = result.get('error', "Unreported")
err = self.reasonTable.get(reason)
if err.get("crit", False):
self.log.critical(
err['msg'],
nlen=len(notification.data),
nlen=notification.data_length,
regid=router_data["token"],
senderid=self.senderID,
ttl=notification.ttl,
Expand All @@ -263,8 +266,12 @@ def _process_reply(self, reply, notification, router_data, ttl):
response_body=err['msg'],
router_data={},
)
self.metrics.increment("updates.client.bridge.fcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
41 changes: 25 additions & 16 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.gcm[sid] = gcmclient.GCM(auth)
except:
raise IOError("GCM Bridge not initiated in main")
self._base_tags = []
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -82,12 +83,12 @@ def _route(self, notification, uaid_data):
# correct encryption headers are included with the data.
if notification.data:
mdata = self.config.get('max_data', 4096)
if len(notification.data) > mdata:
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(len(notification.data) - mdata),
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
Expand Down Expand Up @@ -122,15 +123,15 @@ def _route(self, notification, uaid_data):
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
self.metrics.increment("updates.client.bridge.gcm.connection_err",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.gcm.attempted",
self._base_tags)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)

Expand All @@ -148,16 +149,18 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
for old_id, new_id in reply.canonical.items():
self.log.debug("GCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.gcm.failed.rereg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
# naks:
# uninstall:
for reg_id in reply.not_registered:
self.metrics.increment("updates.client.bridge.gcm.failed.unreg",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="unregistered"))
self.log.debug("GCM no longer registered: %s" % reg_id)
return RouterResponse(
status_code=410,
Expand All @@ -167,8 +170,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# for reg_id, err_code in reply.failed.items():
if len(reply.failed.items()) > 0:
self.metrics.increment("updates.client.bridge.gcm.failed.failure",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
self.log.debug("GCM failures: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM unable to deliver", status_code=410,
Expand All @@ -178,17 +182,22 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# retries:
if reply.needs_retry():
self.metrics.increment("updates.client.bridge.gcm.failed.retry",
self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="retry"))
self.log.warn("GCM retry requested: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM failure to deliver, retry",
status_code=503,
response_body="Please try request later.",
log_exception=False)

self.metrics.increment("updates.client.bridge.gcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
Loading