Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: Use FCM HTTPv1 protocol with twisted async #1308

Merged
merged 4 commits into from
Dec 22, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,27 @@ def from_argparse(cls, ns, **kwargs):
client_certs[sig] = name

if ns.fcm_enabled:
# Create a common gcmclient
if not ns.fcm_auth:
raise InvalidConfig("No Authorization Key found for FCM")
if not ns.fcm_senderid:
raise InvalidConfig("No SenderID found for FCM")
router_conf["fcm"] = {"ttl": ns.fcm_ttl,
"dryrun": ns.fcm_dryrun,
"max_data": ns.max_data,
"collapsekey": ns.fcm_collapsekey,
"auth": ns.fcm_auth,
"senderid": ns.fcm_senderid}
fcm_core = {
"ttl": ns.fcm_ttl,
"dryrun": ns.fcm_dryrun,
"max_data": ns.max_data,
"collapsekey": ns.fcm_collapsekey}
if len(ns.fcm_auth) > 0:
if not ns.fcm_senderid:
raise InvalidConfig("No SenderID found for FCM")
fcm_core.update({
"version": 0,
"auth": ns.fcm_auth,
"senderID": ns.fcm_senderid})
if len(ns.fcm_service_cred_path) > 0:
fcm_core.update({
"ttl": ns.fcm_ttl,
"version": 1,
"service_cred_path": ns.fcm_service_cred_path,
"senderID": ns.fcm_project_id})
if "version" not in fcm_core:
raise InvalidConfig("No credential info found for FCM")
router_conf["fcm"] = fcm_core

if ns.adm_creds:
# Create a common admclient
Expand Down
1 change: 1 addition & 0 deletions autopush/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

# Number of concurrent threads / AWS Session resources
THREAD_POOL_SIZE = 50
DEFAULT_ROUTER_TIMEOUT = 3
8 changes: 8 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ def _add_external_router_args(parser):
type=str, default="", env_var="FCM_AUTH")
parser.add_argument('--fcm_senderid', help='SenderID for FCM',
type=str, default="", env_var="FCM_SENDERID")
# FCM v1 HTTP API
parser.add_argument('--fcm_project_id', help="FCM Project identifier",
type=str, default="", env_var="FCM_PROJECT_ID")
parser.add_argument('--fcm_service_cred_path',
help="Path to FCM Service Credentials",
type=str, default="",
env_var="FCM_SERVICE_CRED_PATH")

# Apple Push Notification system (APNs) for iOS
# credentials consist of JSON struct containing a channel type
# followed by the settings,
Expand Down
13 changes: 11 additions & 2 deletions autopush/router/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
from autopush.router.interface import IRouter # noqa
from autopush.router.webpush import WebPushRouter
from autopush.router.fcm import FCMRouter
from autopush.router.fcm_v1 import FCMv1Router
from autopush.router.adm import ADMRouter

__all__ = ["APNSRouter", "FCMRouter", "GCMRouter", "WebPushRouter",
"ADMRouter"]
__all__ = ["APNSRouter", "FCMRouter", "FCMv1Router", "GCMRouter",
"WebPushRouter", "ADMRouter"]


def routers_from_config(conf, db, agent):
Expand All @@ -34,4 +35,12 @@ def routers_from_config(conf, db, agent):
routers["gcm"] = GCMRouter(conf, router_conf["gcm"], db.metrics)
if 'adm' in router_conf:
routers["adm"] = ADMRouter(conf, router_conf["adm"], db.metrics)
if 'fcm' in router_conf:
# There are two forms of FCM that can be used. "Legacy" and "v1".
# While possible, they really should not be used in parallel, and only
# one form should be defined.
if router_conf['fcm']['version'] == 0:
routers["fcm"] = FCMRouter(conf, router_conf["fcm"], db.metrics)
if router_conf['fcm']['version'] == 1:
routers["fcm"] = FCMv1Router(conf, router_conf["fcm"], db.metrics)
return routers
5 changes: 3 additions & 2 deletions autopush/router/adm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from twisted.internet.threads import deferToThread
from twisted.logger import Logger

from autopush.constants import DEFAULT_ROUTER_TIMEOUT
from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
Expand All @@ -24,7 +25,7 @@ def __init__(self,
logger=None,
metrics=None,
endpoint="api.amazon.com",
timeout=2,
timeout=DEFAULT_ROUTER_TIMEOUT,
**options
):

Expand Down Expand Up @@ -106,7 +107,7 @@ def __init__(self, conf, router_conf, metrics):
self.router_conf = router_conf
self.metrics = metrics
self.min_ttl = router_conf.get("ttl", 60)
timeout = router_conf.get("timeout", 10)
timeout = router_conf.get("timeout", DEFAULT_ROUTER_TIMEOUT)
self.profiles = dict()
for profile in router_conf:
config = router_conf[profile]
Expand Down
2 changes: 1 addition & 1 deletion autopush/router/fcm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""FCM Router"""
"""FCM legacy HTTP Router"""
from typing import Any # noqa

import pyfcm
Expand Down
165 changes: 165 additions & 0 deletions autopush/router/fcm_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""FCM v1 HTTP Router"""
from typing import Any # noqa

