Skip to content

Commit

Permalink
Merge pull request #1423 from jku/verify-in-key
Browse files Browse the repository at this point in the history
Metadata API: implement sig verification in Key, store id in key
  • Loading branch information
Jussi Kukkonen authored Jun 9, 2021
2 parents 7f3b15e + 414dfc8 commit de78251
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 101 deletions.
71 changes: 32 additions & 39 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,10 @@ def setUpClass(cls):
# Load keys into memory
cls.keystore = {}
for role in ['delegation', 'snapshot', 'targets', 'timestamp']:
cls.keystore[role] = {
'private': import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + '_key'),
password="password"),
'public': import_ed25519_publickey_from_file(
os.path.join(cls.keystore_dir, role + '_key.pub'))
}
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + '_key'),
password="password"
)


@classmethod
Expand Down Expand Up @@ -162,50 +159,46 @@ def test_read_write_read_compare(self):


def test_sign_verify(self):
root_path = os.path.join(self.repo_dir, 'metadata', 'root.json')
root:Root = Metadata.from_file(root_path).signed

# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (targets) and assert ...
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
metadata_obj = Metadata.from_file(path)

# ... it has a single existing signature,
self.assertTrue(len(metadata_obj.signatures) == 1)
# ... which is valid for the correct key.
self.assertTrue(metadata_obj.verify(
self.keystore['targets']['public']))
targets_key.verify_signature(metadata_obj)
with self.assertRaises(tuf.exceptions.UnsignedMetadataError):
snapshot_key.verify_signature(metadata_obj)

sslib_signer = SSlibSigner(self.keystore['snapshot']['private'])
sslib_signer = SSlibSigner(self.keystore['snapshot'])
# Append a new signature with the unrelated key and assert that ...
metadata_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
self.assertTrue(len(metadata_obj.signatures) == 2)
# ... both are valid for the corresponding keys.
self.assertTrue(metadata_obj.verify(
self.keystore['targets']['public']))
self.assertTrue(metadata_obj.verify(
self.keystore['snapshot']['public']))
targets_key.verify_signature(metadata_obj)
snapshot_key.verify_signature(metadata_obj)

sslib_signer.key_dict = self.keystore['timestamp']['private']
sslib_signer = SSlibSigner(self.keystore['timestamp'])
# Create and assign (don't append) a new signature and assert that ...
metadata_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
self.assertTrue(len(metadata_obj.signatures) == 1)
# ... valid for that key.
self.assertTrue(metadata_obj.verify(
self.keystore['timestamp']['public']))

# Assert exception if there are more than one signatures for a key
metadata_obj.sign(sslib_signer, append=True)
with self.assertRaises(tuf.exceptions.Error) as ctx:
metadata_obj.verify(self.keystore['timestamp']['public'])
self.assertTrue(
'2 signatures for key' in str(ctx.exception),
str(ctx.exception))

# Assert exception if there is no signature for a key
with self.assertRaises(tuf.exceptions.Error) as ctx:
metadata_obj.verify(self.keystore['targets']['public'])
self.assertTrue(
'no signature for' in str(ctx.exception),
str(ctx.exception))
timestamp_key.verify_signature(metadata_obj)
with self.assertRaises(tuf.exceptions.UnsignedMetadataError):
targets_key.verify_signature(metadata_obj)


def test_metadata_base(self):
Expand Down Expand Up @@ -373,18 +366,18 @@ def test_key_class(self):
# Testing that the workflow of deserializing and serializing
# a key dictionary doesn't change the content.
test_key_dict = key_dict.copy()
key_obj = Key.from_dict(test_key_dict)
key_obj = Key.from_dict("id", test_key_dict)
self.assertEqual(key_dict, key_obj.to_dict())
# Test creating an instance without a required attribute.
for key in key_dict.keys():
test_key_dict = key_dict.copy()
del test_key_dict[key]
with self.assertRaises(KeyError):
Key.from_dict(test_key_dict)
Key.from_dict("id", test_key_dict)
# Test creating a Key instance with wrong keyval format.
key_dict["keyval"] = {}
with self.assertRaises(ValueError):
Key.from_dict(key_dict)
Key.from_dict("id", key_dict)


