Skip to content

Commit

Permalink
Merge pull request #1537 from MVrachev/role-name-uniqueness
Browse files Browse the repository at this point in the history
Enforce role name uniqueness and add Targets key helpers
  • Loading branch information
Jussi Kukkonen authored Sep 22, 2021
2 parents afc67d9 + f00295f commit cc1f95e
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 54 deletions.
1 change: 0 additions & 1 deletion tests/slow_retrieval_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"""

import os
import sys
import time
import http.server

Expand Down
102 changes: 83 additions & 19 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
Key,
MetaFile,
TargetFile,
Delegations,
DelegatedRole,
)

Expand Down Expand Up @@ -467,6 +466,10 @@ def test_root_add_key_and_remove_key(self):
# Add the same key to targets role as well
root.signed.add_key('targets', key_metadata)

# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)

# Remove the key from root role (targets role still uses it)
root.signed.remove_key('root', keyid)
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
Expand All @@ -477,8 +480,10 @@ def test_root_add_key_and_remove_key(self):
self.assertNotIn(keyid, root.signed.roles['targets'].keyids)
self.assertNotIn(keyid, root.signed.keys)

with self.assertRaises(KeyError):
with self.assertRaises(ValueError):
root.signed.remove_key('root', 'nosuchkey')
with self.assertRaises(ValueError):
root.signed.remove_key('nosuchrole', keyid)

def test_is_target_in_pathpattern(self):
supported_use_cases = [
Expand Down Expand Up @@ -512,23 +517,6 @@ def test_is_target_in_pathpattern(self):
)


def test_delegation_class(self):
# empty keys and roles
delegations_dict = {"keys":{}, "roles":[]}
delegations = Delegations.from_dict(delegations_dict.copy())
self.assertEqual(delegations_dict, delegations.to_dict())

# Test some basic missing or broken input
invalid_delegations_dicts = [
{},
{"keys":None, "roles":None},
{"keys":{"foo":0}, "roles":[]},
{"keys":{}, "roles":["foo"]},
]
for d in invalid_delegations_dicts:
with self.assertRaises((KeyError, AttributeError)):
Delegations.from_dict(d)

def test_metadata_targets(self):
targets_path = os.path.join(
self.repo_dir, 'metadata', 'targets.json')
Expand All @@ -554,6 +542,82 @@ def test_metadata_targets(self):
targets.signed.targets[filename].to_dict(), fileinfo.to_dict()
)

def test_targets_key_api(self):
targets_path = os.path.join(
self.repo_dir, 'metadata', 'targets.json')
targets: Targets = Metadata[Targets].from_file(targets_path).signed

# Add a new delegated role "role2" in targets
delegated_role = DelegatedRole.from_dict({
"keyids": [],
"name": "role2",
"paths": ["fn3", "fn4"],
"terminating": False,
"threshold": 1
})
targets.delegations.roles["role2"] = delegated_role

key_dict = {
"keytype": "ed25519",
"keyval": {
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519"
}
key = Key.from_dict("id2", key_dict)

# Assert that delegated role "role1" does not contain the new key
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
targets.add_key("role1", key)

# Assert that the new key is added to the delegated role "role1"
self.assertIn(key.keyid, targets.delegations.roles["role1"].keyids)

# Confirm that the newly added key does not break the obj serialization
targets.to_dict()

# Try adding the same key again and assert its ignored.
past_keyid = targets.delegations.roles["role1"].keyids.copy()
targets.add_key("role1", key)
self.assertEqual(past_keyid, targets.delegations.roles["role1"].keyids)

# Try adding a key to a delegated role that doesn't exists
with self.assertRaises(ValueError):
targets.add_key("nosuchrole", key)

# Add the same key to "role2" as well
targets.add_key("role2", key)

# Remove the key from "role1" role ("role2" still uses it)
targets.remove_key("role1", key.keyid)

# Assert that delegated role "role1" doesn't contain the key.
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
self.assertIn(key.keyid, targets.delegations.roles["role2"].keyids)

# Remove the key from "role2" as well
targets.remove_key("role2", key.keyid)
self.assertNotIn(key.keyid, targets.delegations.roles["role2"].keyids)

# Try remove key not used by "role1"
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)

# Try removing a key from delegated role that doesn't exists
with self.assertRaises(ValueError):
targets.remove_key("nosuchrole", key.keyid)

# Remove delegations as a whole
targets.delegations = None
# Test that calling add_key and remove_key throws an error
# and that delegations is still None after each of the api calls
with self.assertRaises(ValueError):
targets.add_key("role1", key)
self.assertTrue(targets.delegations is None)
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)
self.assertTrue(targets.delegations is None)


def test_length_and_hash_validation(self):

Expand Down
6 changes: 1 addition & 5 deletions tests/test_indefinite_freeze_attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
import unittest
import sys
from urllib import request

if sys.version_info >= (3, 3):
import unittest.mock as mock
else:
import mock
import unittest.mock as mock

import tuf.formats
import tuf.log
Expand Down
25 changes: 22 additions & 3 deletions tests/test_metadata_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
import unittest
import copy

from typing import Dict, Callable, Optional, Mapping, Any
from datetime import datetime
from typing import Dict, Callable

from tests import utils

from tuf.api.metadata import (
Signed,
Root,
Snapshot,
Timestamp,
Expand Down Expand Up @@ -304,6 +302,27 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str):
DelegatedRole.from_dict(copy.copy(case_dict))


invalid_delegations: DataSet = {
"empty delegations": '{}',
"bad keys": '{"keys": "foo", \
"roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}',
"bad roles": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
"roles": ["foo"]}',
"duplicate role names": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
"roles": [ \
{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}, \
{"keyids": ["keyid2"], "name": "a", "paths": ["fn3"], "terminating": false, "threshold": 2} \
] \
}',
}

@run_sub_tests_with_dataset(invalid_delegations)
def test_invalid_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError, AttributeError)):
Delegations.from_dict(copy.deepcopy(case_dict))


valid_delegations: DataSet = {
"all":
'{"keys": { \
Expand Down
2 changes: 1 addition & 1 deletion tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Union, Callable
from typing import Optional, Callable
import os
import sys
import unittest
Expand Down
7 changes: 1 addition & 6 deletions tests/test_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@
import shutil
import tempfile
import sys

if sys.version_info >= (3, 3):
import unittest.mock as mock

else:
import mock
import unittest.mock as mock

from tuf.repository_tool import * # part of TUTORIAL.md

Expand Down
6 changes: 1 addition & 5 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@
import sys
import unittest
import json

if sys.version_info >= (3, 3):
import unittest.mock as mock
else:
import mock
import unittest.mock as mock

import tuf
import tuf.exceptions
Expand Down
68 changes: 55 additions & 13 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,7 @@ def verify_delegate(
raise ValueError(f"No delegation found for {delegated_role}")

keys = self.signed.delegations.keys
roles = self.signed.delegations.roles
# Assume role names are unique in delegations.roles: #1426
# Find first role in roles with matching name (or None if no match)
role = next((r for r in roles if r.name == delegated_role), None)
role = self.signed.delegations.roles.get(delegated_role)
else:
raise TypeError("Call is valid only on delegator metadata")

Expand Down Expand Up @@ -763,16 +760,27 @@ def to_dict(self) -> Dict[str, Any]:

# Update key for a role.
def add_key(self, role: str, key: Key) -> None:
"""Adds new signing key for delegated role 'role'."""
"""Adds new signing key for delegated role 'role'.
Raises:
ValueError: If 'role' doesn't exist.
"""
if role not in self.roles:
raise ValueError(f"Role {role} doesn't exist")
self.roles[role].keyids.add(key.keyid)
self.keys[key.keyid] = key

def remove_key(self, role: str, keyid: str) -> None:
"""Removes key from 'role' and updates the key store.
Raises:
KeyError: If 'role' does not include the key
ValueError: If 'role' doesn't exist or if 'role' doesn't include
the key.
"""
if role not in self.roles:
raise ValueError(f"Role {role} doesn't exist")
if keyid not in self.roles[role].keyids:
raise ValueError(f"Key with id {keyid} is not used by {role}")
self.roles[role].keyids.remove(keyid)
for keyinfo in self.roles.values():
if keyid in keyinfo.keyids:
Expand Down Expand Up @@ -1146,16 +1154,17 @@ class Delegations:
Attributes:
keys: Dictionary of keyids to Keys. Defines the keys used in 'roles'.
roles: List of DelegatedRoles that define which keys are required to
sign the metadata for a specific role. The roles order also
defines the order that role delegations are considered in.
roles: Ordered dictionary of role names to DelegatedRoles instances. It
defines which keys are required to sign the metadata for a specific
role. The roles order also defines the order that role delegations
are considered in.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
keys: Dict[str, Key],
roles: List[DelegatedRole],
roles: "OrderedDict[str, DelegatedRole]",
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.keys = keys
Expand All @@ -1170,17 +1179,19 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
for keyid, key_dict in keys.items():
keys_res[keyid] = Key.from_dict(keyid, key_dict)
roles = delegations_dict.pop("roles")
roles_res = []
roles_res: "OrderedDict[str, DelegatedRole]" = OrderedDict()
for role_dict in roles:
new_role = DelegatedRole.from_dict(role_dict)
roles_res.append(new_role)
if new_role.name in roles_res:
raise ValueError(f"Duplicate role {new_role.name}")
roles_res[new_role.name] = new_role
# All fields left in the delegations_dict are unrecognized.
return cls(keys_res, roles_res, delegations_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
roles = [role_obj.to_dict() for role_obj in self.roles]
roles = [role_obj.to_dict() for role_obj in self.roles.values()]
return {
"keys": keys,
"roles": roles,
Expand Down Expand Up @@ -1385,3 +1396,34 @@ def to_dict(self) -> Dict[str, Any]:
def update(self, fileinfo: TargetFile) -> None:
"""Assigns passed target file info to meta dict."""
self.targets[fileinfo.path] = fileinfo

def add_key(self, role: str, key: Key) -> None:
"""Adds new signing key for delegated role 'role'.
Raises:
ValueError: If there are no delegated roles or if 'role' is not
delegated by this Target.
"""
if self.delegations is None or role not in self.delegations.roles:
raise ValueError(f"Delegated role {role} doesn't exist")
self.delegations.roles[role].keyids.add(key.keyid)
self.delegations.keys[key.keyid] = key

def remove_key(self, role: str, keyid: str) -> None:
"""Removes key from delegated role 'role' and updates the delegations
key store.
Raises:
ValueError: If there are no delegated roles or if 'role' is not
delegated by this Target or if key is not used by 'role'.
"""
if self.delegations is None or role not in self.delegations.roles:
raise ValueError(f"Delegated role {role} doesn't exist")
if keyid not in self.delegations.roles[role].keyids:
raise ValueError(f"Key with id {keyid} is not used by {role}")
self.delegations.roles[role].keyids.remove(keyid)
for keyinfo in self.delegations.roles.values():
if keyid in keyinfo.keyids:
return

del self.delegations.keys[keyid]
2 changes: 1 addition & 1 deletion tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _preorder_depth_first_walk(
child_roles_to_visit = []
# NOTE: This may be a slow operation if there are many
# delegated roles.
for child_role in role_metadata.delegations.roles:
for child_role in role_metadata.delegations.roles.values():
if child_role.is_delegated_path(target_filepath):
logger.debug("Adding child role %s", child_role.name)

Expand Down

0 comments on commit cc1f95e

Please sign in to comment.