from twisted.internet.error import ConnectError, TimeoutError
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.router.fcm import FCMRouter
from autopush.router.fcmv1client import (FCMv1, FCMAuthenticationError)
from autopush.types import JSONDict # noqa


class FCMv1Router(FCMRouter):
"""FCM v1 HTTP Router Implementation

Note: FCM v1 is a newer version of the FCM HTTP API.
"""

def __init__(self, conf, router_conf, metrics):
"""Create a new FCM router and connect to FCM"""
self.conf = conf
self.router_conf = router_conf
self.metrics = metrics
self.min_ttl = router_conf.get("ttl", 60)
self.dryRun = router_conf.get("dryrun", False)
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.version = router_conf["version"]
self.log = Logger()
self.fcm = FCMv1(project_id=self.senderID,
service_cred_path=router_conf['service_cred_path'],
logger=self.log,
metrics=self.metrics)
self._base_tags = ["platform:fcmv1"]
self.log.debug("Starting FCMv1 router...")

def amend_endpoint_response(self, response, router_data):
# type: (JSONDict, JSONDict) -> None
response["senderid"] = self.senderID

def register(self, uaid, router_data, app_id, *args, **kwargs):
# type: (str, JSONDict, str, *Any, **Any) -> None
"""Validate that the FCM Instance Token is in the ``router_data``"""
senderid = app_id
# "token" is the FCM token generated by the client.
if "token" not in router_data:
raise self._error("connect info missing FCM Instance 'token'",
status=401,
uri=kwargs.get('uri'),
senderid=repr(senderid))
if senderid != self.senderID:
raise self._error("Invalid SenderID", status=410, errno=105)
# Assign a senderid
router_data["creds"] = {"senderID": self.senderID}

def route_notification(self, notification, uaid_data):
"""Start the FCM notification routing, returns a deferred"""
router_data = uaid_data["router_data"]
# Kick the entire notification routing off to a thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad comment, thanks!

return self._route(notification, router_data)

def _route(self, notification, router_data):
"""Blocking FCM call to route the notification"""
# THIS MUST MATCH THE CHANNELID GENERATED BY THE REGISTRATION SERVICE
# Currently this value is in hex form.
data = {"chid": notification.channel_id.hex}
if not router_data.get("token"):
raise self._error("No registration token found. "
"Rejecting message.",
410, errno=106, log_exception=False)
# Payload data is optional. The endpoint handler validates that the
# correct encryption headers are included with the data.
if notification.data:
mdata = self.router_conf.get('max_data', 4096)
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
data['con'] = notification.headers['encoding']

if 'encryption' in notification.headers:
data['enc'] = notification.headers['encryption']
if 'crypto_key' in notification.headers:
data['cryptokey'] = notification.headers['crypto_key']
elif 'encryption_key' in notification.headers:
data['enckey'] = notification.headers['encryption_key']

# registration_ids are the FCM instance tokens (specified during
# registration.
router_ttl = min(self.MAX_TTL,
max(self.min_ttl, notification.ttl or 0))
d = self.fcm.send(
token=router_data.get("token"),
payload={
"collapse_key": self.collapseKey,
"data_message": data,
"dry_run": self.dryRun or ('dryrun' in router_data),
"ttl": router_ttl
})
d.addCallback(
self._process_reply, notification, router_data, router_ttl
)
d.addErrback(
self._process_error
)
return d

def _process_error(self, failure):
err = failure.value
if isinstance(err, FCMAuthenticationError):
self.log.error("FCM Authentication Error: {}".format(err))
raise RouterException("Server error", status_code=500, errno=901)
if isinstance(err, TimeoutError):
self.log.warn("FCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="timeout"))
raise RouterException("Server error", status_code=502,
errno=903,
log_exception=False)
if isinstance(err, ConnectError):
self.log.warn("FCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=902,
log_exception=False)
if isinstance(err, RouterException):
self.log.warn("FCM Error: {}".format(err))
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="server_error"))
return failure

def _error(self, err, status, **kwargs):
"""Error handler that raises the RouterException"""
self.log.debug(err, **kwargs)
return RouterException(err, status_code=status, response_body=err,
**kwargs)

def _process_reply(self, reply, notification, router_data, ttl):
"""Process FCM send reply"""
# Failures are returned as non-200 messages (404, 410, etc.)
self.metrics.increment("notification.bridge.sent",
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
return RouterResponse(status_code=201, response_body="",
headers={"TTL": ttl,
"Location": location},
logged_status=200)
Loading