Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Add FCM HTTPv1 API support
Browse files Browse the repository at this point in the history
With the new FCM migration, it makes some sense to switch to the new
FCM HTTPv1 API. In addition, this uses twisted async.

Closes #1291
  • Loading branch information
jrconlin committed Dec 13, 2018
1 parent d2a1f3f commit 4efc7f6
Show file tree
Hide file tree
Showing 6 changed files with 526 additions and 40 deletions.
6 changes: 3 additions & 3 deletions autopush/router/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from autopush.router.interface import IRouter # noqa
from autopush.router.webpush import WebPushRouter
from autopush.router.fcm import FCMRouter
from autopush.router.fcm_v1 import FCMV1Router
from autopush.router.fcm_v1 import FCMv1Router
from autopush.router.adm import ADMRouter

__all__ = ["APNSRouter", "FCMRouter", "FCMV1Router", "GCMRouter",
__all__ = ["APNSRouter", "FCMRouter", "FCMv1Router", "GCMRouter",
"WebPushRouter", "ADMRouter"]


Expand All @@ -42,5 +42,5 @@ def routers_from_config(conf, db, agent):
if router_conf['fcm']['version'] == 0:
routers["fcm"] = FCMRouter(conf, router_conf["fcm"], db.metrics)
if router_conf['fcm']['version'] == 1:
routers["fcm"] = FCMV1Router(conf, router_conf["fcm"], db.metrics)
routers["fcm"] = FCMv1Router(conf, router_conf["fcm"], db.metrics)
return routers
34 changes: 16 additions & 18 deletions autopush/router/fcm_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from autopush.types import JSONDict # noqa


class FCMV1Router(FCMRouter):
class FCMv1Router(FCMRouter):
"""FCM v1 HTTP Router Implementation
Note: FCM v1 is a newer version of the FCM HTTP API.
Expand All @@ -29,11 +29,11 @@ def __init__(self, conf, router_conf, metrics):
self.senderID = router_conf.get("senderID")
self.version = router_conf["version"]
self.log = Logger()
self.fcm = FCMv1(router_conf["service_cred_path"],
router_conf["senderID"],
self.log,
self.metrics)
self._base_tags = ["platform:fcm"]
self.fcm = FCMv1(project_id=self.senderID,
service_cred_path=router_conf['service_cred_path'],
logger=self.log,
metrics=self.metrics)
self._base_tags = ["platform:fcmv1"]
self.log.debug("Starting FCMv1 router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -102,7 +102,7 @@ def _route(self, notification, router_data):
"collapse_key": self.collapseKey,
"data_message": data,
"dry_run": self.dryRun or ('dryrun' in router_data),
"time_to_live": router_ttl
"ttl": router_ttl
})
d.addCallback(
self._process_reply, notification, router_data, router_ttl
Expand All @@ -118,7 +118,7 @@ def _process_error(self, failure):
self.log.error("FCM Authentication Error: {}".format(err))
raise RouterException("Server error", status_code=500, errno=901)
if isinstance(err, TimeoutError):
self.log.warn("GCM Timeout: %s" % err)
self.log.warn("FCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
Expand All @@ -127,14 +127,20 @@ def _process_error(self, failure):
errno=903,
log_exception=False)
if isinstance(err, ConnectError):
self.log.warn("GCM Unavailable: %s" % err)
self.log.warn("FCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=902,
log_exception=False)
if isinstance(err, RouterException):
self.log.warn("FCM Error: {}".format(err))
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="server_error"))
return failure

def _error(self, err, status, **kwargs):
Expand All @@ -148,15 +154,7 @@ def _process_reply(self, reply, notification, router_data, ttl):
# acks:
# for reg_id, msg_id in reply.success.items():
# updates
if reply.failure:
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
raise RouterException("FCM failure to deliver",
status_code=reply.code,
response_body="Please try request "
"later.",
log_exception=False)
# Failures are returned as non-200 messages (404, 410, etc.)
self.metrics.increment("notification.bridge.sent",
tags=self._base_tags)
self.metrics.increment("notification.message_data",
Expand Down
71 changes: 53 additions & 18 deletions autopush/router/fcmv1client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import treq
from oauth2client.service_account import ServiceAccountCredentials
from twisted.logger import Logger
from twisted.internet.error import ConnectError
from twisted.internet.error import (ConnectError, TimeoutError)

from autopush.exceptions import RouterException

Expand All @@ -14,30 +14,62 @@ class FCMAuthenticationError(Exception):

class Result(object):

def __init__(self):
self.code = 0
def __init__(self, response):
self.code = response.code
self.success = 0
self.failure = []
self.retry_message = None
self.retry_after = (
response.headers.getRawHeaders('Retry-After') or [None])[0]

def parse_response(self, content, code):
def parse_response(self, content):
# 400 will return an error message indicating what's wrong with the
# javascript message you sent.
# 403 is an error indicating that the client app is missing the
# FCM Cloud Messaging permission (and a URL to set it)
if code in (400, 403, 404):
raise RouterException(content)
data = json.loads(content)
self.code = code
self.success = data.get('success', 0)
if data.get('failure'):
self.failure = data.get('failed_registration_ids')
# Successful content body
# { "name": "projects/.../messages/0:..."}
# Failures:
# { "error":
# { "status": str
# "message": str
# "code": u64
# "details: [
# {"errorCode": str,
# "@type": str},
# {"fieldViolations": [
# {"field": str,
# "description": str}
# ],
# "type", str
# }
# ]
# }
# }
# (Failures are a tad more verbose)
if 500 <= self.code <= 599:
self.retry_message = content
return self
try:
data = json.loads(content)
if self.code in (400, 403, 404) or data.get('error'):
# Having a hard time finding information about how some
# things are handled in FCM, e.g. retransmit requests.
# For now, catalog them as errors and provide back-pressure.
err = data.get("error")
raise RouterException("{}: {}".format(err.get("status"),
err.get("message")))
if "name" in data:
self.success = 1
except (TypeError, ValueError, KeyError, AttributeError):
raise RouterException(
"Unknown error response: {}".format(content))
return self


class FCMv1(object):
def __init__(self,
service_cred_path,
project_id,
service_cred_path=None,
logger=None,
metrics=None,
**options):
Expand All @@ -49,9 +81,10 @@ def __init__(self,
self.metrics = metrics
self.logger = logger or Logger()
self._options = options
self.svc_cred = ServiceAccountCredentials.from_json_keyfile_name(
service_cred_path,
["https://www.googleapis.com/auth/firebase.messaging"])
if service_cred_path:
self.svc_cred = ServiceAccountCredentials.from_json_keyfile_name(
service_cred_path,
["https://www.googleapis.com/auth/firebase.messaging"])
self._sender = treq.post

def _get_access_token(self):
Expand All @@ -74,13 +107,15 @@ def process(self, response, payload=None):
if response.code == 401:
raise FCMAuthenticationError("Authentication Error")

result = Result()
result = Result(response)

d = response.text()
d.addCallback(result.parse_response, response.code)
d.addCallback(result.parse_response)
return d

def error(self, failure):
if isinstance(failure.value, FCMAuthenticationError) or \
isinstance(failure.value, TimeoutError) or \
isinstance(failure.value, ConnectError):
raise failure.value
self.logger.error("FCMv1Client failure: {}".format(failure.value))
Expand Down
103 changes: 103 additions & 0 deletions autopush/tests/test_fcmclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import json

import pytest
import treq
from mock import Mock

from oauth2client.service_account import ServiceAccountCredentials
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.trial import unittest
from twisted.web.http_headers import Headers

from autopush.exceptions import RouterException
from autopush.router.fcmv1client import FCMv1, FCMAuthenticationError


class FCMv1TestCase(unittest.TestCase):

def setUp(self):
self._m_request = Deferred()
self._m_response = Mock(spec=treq.response._Response)
self._m_response.code = 200
self._m_response.headers = Headers()
self._m_resp_text = Deferred()
self._m_response.text.return_value = self._m_resp_text
self.fcm = FCMv1(project_id="fcm_test")
self.fcm._sender = Mock(spec=treq.request)
self.fcm.svc_cred = Mock(spec=ServiceAccountCredentials)
atoken = Mock()
atoken.access_token = "access_token"
self.fcm.svc_cred.get_access_token.return_value = atoken
self.fcm._sender.return_value = self._m_request
self.m_payload = {"ttl": 60, "data_message": "some content"}
self._success = {
u"name": (u'projects/fir-bridgetest/messages/'
u'0:1544652984769917%0aa51ebcf9fd7ecd')
}
self._failure = {
u'error': {
u'status': u'INVALID_ARGUMENT',
u'message': (u'The registration token is not a valid '
u'FCM registration token'),
u'code': 400,
u'details': [
{
u'errorCode': u'INVALID_ARGUMENT',
u'@type': (u'type.googleapis.com/google.firebase'
u'.fcm.v1.FcmError')},
{u'fieldViolations': [
{u'field': u'message.token',
u'description': (u'The registration token is not '
u'a valid FCM registration token')}],
u'@type': u'type.googleapis.com/google.rpc.BadRequest'}
]
}
}

@inlineCallbacks
def test_send(self):
content = json.dumps(self._success)
self._m_resp_text.callback(content)
self._m_request.callback(self._m_response)
result = yield self.fcm.send("token", self.m_payload)
assert result.success == 1

@inlineCallbacks
def test_bad_reply(self):
self._m_response.code = 400
content = json.dumps("Invalid JSON")
self._m_resp_text.callback(content)
self._m_request.callback(self._m_response)
with pytest.raises(RouterException) as ex:
yield self.fcm.send("token", self.m_payload)
assert ex.value.status_code == 500

@inlineCallbacks
def test_fail_400(self):
self._m_response.code = 400
content = json.dumps(self._failure)
self._m_resp_text.callback(content)
self._m_request.callback(self._m_response)
with pytest.raises(RouterException) as ex:
yield self.fcm.send("token", self.m_payload)
assert ex.value.status_code == 500
assert "Server error: INVALID_ARGUMENT:" in ex.value.message

@inlineCallbacks
def test_fail_401(self):
self._m_response.code = 401
content = "Unauthorized"
self._m_resp_text.callback(content)
self._m_request.callback(self._m_response)
with pytest.raises(FCMAuthenticationError):
yield self.fcm.send("token", self.m_payload)

@inlineCallbacks
def test_fail_500(self):
self._m_response.code = 500
content = "OMG"
self._m_response.headers.addRawHeader('Retry-After', 123)
self._m_resp_text.callback(content)
self._m_request.callback(self._m_response)
result = yield self.fcm.send("token", self.m_payload)
assert result.retry_after == 123
Loading

0 comments on commit 4efc7f6

Please sign in to comment.