Skip to content

Commit

Permalink
Merge pull request #1370 from MVrachev/delegation-classes
Browse files Browse the repository at this point in the history
New API: Add DelegationRole and Delegations classes
  • Loading branch information
Jussi Kukkonen authored May 12, 2021
2 parents a261d49 + b2cde9b commit 2ef8546
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 36 deletions.
86 changes: 83 additions & 3 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
Timestamp,
Targets,
Key,
Role
Role,
Delegations,
DelegatedRole,
)

from tuf.api.serialization import (
Expand Down Expand Up @@ -328,7 +330,7 @@ def test_key_class(self):
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519"
},
},
}
for key_dict in keys.values():
# Testing that the workflow of deserializing and serializing
Expand Down Expand Up @@ -422,6 +424,76 @@ def test_metadata_root(self):
with self.assertRaises(KeyError):
root.signed.remove_key('root', 'nosuchkey')

def test_delegated_role_class(self):
roles = [
{
"keyids": [
"c8022fa1e9b9cb239a6b362bbdffa9649e61ad2cb699d2e4bc4fdf7930a0e64a"
],
"name": "role1",
"paths": [
"file3.txt"
],
"terminating": False,
"threshold": 1
}
]
for role in roles:
# Testing that the workflow of deserializing and serializing
# a delegation role dictionary doesn't change the content.
key_obj = DelegatedRole.from_dict(role.copy())
self.assertEqual(role, key_obj.to_dict())

# Test creating a DelegatedRole object with both "paths" and
# "path_hash_prefixes" set.
role["path_hash_prefixes"] = "foo"
with self.assertRaises(ValueError):
DelegatedRole.from_dict(role.copy())

# Test creating DelegatedRole only with "path_hash_prefixes"
del role["paths"]
DelegatedRole.from_dict(role.copy())
role["paths"] = "foo"

# Test creating DelegatedRole only with "paths"
del role["path_hash_prefixes"]
DelegatedRole.from_dict(role.copy())
role["path_hash_prefixes"] = "foo"

# Test creating DelegatedRole without "paths" and
# "path_hash_prefixes" set
del role["paths"]
del role["path_hash_prefixes"]
DelegatedRole.from_dict(role)


