Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: make gcm calls use async callbacks #1296

Merged
merged 1 commit into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""GCM Router"""
from typing import Any # noqa

from requests.exceptions import ConnectionError, Timeout
from twisted.internet.threads import deferToThread
from twisted.logger import Logger
from twisted.internet.error import ConnectError, TimeoutError

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
Expand All @@ -28,15 +27,16 @@ def __init__(self, conf, router_conf, metrics):
self.dryRun = router_conf.get("dryrun", False)
self.collapseKey = router_conf.get("collapseKey")
timeout = router_conf.get("timeout", 10)
self.gcm = {}
self.gcmclients = {}
self.senderIDs = {}
# Flatten the SenderID list from human readable and init gcmclient
if not router_conf.get("senderIDs"):
raise IOError("SenderIDs not configured.")
for sid in router_conf.get("senderIDs"):
auth = router_conf.get("senderIDs").get(sid).get("auth")
self.senderIDs[sid] = auth
self.gcm[sid] = gcmclient.GCM(auth, timeout=timeout)
self.gcmclients[sid] = gcmclient.GCM(auth, timeout=timeout,
logger=self.log)
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

Expand Down Expand Up @@ -69,7 +69,7 @@ def register(self, uaid, router_data, app_id, *args, **kwargs):
def route_notification(self, notification, uaid_data):
"""Start the GCM notification routing, returns a deferred"""
# Kick the entire notification routing off to a thread
return deferToThread(self._route, notification, uaid_data)
return self._route(notification, uaid_data)

def _route(self, notification, uaid_data):
"""Blocking GCM call to route the notification"""
Expand Down Expand Up @@ -111,41 +111,48 @@ def _route(self, notification, uaid_data):
data=data,
)
try:
gcm = self.gcm[router_data['creds']['senderID']]
result = gcm.send(payload)
except RouterException:
raise # pragma nocover
client = self.gcmclients[router_data['creds']['senderID']]
d = client.send(payload)
d.addCallback(
self._process_reply,
uaid_data,
router_ttl,
notification)

d.addErrback(
self._process_error
)
return d
except KeyError:
self.log.critical("Missing GCM bridge credentials")
raise RouterException("Server error", status_code=500,
errno=900)
except gcmclient.GCMAuthenticationError as e:
self.log.error("GCM Authentication Error: %s" % e)

def _process_error(self, failure):
err = failure.value
if isinstance(err, gcmclient.GCMAuthenticationError):
self.log.error("GCM Authentication Error: %s" % err)
raise RouterException("Server error", status_code=500,
errno=901)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
if isinstance(err, TimeoutError):
self.log.warn("GCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
reason="timeout"))
raise RouterException("Server error", status_code=502,
errno=902,
errno=903,
log_exception=False)
except Timeout as e:
self.log.warn("GCM Timeout: %s" % e)
if isinstance(err, ConnectError):
self.log.warn("GCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="timeout"))
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=903,
errno=902,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)
return failure

def _error(self, err, status, **kwargs):
"""Error handler that raises the RouterException"""
Expand Down
73 changes: 48 additions & 25 deletions autopush/router/gcmclient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json

import requests
import treq
from twisted.web.http_headers import Headers
from twisted.logger import Logger
from twisted.internet.error import ConnectError

from autopush.exceptions import RouterException

Expand All @@ -12,7 +15,7 @@ class GCMAuthenticationError(Exception):
class Result(object):
"""Abstraction object for GCM response"""

def __init__(self, message, response):
def __init__(self, response, message):
"""Process GCM message and response into abstracted object

:param message: Message payload
Expand All @@ -30,14 +33,13 @@ def __init__(self, message, response):
self.message = message
self.retry_message = None

self.retry_after = response.headers.get('Retry-After', None)
self.retry_after = (
response.headers.getRawHeaders('Retry-After') or [None])[0]

if response.status_code != 200:
self.retry_message = message
else:
self._parse_response(message, response.content)

def _parse_response(self, message, content):
def parse_response(self, content, code, message):
# 401 handled in GCM.process()
if code in (400, 404):
raise RouterException(content)
data = json.loads(content)
if not data.get('results'):
raise RouterException("Recv'd invalid response from GCM")
Expand All @@ -54,6 +56,7 @@ def _parse_response(self, message, content):
self.not_registered.append(reg_id)
else:
self.failed[reg_id] = res['error']
return self


class JSONMessage(object):
Expand Down Expand Up @@ -124,9 +127,31 @@ def __init__(self,
self._endpoint = "https://{}".format(endpoint)
self._api_key = api_key
self.metrics = metrics
self.log = logger
self.log = logger or Logger()
self._options = options
self._sender = requests.post
self._sender = treq.post

def process(self, response, payload):
if response.code == 401:
raise GCMAuthenticationError("Authentication Error")

result = Result(response, payload)

if 500 <= response.code <= 599:
result.retry_message = payload
return result

# Fetch the content body
d = response.text()
d.addCallback(result.parse_response, response.code, payload)
return d

def error(self, failure):
if isinstance(failure.value, GCMAuthenticationError) or \
isinstance(failure.value, ConnectError):
raise failure.value
self.log.error("GCMClient failure: {}".format(failure.value))
raise RouterException("Server error: {}".format(failure.value))

def send(self, payload):
"""Send a payload to GCM
Expand All @@ -136,23 +161,21 @@ def send(self, payload):
:return: Result

"""
headers = {
'Content-Type': 'application/json',
'Authorization': 'key={}'.format(self._api_key),
}
headers = Headers({
'Content-Type': ['application/json'],
'Authorization': ['key={}'.format(self._api_key)],
})

if 'timeout' not in self._options:
self._options['timeout'] = 3

response = self._sender(
d = self._sender(
url=self._endpoint,
headers=headers,
data=json.dumps(payload.payload),
**self._options
)

if response.status_code in (400, 404):
raise RouterException(response.content)

if response.status_code == 401:
raise GCMAuthenticationError("Authentication Error")

if response.status_code == 200 or (500 <= response.status_code <= 599):
return Result(payload, response)
# handle the immediate response (which contains no body)
d.addCallback(self.process, payload)
d.addErrback(self.error)
return d
Loading