Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
WIP: make gcm calls use async callbacks
Browse files Browse the repository at this point in the history
Closes #1291
  • Loading branch information
jrconlin committed Sep 28, 2018
1 parent a569714 commit c924423
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 56 deletions.
49 changes: 31 additions & 18 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def __init__(self, conf, router_conf, metrics):
for sid in router_conf.get("senderIDs"):
auth = router_conf.get("senderIDs").get(sid).get("auth")
self.senderIDs[sid] = auth
self.gcm[sid] = gcmclient.GCM(auth, timeout=timeout)
self.gcm[sid] = gcmclient.GCM(auth, timeout=timeout,
logger=self.log)
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

Expand Down Expand Up @@ -64,12 +65,12 @@ def register(self, uaid, router_data, app_id, *args, **kwargs):
senderid=senderid)
# Assign a senderid
router_data["creds"] = {"senderID": senderid,
"auth": self.senderIDs[senderid]}
"auth": self.senderIDs[senderid]}

def route_notification(self, notification, uaid_data):
"""Start the GCM notification routing, returns a deferred"""
# Kick the entire notification routing off to a thread
return deferToThread(self._route, notification, uaid_data)
return self._route(notification, uaid_data)

def _route(self, notification, uaid_data):
"""Blocking GCM call to route the notification"""
Expand Down Expand Up @@ -112,52 +113,64 @@ def _route(self, notification, uaid_data):
)
try:
gcm = self.gcm[router_data['creds']['senderID']]
result = gcm.send(payload)
except RouterException:
raise # pragma nocover
d = gcm.send(payload)
d.addCallback(
self._process_reply,
uaid_data,
router_ttl,
notification)

d.addErrback(
self._process_error
)
return d
except KeyError:
self.log.critical("Missing GCM bridge credentials")
raise RouterException("Server error", status_code=500,
errno=900)
except gcmclient.GCMAuthenticationError as e:
self.log.error("GCM Authentication Error: %s" % e)

