Description
I'm new to signatures, and am trying to implement a check of signatures on HTTP messages that arrive at a Flask-based REST server (also using connexion, not sure that matters). The tests for that server use a Flask client test fixture, as shown below. I think I can contribute an example of a tox test case that uses a signed Request, and that allows checking of the content-digest.
One wrinkle is the URL. A typical Flask test doesn't use a full URL like "https://blah.company.com/some/path" but rather just the suffix '/some/path'. Using good old localhost
worked here. I believe the server starts on some high-numbered port; the port number is not in the checked fields (component IDs).
To start at the very beginning, I created a public-private key pair like this:
openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out tests/rsa_private.pem
openssl rsa -in rsa_private.pem -pubout -out tests/rsa_public.pem
I am using a Python test fixture that yields a Flask client, in file tests/conftest.py
:
@pytest.fixture
def client():
flask_app.config["TESTING"] = True
cl = flask_app.test_client()
yield cl
Here's the supporting code for test cases, in a file like tests/test_controller.py
:
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from http_message_signatures import algorithms, HTTPMessageSigner, HTTPSignatureKeyResolver
import hashlib
import http_sfv
import json
import requests
class TestHTTPSignatureKeyResolver(HTTPSignatureKeyResolver):
"""
Provides a method to get a private key from a file,
as required by the signature generator method.
"""
def __init__(self):
"""
Fetch the private key from the test file.
"""
with open("tests/rsa_private.pem", "rb") as fh:
self.private_key = load_pem_private_key(fh.read(), password=None)
logger.debug('__init__: private key %s', self.private_key)
def resolve_public_key(self, key_id: str):
"""
Signature generation does not use a private key.
"""
logger.debug('resolve_public_key: key_id %s', key_id)
raise NotImplementedError('No public key')
def resolve_private_key(self, key_id: str):
logger.debug('resolve_private_key: key_id %s', key_id)
return self.private_key
def _create_signature_headers(
method: str, url_base: str,
component_ids: tuple = None,
data: bytes = None) -> dict:
"""
Prepend 'http://localhost' to the URL, then build and sign a request.
If data is not None, compute and add a content-digest header for signing.
Return a dict with these headers:
Content-Digest (optional)
Signature-Input
Signature
:param method: HTTP method; e.g., 'POST'
:param url_base: endpoint path with leading slash; e.g., /controller/entity
:param component_ids: iterable of HTTP component names to sign;
if None, defaults to @method, @authority, @target-uri
:param data: bytes, an encoded JSON string; defaults to None
"""
logger.debug('_create_signature_headers: method %s url %s components %s',
method, url_base, component_ids)
con_dig = 'Content-Digest'
sig_ipt = 'Signature-Input'
sig = 'Signature'
url = 'http://localhost' + url_base
request = requests.Request(method=method, url=url, data=data)
request = request.prepare()
signature_headers = {}
if data is not None:
request.headers[con_dig] = str(http_sfv.Dictionary({"sha-256":
hashlib.sha256(data).digest()}))
signature_headers[con_dig] = request.headers[con_dig]
signer = HTTPMessageSigner(signature_algorithm=algorithms.RSA_PSS_SHA512,
key_resolver=TestHTTPSignatureKeyResolver())
# sign returns nothing
if component_ids is None:
signer.sign(request, key_id='anykey')
else:
signer.sign(request, key_id='anykey',
covered_component_ids=component_ids)
# extract the signature input and signature
signature_headers[sig_ipt] = request.headers[sig_ipt],
signature_headers[sig] = request.headers[sig]
logger.debug('_create_signature_headers: return %s', signature_headers)
return signature_headers
Finally here's a Flask test case POST-ing to a controller method with the signature headers. It creates a request body as bytes and specifies the content-type. Usually I delegate both those actions to the client's post
method by using the json parameter. Doing it this way ensures that the client sends exactly the body that was signed; for example, no re-ordering of the JSON keys.
def test_ctlr_msg_sig(client):
simdata = {'some': 'thing'}
request_json = json.dumps(simdata, sort_keys=True)
request_bytes = request_json.encode('utf-8')
url = '/some/path'
hdrs = _create_signature_headers(
method='POST', url_base=url,
data=request_bytes,
component_ids=("@method", "@authority", "@target-uri",
"content-digest", "content-length"))
hdrs['content-type'] = 'application/json'
res = client.post(url, headers=hdrs, data=request_bytes)
assert res.status_code == 201
If there's an easier way please say!