def test_role_class(self):
Expand Down Expand Up @@ -413,7 +406,7 @@ def test_role_class(self):
test_role_dict = role_dict.copy()
del test_role_dict[role_attr]
with self.assertRaises(KeyError):
Key.from_dict(test_role_dict)
Key.from_dict("id", test_role_dict)
# Test creating a Role instance with keyid dublicates.
# for keyid in role_dict["keyids"]:
role_dict["keyids"].append(role_dict["keyids"][0])
Expand All @@ -433,15 +426,15 @@ def test_metadata_root(self):


keyid = root_key2['keyid']
key_metadata = Key(root_key2['keytype'], root_key2['scheme'],
key_metadata = Key(keyid, root_key2['keytype'], root_key2['scheme'],
root_key2['keyval'])

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key('root', keyid, key_metadata)
root.signed.add_key('root', key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles['root'].keyids)
Expand All @@ -453,7 +446,7 @@ def test_metadata_root(self):

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles['root'].keyids.copy()
root.signed.add_key('root', keyid, key_metadata)
root.signed.add_key('root', key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles['root'].keyids)

# Remove the key
Expand Down
129 changes: 67 additions & 62 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict, List, Mapping, Optional, Tuple, Type

from securesystemslib.keys import verify_signature
from securesystemslib import keys as sslib_keys
from securesystemslib.signer import Signature, Signer
from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
from securesystemslib.util import persist_temp_file
Expand Down Expand Up @@ -251,59 +251,6 @@ def sign(

return signature

def verify(
self,
key: Mapping[str, Any],
signed_serializer: Optional[SignedSerializer] = None,
) -> bool:
"""Verifies 'signatures' over 'signed' that match the passed key by id.
Arguments:
key: A securesystemslib-style public key object.
signed_serializer: A SignedSerializer subclass instance that
implements the desired canonicalization format. Per default a
CanonicalJSONSerializer is used.
Raises:
# TODO: Revise exception taxonomy
tuf.exceptions.Error: None or multiple signatures found for key.
securesystemslib.exceptions.FormatError: Key argument is malformed.
tuf.api.serialization.SerializationError:
'signed' cannot be serialized.
securesystemslib.exceptions.CryptoError, \
securesystemslib.exceptions.UnsupportedAlgorithmError:
Signing errors.
Returns:
A boolean indicating if the signature is valid for the passed key.
"""
signatures_for_keyid = list(
filter(lambda sig: sig.keyid == key["keyid"], self.signatures)
)

if not signatures_for_keyid:
raise exceptions.Error(f"no signature for key {key['keyid']}.")

if len(signatures_for_keyid) > 1:
raise exceptions.Error(
f"{len(signatures_for_keyid)} signatures for key "
f"{key['keyid']}, not sure which one to verify."
)

if signed_serializer is None:
# Use local scope import to avoid circular import errors
# pylint: disable=import-outside-toplevel
from tuf.api.serialization.json import CanonicalJSONSerializer

signed_serializer = CanonicalJSONSerializer()

return verify_signature(
key,
signatures_for_keyid[0].to_dict(),
signed_serializer.serialize(self.signed),
)


class Signed(metaclass=abc.ABCMeta):
"""A base class for the signed part of TUF metadata.
Expand Down Expand Up @@ -431,6 +378,9 @@ class Key:
"""A container class representing the public portion of a Key.
Attributes:
keyid: An identifier string that must uniquely identify a key within
the metadata it is used in. This implementation does not verify
that keyid is the hash of a specific representation of the key.
keytype: A string denoting a public key signature system,
such as "rsa", "ed25519", and "ecdsa-sha2-nistp256".
scheme: A string denoting a corresponding signature scheme. For example:
Expand All @@ -442,26 +392,28 @@ class Key:

def __init__(
self,
keyid: str,
keytype: str,
scheme: str,
keyval: Dict[str, str],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
if not keyval.get("public"):
raise ValueError("keyval doesn't follow the specification format!")
self.keyid = keyid
self.keytype = keytype
self.scheme = scheme
self.keyval = keyval
self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {}

@classmethod
def from_dict(cls, key_dict: Dict[str, Any]) -> "Key":
def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key":
"""Creates Key object from its dict representation."""
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
keyval = key_dict.pop("keyval")
# All fields left in the key_dict are unrecognized.
return cls(keytype, scheme, keyval, key_dict)
return cls(keyid, keytype, scheme, keyval, key_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dictionary representation of self."""
Expand All @@ -472,6 +424,59 @@ def to_dict(self) -> Dict[str, Any]:
**self.unrecognized_fields,
}

def to_securesystemslib_key(self) -> Dict[str, Any]:
"""Returns a Securesystemslib compatible representation of self."""
return {
"keyid": self.keyid,
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
}

def verify_signature(
self,
metadata: Metadata,
signed_serializer: Optional[SignedSerializer] = None,
):
"""Verifies that the 'metadata.signatures' contains a signature made
with this key, correctly signing 'metadata.signed'.
Arguments:
metadata: Metadata to verify
signed_serializer: Optional; SignedSerializer to serialize
'metadata.signed' with. Default is CanonicalJSONSerializer.
Raises:
UnsignedMetadataError: The signature could not be verified for a
variety of possible reasons: see error message.
TODO: Various other errors currently bleed through from lower
level components: Issue #1351
"""
try:
sigs = metadata.signatures
signature = next(sig for sig in sigs if sig.keyid == self.keyid)
except StopIteration:
raise exceptions.UnsignedMetadataError(
f"no signature for key {self.keyid} found in metadata",
metadata.signed,
) from None

if signed_serializer is None:
# pylint: disable=import-outside-toplevel
from tuf.api.serialization.json import CanonicalJSONSerializer

signed_serializer = CanonicalJSONSerializer()

if not sslib_keys.verify_signature(
self.to_securesystemslib_key(),
signature.to_dict(),
signed_serializer.serialize(metadata.signed),
):
raise exceptions.UnsignedMetadataError(
f"Failed to verify {self.keyid} signature for metadata",
metadata.signed,
)


class Role:
"""A container class containing the set of keyids and threshold associated
Expand Down Expand Up @@ -572,7 +577,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root":
roles = signed_dict.pop("roles")

for keyid, key_dict in keys.items():
keys[keyid] = Key.from_dict(key_dict)
keys[keyid] = Key.from_dict(keyid, key_dict)
for role_name, role_dict in roles.items():
roles[role_name] = Role.from_dict(role_dict)

Expand All @@ -598,10 +603,10 @@ def to_dict(self) -> Dict[str, Any]:
return root_dict

# Update key for a role.
def add_key(self, role: str, keyid: str, key_metadata: Key) -> None:
"""Adds new key for 'role' and updates the key store."""
self.roles[role].keyids.add(keyid)
self.keys[keyid] = key_metadata
def add_key(self, role: str, key: Key) -> None:
"""Adds new signing key for delegated role 'role'."""
self.roles[role].keyids.add(key.keyid)
self.keys[key.keyid] = key

def remove_key(self, role: str, keyid: str) -> None:
"""Removes key from 'role' and updates the key store.
Expand Down Expand Up @@ -880,7 +885,7 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
keys = delegations_dict.pop("keys")
keys_res = {}
for keyid, key_dict in keys.items():
keys_res[keyid] = Key.from_dict(key_dict)
keys_res[keyid] = Key.from_dict(keyid, key_dict)
roles = delegations_dict.pop("roles")
roles_res = []
for role_dict in roles:
Expand Down

0 comments on commit de78251

Please sign in to comment.