Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: enforce strict crypto header checks
Browse files Browse the repository at this point in the history
Explicitly verify the crypto headers are present and match either the
01 or 04 webpush encryption drafts. This also includes a refactor of
the push schemas to remove the push_validation file.

Closes #188
  • Loading branch information
bbangert committed Nov 15, 2016
1 parent b2431b4 commit c9ada09
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 331 deletions.
4 changes: 2 additions & 2 deletions autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def send_notification(self, channel=None, version=None, data=None,
if use_header:
headers.update({
"Content-Type": "application/octet-stream",
"Content-Encoding": "aesgcm-128",
"Content-Encoding": "aesgcm",
"Encryption": self._crypto_key,
"Crypto-Key": 'keyid="a1"; dh="JcqK-OLkJZlJ3sJJWstJCA"',
})
Expand Down Expand Up @@ -1125,7 +1125,7 @@ def test_message_without_crypto_headers(self):
data = str(uuid.uuid4())
client = yield self.quick_register(use_webpush=True)
result = yield client.send_notification(data=data, use_header=False,
status=400)
status=401)
eq_(result, None)
yield self.shut_down(client)

Expand Down
181 changes: 153 additions & 28 deletions autopush/tests/test_web_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def check_result(result):

class TestSimplePushRequestSchema(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import SimplePushRequestSchema
from autopush.web.simplepush import SimplePushRequestSchema
schema = SimplePushRequestSchema()
schema.context["settings"] = Mock()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -283,7 +283,7 @@ def test_invalid_data_size(self):

class TestWebPushRequestSchema(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import WebPushRequestSchema
from autopush.web.webpush import WebPushRequestSchema
schema = WebPushRequestSchema()
schema.context["settings"] = Mock()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -324,10 +324,12 @@ def test_no_headers(self):
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
data = self._make_test_data(body="asdfasdf",
headers={"ttl": "invalid"})
result, errors = schema.load(data)
eq_(errors, {'headers': {'ttl': [u'Not a valid integer.']}})
data = self._make_test_data(body="asdfasdf")

with assert_raises(InvalidRequest) as cm:
schema.load(data)

eq_(cm.exception.message, "Unknown Content-Encoding")

def test_invalid_simplepush_user(self):
schema = self._make_fut()
Expand Down Expand Up @@ -419,23 +421,110 @@ def test_invalid_header_combo(self):
info = self._make_test_data(
headers={
"content-encoding": "aesgcm128",
"crypto-key": "asdfjialsjdfiasjld",
}
"crypto-key": "dh=asdfjialsjdfiasjld",
"encryption-key": "dh=asdfjasidlfjaislf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.errno, 110)

def test_missing_encryption_salt(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"encryption-key": "aesgcm128",
"crypto-key": "asdfjialsjdfiasjld",
}
"content-encoding": "aesgcm128",
"encryption": "dh=asdfjasidlfjaislf",
"encryption-key": "dh=jilajsidfljasildjf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_encryption_salt_04(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm",
"encryption": "dh=asdfjasidlfjaislf",
"crypto-key": "dh=jilajsidfljasildjf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_encryption_key_dh(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm128",
"encryption": "salt=asdfjasidlfjaislf",
"encryption-key": "keyid=jialsjdifjlasd",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_crypto_key_dh(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm",
"encryption": "salt=asdfjasidlfjaislf",
"crypto-key": "p256ecdsa=BA1Hxzyi1RUM1b5wjxsn7nGxAs",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_invalid_data_size(self):
Expand All @@ -451,7 +540,12 @@ def test_invalid_data_size(self):
schema.context["settings"].max_data = 1

with assert_raises(InvalidRequest) as cm:
schema.load(self._make_test_data(body="asdfasdfasdfasdfasd"))
schema.load(self._make_test_data(
headers={
"content-encoding": "aesgcm",
"crypto-key": "dh=asdfjialsjdfiasjld",
},
body="asdfasdfasdfasdfasd"))

eq_(cm.exception.errno, 104)

Expand Down Expand Up @@ -489,14 +583,45 @@ def test_valid_data_crypto_padding_stripped(self):
headers={
"authorization": "not vapid",
"content-encoding": "aesgcm128",
"encryption": "salt=" + padded_value
"encryption": "salt=" + padded_value,
"encryption-key": "dh=asdfasdfasdf",
}
)

result, errors = schema.load(info)
eq_(errors, {})
eq_(result["headers"]["encryption"], "salt=asdfjiasljdf")

def test_invalid_dh_value_for_01_crypto(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)

padded_value = "asdfjiasljdf==="

info = self._make_test_data(
body="asdfasdfasdfasdf",
headers={
"authorization": "not vapid",
"content-encoding": "aesgcm128",
"encryption": "salt=" + padded_value,
"crypto-key": "dh=asdfasdfasdf"
}
)

with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.message, "dh value in Crypto-Key header not valid "
"for 01 or earlier webpush-encryption")

def test_invalid_vapid_crypto_header(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
Expand Down Expand Up @@ -565,7 +690,7 @@ def test_invalid_topic(self):

class TestWebPushRequestSchemaUsingVapid(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import WebPushRequestSchema
from autopush.web.webpush import WebPushRequestSchema
from autopush.settings import AutopushSettings
schema = WebPushRequestSchema()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -617,7 +742,7 @@ def test_valid_vapid_crypto_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand Down Expand Up @@ -648,7 +773,7 @@ def test_valid_vapid_crypto_header_webpush(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -659,7 +784,7 @@ def test_valid_vapid_crypto_header_webpush(self):
eq_(errors, {})
ok_("jwt" in result)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_vapid_crypto_header(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand All @@ -682,7 +807,7 @@ def test_invalid_vapid_crypto_header(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -695,7 +820,7 @@ def test_invalid_vapid_crypto_header(self, mock_jwt):
eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_encryption_header(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand All @@ -718,8 +843,8 @@ def test_invalid_encryption_header(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"encryption": "foo=stuff",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
}
Expand All @@ -729,9 +854,9 @@ def test_invalid_encryption_header(self, mock_jwt):
schema.load(info)

eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 110)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_encryption_jwt(self, mock_jwt):
schema = self._make_fut()
# use a deeply superclassed error to make sure that it gets picked up.
Expand All @@ -754,7 +879,7 @@ def test_invalid_encryption_jwt(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -767,7 +892,7 @@ def test_invalid_encryption_jwt(self, mock_jwt):
eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_crypto_key_header_content(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand Down Expand Up @@ -823,7 +948,7 @@ def test_expired_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand Down Expand Up @@ -857,7 +982,7 @@ def test_missing_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"crypto-key": ckey
}
Expand Down Expand Up @@ -890,7 +1015,7 @@ def test_bogus_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"crypto-key": ckey,
"authorization": "bogus crap"
Expand Down
16 changes: 14 additions & 2 deletions autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,13 +732,25 @@ def test_hello_webpush_uses_one_db_call(self):
self._send_message(dict(messageType="hello", use_webpush=True,
channelIDs=[]))

def check_result(msg):
d = Deferred()

def check_result(msg, duration=0):
if len(db.DB_CALLS) < 3: # pragma: nocover
if duration > 3.0: # pragma: nocover
raise Exception("db calls isn't 3 yet")
else:
reactor.callLater(0.1, check_result, msg, duration+0.1)
return

eq_(db.DB_CALLS, ['register_user', 'fetch_messages',
'fetch_timestamp_messages'])
eq_(msg["status"], 200)
db.DB_CALLS = []
db.TRACK_DB_CALLS = False
return self._check_response(check_result)
d.callback(True)
f = self._check_response(check_result)
f.addErrback(lambda x: d.callback(True))
return d

def test_hello_with_webpush(self):
self._connect()
Expand Down
Loading

0 comments on commit c9ada09

Please sign in to comment.