Skip to content

Commit

Permalink
Test signature verification
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Feb 7, 2024
1 parent ae4c493 commit 1d8835a
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/dispatch/signature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Ed25519PrivateKey,
Ed25519PublicKey,
)
from http_message_signatures.exceptions import InvalidSignature
from http_message_signatures.structures import CaseInsensitiveDict

from .key import private_key_from_pem, public_key_from_pem
Expand Down
4 changes: 2 additions & 2 deletions src/dispatch/signature/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

DEFAULT_KEY_ID = "default"

COVERED_COMPONENT_IDS = (
COVERED_COMPONENT_IDS = {
"@method",
"@path",
"@authority",
"content-type",
"content-digest",
)
}
33 changes: 27 additions & 6 deletions src/dispatch/signature/signature.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from datetime import datetime, timedelta
from typing import Sequence, cast

import http_sfv
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from http_message_signatures import HTTPMessageSigner, HTTPMessageVerifier
from http_message_signatures import HTTPMessageSigner, HTTPMessageVerifier, VerifyResult

from .config import COVERED_COMPONENT_IDS, DEFAULT_KEY_ID, LABEL, SIGNATURE_ALGORITHM
from .digest import generate_content_digest, verify_content_digest
Expand Down Expand Up @@ -37,7 +39,7 @@ def sign_request(request: Request, key: Ed25519PrivateKey, created: datetime):
signer.sign(
request,
key_id=DEFAULT_KEY_ID,
covered_component_ids=COVERED_COMPONENT_IDS,
covered_component_ids=cast(Sequence[str], COVERED_COMPONENT_IDS),
created=created,
label=LABEL,
include_alg=True,
Expand All @@ -61,15 +63,34 @@ def verify_request(request: Request, key: Ed25519PublicKey, max_age: timedelta):
key: The Ed25519 public key to use to verify the signature.
max_age: The maximum age of the signature.
"""

# Verify embedded signatures.
key_resolver = KeyResolver(public_key=key)
verifier = HTTPMessageVerifier(
signature_algorithm=SIGNATURE_ALGORITHM, key_resolver=key_resolver
)
results = verifier.verify(request, max_age=max_age)

if not results:
raise ValueError("request does not contain any signatures")
# Check that at least one signature covers the required components.
for result in results:
covered_components = extract_covered_components(result)
if covered_components.issuperset(COVERED_COMPONENT_IDS):
break
else:
raise ValueError(
f"no signatures found that covered all required components ({COVERED_COMPONENT_IDS})"
)

# Check that the Content-Digest header matches the body.
verify_content_digest(request.headers["Content-Digest"], request.body)


# TODO: check all required components are covered
def extract_covered_components(result: VerifyResult) -> set[str]:
covered_components: set[str] = set()
for key in result.covered_components.keys():
item = http_sfv.Item()
item.parse(key.encode())
assert isinstance(item.value, str)
covered_components.add(item.value)

verify_content_digest(request.headers["Content-Digest"], request.body)
return covered_components
129 changes: 129 additions & 0 deletions tests/dispatch/signature/test_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import unittest
from datetime import datetime, timedelta

from http_message_signatures import HTTPMessageSigner

from dispatch.signature import (
CaseInsensitiveDict,
InvalidSignature,
Request,
sign_request,
verify_request,
)
from dispatch.signature.config import (
COVERED_COMPONENT_IDS,
DEFAULT_KEY_ID,
LABEL,
SIGNATURE_ALGORITHM,
)
from dispatch.signature.key import (
KeyResolver,
private_key_from_pem,
public_key_from_pem,
)

public_key = public_key_from_pem(
"""
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAJrQLj5P/89iXES9+vFgrIy29clF9CC/oPPsw3c5D0bs=
-----END PUBLIC KEY-----
"""
)

private_key = private_key_from_pem(
"""
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEIJ+DYvh6SEqVTm50DFtMDoQikTmiCqirVv9mWG9qfSnF
-----END PRIVATE KEY-----
"""
)


class TestSignature(unittest.TestCase):
def setUp(self):
self.request = Request(
method="POST",
url="https://example.com/foo?bar=1",
body='{"hello": "world"}',
headers=CaseInsensitiveDict(
{
"host": "example.com",
"content-type": "application/json",
"content-length": "18",
}
),
)

def test_sign_and_verify(self):
sign_request(self.request, private_key, created=datetime.now())

self.assertEqual(
self.request.headers["Content-Digest"],
"sha-512=:WZDPaVn/7XgHaAy8pmojAkGWoRx2UFChF41A2svX+TaPm+AbwAgBWnrIiYllu7BNNyealdVLvRwEmTHWXvJwew==:",
)
self.assertIn("Signature-Input", self.request.headers)
self.assertIn("Signature", self.request.headers)

verify_request(self.request, public_key, max_age=timedelta(minutes=1))

def test_missing_signature(self):
with self.assertRaises(InvalidSignature):
verify_request(self.request, public_key, max_age=timedelta(minutes=1))

def test_signature_too_old(self):
created = datetime.now() - timedelta(minutes=2)
sign_request(self.request, private_key, created)
with self.assertRaises(InvalidSignature):
verify_request(self.request, public_key, max_age=timedelta(minutes=1))

def test_content_digest_invalid(self):
sign_request(self.request, private_key, datetime.now())
self.request.body = "foo"
with self.assertRaisesRegex(ValueError, "unexpected content digest"):
verify_request(self.request, public_key, max_age=timedelta(minutes=1))

def test_signature_coverage(self):
# Manually sign the request, but do so without including the
# Content-Digest header.
signer = HTTPMessageSigner(
signature_algorithm=SIGNATURE_ALGORITHM,
key_resolver=KeyResolver(private_key=private_key),
)
signer.sign(
self.request,
key_id=DEFAULT_KEY_ID,
covered_component_ids=COVERED_COMPONENT_IDS - {"content-digest"},
created=datetime.now(),
label=LABEL,
include_alg=True,
)

with self.assertRaises(ValueError):
verify_request(self.request, public_key, max_age=timedelta(minutes=1))

def test_known_signature(self):
# See:
# https://datatracker.ietf.org/doc/html/draft-ietf-httpbis-message-signatures#appendix-B.1.4
# https://datatracker.ietf.org/doc/html/draft-ietf-httpbis-message-signatures#name-test-cases
request = Request(
method="POST",
url="https://example.com/foo?param=Value&Pet=dog",
headers=CaseInsensitiveDict(
{
"host": "example.com",
"date": "Tue, 20 Apr 2021 02:07:55 GMT",
"content-type": "application/json",
"content-length": "18",
"content-digest": "sha-512=:WZDPaVn/7XgHaAy8pmojAkGWoRx2UFChF41A2svX+TaPm+AbwAgBWnrIiYllu7BNNyealdVLvRwEmTHWXvJwew==:",
"signature-input": 'sig-b26=("date" "@method" "@path" "@authority" "content-type" "content-length");created=1618884473;keyid="test-key-ed25519"',
"signature": "sig-b26=:wqcAqbmYJ2ji2glfAMaRy4gruYYnx2nEFN2HN6jrnDnQCK1u02Gb04v9EDgwUPiu4A0w6vuQv5lIp5WPpBKRCw==:",
}
),
body='{"hello": "world"}',
)

# It's not accepted because the keyid != "default"
with self.assertRaisesRegex(
ValueError, "public key 'test-key-ed25519' not available"
):
verify_request(request, public_key, max_age=timedelta(weeks=9000))

0 comments on commit 1d8835a

Please sign in to comment.