def _process_error(self, failure):
err = failure.value
if isinstance(err, RouterException):
return failure
if isinstance(err, gcmclient.GCMAuthenticationError):
self.log.error("GCM Authentication Error: %s" % err)
raise RouterException("Server error", status_code=500,
errno=901)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
if isinstance(err, ConnectionError):
self.log.warn("GCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=902,
log_exception=False)
except Timeout as e:
self.log.warn("GCM Timeout: %s" % e)
if isinstance(err, Timeout):
self.log.warn("GCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="timeout"))
raise RouterException("Server error", status_code=502,
errno=903,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)

self.log.error("Unhandled exception in GCM Routing: %s" % err)
raise RouterException("Server error", status_code=500)

def _error(self, err, status, **kwargs):
"""Error handler that raises the RouterException"""
self.log.debug(err, **kwargs)
return RouterException(err, status_code=status, response_body=err,
**kwargs)

def _process_reply(self, reply, uaid_data, ttl, notification):
def _process_reply(self, gcm, uaid_data, ttl, notification):
"""Process GCM send reply"""
# acks:
# for reg_id, msg_id in reply.success.items():
# updates
reply = gcm.response
for old_id, new_id in reply.canonicals.items():
self.log.debug("GCM id changed : {old} => {new}",
old=old_id, new=new_id)
Expand Down
70 changes: 47 additions & 23 deletions autopush/router/gcmclient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json

import requests
import treq
from twisted.web.http_headers import Headers

from autopush.exceptions import RouterException

Expand All @@ -12,7 +14,7 @@ class GCMAuthenticationError(Exception):
class Result(object):
"""Abstraction object for GCM response"""

def __init__(self, message, response):
def __init__(self, response, message):
"""Process GCM message and response into abstracted object
:param message: Message payload
Expand All @@ -30,14 +32,10 @@ def __init__(self, message, response):
self.message = message
self.retry_message = None

self.retry_after = response.headers.get('Retry-After', None)
self.retry_after = (
response.headers.getRawHeaders('Retry-After') or [None])[0]

if response.status_code != 200:
self.retry_message = message
else:
self._parse_response(message, response.content)

def _parse_response(self, message, content):
def parse_response(self, content, message):
data = json.loads(content)
if not data.get('results'):
raise RouterException("Recv'd invalid response from GCM")
Expand All @@ -54,6 +52,7 @@ def _parse_response(self, message, content):
self.not_registered.append(reg_id)
else:
self.failed[reg_id] = res['error']
return self


class JSONMessage(object):
Expand Down Expand Up @@ -126,7 +125,33 @@ def __init__(self,
self.metrics = metrics
self.log = logger
self._options = options
self._sender = requests.post
self._sender = treq.request

def _set_response(self, response):
self.response = response
return self

def process(self, response, payload):
if response.code in (400, 404):
raise RouterException(response.content)

if response.code == 401:
raise GCMAuthenticationError("Authentication Error")

result = Result(response, payload)

if 500 <= response.code <= 599:
result.retry_message = payload
if response.code == 200 or ():
# Fetch the content body
d = response.text()
d.addCallback(result.parse_response, payload)
d.addCallback(self._set_response)
return d

def error(self, failure):
self.log.error("GCMClient failure: {}".format(failure.value))
raise RouterException(failure.value)

def send(self, payload):
"""Send a payload to GCM
Expand All @@ -136,23 +161,22 @@ def send(self, payload):
:return: Result
"""
headers = {
'Content-Type': 'application/json',
'Authorization': 'key={}'.format(self._api_key),
}
headers = Headers({
'Content-Type': ['application/json'],
'Authorization': ['key={}'.format(self._api_key)],
})

response = self._sender(
if 'timeout' not in self._options:
self._options['timeout'] = 3

d = self._sender(
method="POST",
url=self._endpoint,
headers=headers,
data=json.dumps(payload.payload),
**self._options
)

if response.status_code in (400, 404):
raise RouterException(response.content)

if response.status_code == 401:
raise GCMAuthenticationError("Authentication Error")

if response.status_code == 200 or (500 <= response.status_code <= 599):
return Result(payload, response)
# handle the immediate response (which contains no body)
d.addCallback(self.process, payload)
d.addErrback(self.error)
return d
3 changes: 3 additions & 0 deletions autopush/web/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ def _router_completed(self, response, uaid_data, warning="",
router_type=None, vapid=None):
"""Called after router has completed successfully"""
# Log the time taken for routing
print("router_complete response: ", response)
if response is None:
return
self._timings["route_time"] = time.time() - self._router_time
# Were we told to update the router data?
time_diff = time.time() - self._start_time
Expand Down
31 changes: 16 additions & 15 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
-e git+https://github.com/habnabit/txstatsd.git@157ef85fbdeafe23865c7c4e176237ffcb3c3f1f#egg=txStatsD-master
apns==2.0.1
asn1crypto==0.24.0 # via cryptography
asn1crypto==0.24.0 # via cryptography, treq
attrs==18.1.0
autobahn[twisted]==18.6.1
automat==0.7.0 # via twisted
automat==0.7.0 # via twisted, treq
boto3==1.7.57
botocore==1.10.57 # via boto3, s3transfer
certifi==2018.4.16 # via requests
certifi==2018.4.16 # via requests, treq
cffi==1.11.5
chardet==3.0.4 # via requests
chardet==3.0.4 # via requests, treq
click==6.7
configargparse==0.13.0
constantly==15.1.0 # via twisted
constantly==15.1.0 # via twisted, treq
contextlib2==0.5.5 # via raven
cryptography==2.3
cyclone==1.1
datadog==0.22.0
decorator==4.3.0 # via datadog
docutils==0.14 # via botocore
ecdsa==0.13 # via python-jose
enum34==1.1.6 # via cryptography, h2
enum34==1.1.6 # via cryptography, h2, treq
future==0.16.0 # via python-jose
futures==3.2.0 # via s3transfer
gcm-client==0.1.4
Expand All @@ -28,19 +28,19 @@ h2==2.6.2 # via hyper
hpack==3.0.0 # via h2
hyper==0.7.0
hyperframe==3.2.0 # via h2, hyper
hyperlink==18.0.0 # via twisted
idna==2.7 # via cryptography, hyperlink, requests
incremental==17.5.0 # via twisted
ipaddress==1.0.22 # via cryptography
hyperlink==18.0.0 # via twisted, treq
idna==2.7 # via cryptography, hyperlink, requests, treq
incremental==17.5.0 # via twisted, treq
ipaddress==1.0.22 # via cryptography, treq
jmespath==0.9.3 # via boto3, botocore
marshmallow-polyfield==3.2
marshmallow==2.15.3
objgraph==3.4.0
pyasn1-modules==0.2.2 # via service-identity
pyasn1-modules==0.2.2 # via service-identity, treq
pyasn1==0.4.3
pycparser==2.18 # via cffi
pycparser==2.18 # via cffi, treq
pyfcm==1.4.5
pyhamcrest==1.9.0 # via twisted
pyhamcrest==1.9.0 # via twisted, treq
pyopenssl==18.0.0
python-dateutil==2.7.3 # via botocore
python-jose==3.0.0
Expand All @@ -51,11 +51,12 @@ rsa==3.4.2 # via python-jose
s3transfer==0.1.13 # via boto3
service-identity==17.0.0
simplejson==3.16.0
six==1.11.0 # via autobahn, automat, cryptography, pyhamcrest, pyopenssl, python-dateutil, python-jose, txaio
six==1.11.0 # via autobahn, automat, cryptography, pyhamcrest, pyopenssl, python-dateutil, python-jose, txaio, treq
treq==18.6.0
twisted==18.7.0
txaio==2.10.0 # via autobahn
typing==3.6.4
ua-parser==0.8.0
urllib3==1.23 # via requests
urllib3==1.23 # via requests, treq
wsaccel==0.6.2 ; platform_python_implementation == "CPython"
zope.interface==4.5.0

0 comments on commit c924423

Please sign in to comment.