def test_delegation_class(self):
roles = [
{
"keyids": [
"c8022fa1e9b9cb239a6b362bbdffa9649e61ad2cb699d2e4bc4fdf7930a0e64a"
],
"name": "role1",
"paths": [
"file3.txt"
],
"terminating": False,
"threshold": 1
}
]
keys = {
"59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d":{
"keytype": "ed25519",
"keyval": {
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519"
},
}
delegations_dict = {"keys": keys, "roles": roles}
delegations = Delegations.from_dict(copy.deepcopy(delegations_dict))
self.assertEqual(delegations_dict, delegations.to_dict())


def test_metadata_targets(self):
targets_path = os.path.join(
Expand Down Expand Up @@ -452,7 +524,6 @@ def test_metadata_targets(self):
del targets_dict["signed"]["delegations"]
tmp_dict = targets_dict["signed"].copy()
targets_obj = Targets.from_dict(tmp_dict)
tar_d = targets_obj.to_dict()
self.assertEqual(targets_dict["signed"], targets_obj.to_dict())

def setup_dict_with_unrecognized_field(self, file_path, field, value):
Expand All @@ -477,6 +548,15 @@ def test_support_for_unrecognized_fields(self):
dict1["signed"]["keys"][keyid]["d"] = "c"
for role_str in dict1["signed"]["roles"].keys():
dict1["signed"]["roles"][role_str]["e"] = "g"
elif metadata == "targets" and dict1["signed"].get("delegations"):
for keyid in dict1["signed"]["delegations"]["keys"].keys():
dict1["signed"]["delegations"]["keys"][keyid]["d"] = "c"
new_roles = []
for role in dict1["signed"]["delegations"]["roles"]:
role["e"] = "g"
new_roles.append(role)
dict1["signed"]["delegations"]["roles"] = new_roles
dict1["signed"]["delegations"]["foo"] = "bar"

temp_copy = copy.deepcopy(dict1)
metadata_obj = Metadata.from_dict(temp_copy)
Expand Down
162 changes: 129 additions & 33 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,129 @@ def update(
self.meta[metadata_fn]["hashes"] = hashes


class DelegatedRole(Role):
"""A container with information about particular delegated role.
Attributes:
name: A string giving the name of the delegated role.
keyids: A set of strings each of which represents a given key.
threshold: An integer representing the required number of keys for that
particular role.
terminating: A boolean indicating whether subsequent delegations
should be considered.
paths: An optional list of strings, where each string describes
a path that the role is trusted to provide.
path_hash_prefixes: An optional list of HEX_DIGESTs used to succinctly
describe a set of target paths. Only one of the attributes "paths"
and "path_hash_prefixes" is allowed to be set.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
name: str,
keyids: List[str],
threshold: int,
terminating: bool,
paths: Optional[List[str]] = None,
path_hash_prefixes: Optional[List[str]] = None,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(keyids, threshold, unrecognized_fields)
self.name = name
self.terminating = terminating
if paths and path_hash_prefixes:
raise ValueError(
"Only one of the attributes 'paths' and"
"'path_hash_prefixes' can be set!"
)
self.paths = paths
self.path_hash_prefixes = path_hash_prefixes

@classmethod
def from_dict(cls, role_dict: Mapping[str, Any]) -> "Role":
"""Creates DelegatedRole object from its dict representation."""
name = role_dict.pop("name")
keyids = role_dict.pop("keyids")
threshold = role_dict.pop("threshold")
terminating = role_dict.pop("terminating")
paths = role_dict.pop("paths", None)
path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
# All fields left in the role_dict are unrecognized.
return cls(
name,
keyids,
threshold,
terminating,
paths,
path_hash_prefixes,
role_dict,
)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
base_role_dict = super().to_dict()
res_dict = {
"name": self.name,
"terminating": self.terminating,
**base_role_dict,
}
if self.paths:
res_dict["paths"] = self.paths
elif self.path_hash_prefixes:
res_dict["path_hash_prefixes"] = self.path_hash_prefixes
return res_dict


class Delegations:
"""A container object storing information about all delegations.
Attributes:
keys: A dictionary of keyids and key objects containing information
about the corresponding key.
roles: A list of DelegatedRole instances containing information about
all delegated roles.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
keys: Mapping[str, Key],
roles: List[DelegatedRole],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.keys = keys
self.roles = roles
self.unrecognized_fields = unrecognized_fields or {}

@classmethod
def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
"""Creates Delegations object from its dict representation."""
keys = delegations_dict.pop("keys")
keys_res = {}
for keyid, key_dict in keys.items():
keys_res[keyid] = Key.from_dict(key_dict)
roles = delegations_dict.pop("roles")
roles_res = []
for role_dict in roles:
new_role = DelegatedRole.from_dict(role_dict)
roles_res.append(new_role)
# All fields left in the delegations_dict are unrecognized.
return cls(keys_res, roles_res, delegations_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
roles = [role_obj.to_dict() for role_obj in self.roles]
return {
"keys": keys,
"roles": roles,
**self.unrecognized_fields,
}


class Targets(Signed):
"""A container for the signed part of targets metadata.
Expand All @@ -757,38 +880,9 @@ class Targets(Signed):
...
}
delegations: A dictionary that contains a list of delegated target
delegations: An optional object containing a list of delegated target
roles and public key store used to verify their metadata
signatures::
{
'keys' : {
'<KEYID>': {
'keytype': '<KEY TYPE>',
'scheme': '<KEY SCHEME>',
'keyid_hash_algorithms': [
'<HASH ALGO 1>',
'<HASH ALGO 2>'
...
],
'keyval': {
'public': '<PUBLIC KEY HEX REPRESENTATION>'
}
},
...
},
'roles': [
{
'name': '<ROLENAME>',
'keyids': ['<SIGNING KEY KEYID>', ...],
'threshold': <SIGNATURE THRESHOLD>,
'terminating': <TERMINATING BOOLEAN>,
'path_hash_prefixes': ['<HEX DIGEST>', ... ], // or
'paths' : ['PATHPATTERN', ... ],
},
...
]
}
signatures.
"""

Expand All @@ -804,7 +898,7 @@ def __init__(
spec_version: str,
expires: datetime,
targets: Dict[str, Any],
delegations: Optional[Dict[str, Any]] = None,
delegations: Optional[Delegations] = None,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(version, spec_version, expires, unrecognized_fields)
Expand All @@ -818,6 +912,8 @@ def from_dict(cls, targets_dict: Dict[str, Any]) -> "Targets":
common_args = cls._common_fields_from_dict(targets_dict)
targets = targets_dict.pop("targets")
delegations = targets_dict.pop("delegations", None)
if delegations:
delegations = Delegations.from_dict(delegations)
# All fields left in the targets_dict are unrecognized.
return cls(*common_args, targets, delegations, targets_dict)

Expand All @@ -826,7 +922,7 @@ def to_dict(self) -> Dict[str, Any]:
targets_dict = self._common_fields_to_dict()
targets_dict["targets"] = self.targets
if self.delegations:
targets_dict["delegations"] = self.delegations
targets_dict["delegations"] = self.delegations.to_dict()
return targets_dict

# Modification.
Expand Down

0 comments on commit 2ef8546

Please sign in to comment.