diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 7b9b8243c8..4c449f4c97 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -36,6 +36,7 @@ import json import shutil import unittest +import copy import tuf import tuf.formats @@ -310,11 +311,6 @@ def test_generate_root_metadata(self): # securesystemslib.exceptions.Error exception is raised for duplicate keyids. tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa' - # Add duplicate keyid to root's roleinfo. - tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0]) - self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_root_metadata, 1, - expires, consistent_snapshot=False) - # Test improperly formatted arguments. self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_root_metadata, '3', expires, False) @@ -349,48 +345,55 @@ def test_generate_targets_metadata(self): file_permissions = oct(os.stat(file1_path).st_mode)[4:] target_files = {'file.txt': {'custom': {'file_permission': file_permissions}}} - delegations = {"keys": { - "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": { - "keytype": "ed25519", - "keyval": { - "public": "3eb81026ded5af2c61fb3d4b272ac53cd1049a810ee88f4df1fc35cdaf918157" - } - } - }, - "roles": [ - { - "keyids": [ - "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf" - ], - "name": "targets/warehouse", - "paths": [ - "/file1.txt", "/README.txt", '/warehouse/' - ], - "threshold": 1 - } - ] - } + # Delegations data must be loaded into roledb since + # generate_targets_metadata tries to update delegations keyids + # and threshold + repository_path = os.path.join('repository_data', 'repository') + repository = repo_tool.load_repository(repository_path) + roleinfo = tuf.roledb.get_roleinfo('targets') + delegations = roleinfo['delegations'] - targets_metadata = \ - repo_lib.generate_targets_metadata(targets_directory, target_files, - version, expiration_date, delegations, - False) + targets_metadata = repo_lib.generate_targets_metadata(targets_directory, + target_files, version, expiration_date, delegations, False) self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata)) # Valid arguments with 'delegations' set to None. - targets_metadata = \ - repo_lib.generate_targets_metadata(targets_directory, target_files, - version, expiration_date, None, - False) + targets_metadata = repo_lib.generate_targets_metadata(targets_directory, + target_files, version, expiration_date, None, False) self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata)) + # Test update in targets' delegations + keystore_path = os.path.join('repository_data', 'keystore') + targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub') + targets_public_key = securesystemslib.interface.\ + import_ed25519_publickey_from_file(targets_public_keypath) + + # Add new key and threshold to delegated role + repository.targets('role1').add_verification_key(targets_public_key) + repository.targets('role1').threshold = 2 + role1_keyids = tuf.roledb.get_role_keyids('role1') + role1_threshold = tuf.roledb.get_role_threshold('role1') + roleinfo = tuf.roledb.get_roleinfo('targets') + delegations = roleinfo['delegations'] + old_delegations = copy.deepcopy(delegations) + + targets_metadata = repo_lib.generate_targets_metadata(targets_directory, + target_files, version, expiration_date, delegations, False) + self.assertNotEqual(old_delegations, delegations) + self.assertEqual(role1_keyids, + targets_metadata['delegations']['roles'][0]['keyids']) + self.assertEqual(role1_threshold, + targets_metadata['delegations']['roles'][0]['threshold']) + for keyid in role1_keyids: + self.assertIn(keyid, targets_metadata['delegations']['keys']) + + # Verify that 'digest.filename' file is saved to 'targets_directory' if # the 'write_consistent_targets' argument is True. list_targets_directory = os.listdir(targets_directory) - targets_metadata = \ - repo_lib.generate_targets_metadata(targets_directory, target_files, - version, expiration_date, delegations, - write_consistent_targets=True) + targets_metadata = repo_lib.generate_targets_metadata(targets_directory, + target_files, version, expiration_date, delegations, + write_consistent_targets=True) new_list_targets_directory = os.listdir(targets_directory) # Verify that 'targets_directory' contains only one extra item. @@ -958,6 +961,21 @@ def test__load_top_level_metadata(self): repository = repo_tool.create_new_repository(repository_directory, repository_name) repo_lib._load_top_level_metadata(repository, filenames, repository_name) + # Manually add targets delegations to roledb since + # repository.write('targets') will try to update its delegations + targets_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'targets.json') + targets_signable = securesystemslib.util.load_json_file(targets_filepath) + delegations = targets_signable['signed']['delegations'] + + roleinfo = {} + roleinfo['name'] = delegations['roles'][0]['name'] + roleinfo['keyids'] = delegations['roles'][0]['keyids'] + roleinfo['threshold'] = delegations['roles'][0]['threshold'] + roleinfo['version'] = 1 + tuf.roledb.add_role('role1', roleinfo, repository_name) + + # Partially write all top-level roles (we increase the threshold of each # top-level role so that they are flagged as partially written. repository.root.threshold = repository.root.threshold + 1 diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 4dc3d6c9a3..1ee45c21a8 100644 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -166,7 +166,13 @@ def _generate_and_write_metadata(rolename, metadata_filename, metadata = generate_targets_metadata(targets_directory, roleinfo['paths'], roleinfo['version'], roleinfo['expires'], roleinfo['delegations'], consistent_targets, use_existing_fileinfo, - storage_backend) + storage_backend, repository_name) + + # Update roledb with the latest delegations info collected during + # generate_targets_metadata() + tuf.roledb.update_roleinfo(rolename, roleinfo, + repository_name=repository_name) + # Before writing 'rolename' to disk, automatically increment its version # number (if 'increment_version_number' is True) so that the caller does not @@ -1227,6 +1233,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # Conformant to 'ROLEDICT_SCHEMA' and 'KEYDICT_SCHEMA', respectively. roledict = {} keydict = {} + keylist = [] # Extract the role, threshold, and keyid information of the top-level roles, # which Root stores in its metadata. The necessary role metadata is generated @@ -1238,43 +1245,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, raise securesystemslib.exceptions.Error(repr(rolename) + ' not in' ' "tuf.roledb".') - # Keep track of the keys loaded to avoid duplicates. - keyids = [] - - # Generate keys for the keyids listed by the role being processed. - for keyid in tuf.roledb.get_role_keyids(rolename, repository_name): + # Collect keys from all roles in a list + keyids = tuf.roledb.get_role_keyids(rolename, repository_name) + for keyid in keyids: key = tuf.keydb.get_key(keyid, repository_name=repository_name) - - # If 'key' is an RSA key, it would conform to - # 'securesystemslib.formats.RSAKEY_SCHEMA', and have the form: - # {'keytype': 'rsa', - # 'keyid': keyid, - # 'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...', - # 'private': '-----BEGIN RSA PRIVATE KEY----- ...'}} - keyid = key['keyid'] - if keyid not in keydict: - - # This appears to be a new keyid. Generate the key for it. - if key['keytype'] in ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']: - keytype = key['keytype'] - keyval = key['keyval'] - scheme = key['scheme'] - keydict[keyid] = \ - securesystemslib.keys.format_keyval_to_metadata(keytype, - scheme, keyval, private=False) - - # This is not a recognized key. Raise an exception. - else: - raise securesystemslib.exceptions.Error('Unsupported keytype:' - ' ' + key['keytype']) - - # Do we have a duplicate? - if keyid in keyids: - raise securesystemslib.exceptions.Error('Same keyid listed twice:' - ' ' + keyid) - - # Add the loaded keyid for the role being processed. - keyids.append(keyid) + keylist.append(key) # Generate the authentication information Root establishes for each # top-level role. @@ -1285,6 +1260,9 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, threshold=role_threshold) roledict[rolename] = role_metadata + # Create the root metadata 'keys' dictionary + _, keydict = keys_to_keydict(keylist) + # Use generalized build_dict_conforming_to_schema func to produce a dict that # contains all the appropriate information for this type of metadata, # checking that the result conforms to the appropriate schema. @@ -1307,7 +1285,8 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, def generate_targets_metadata(targets_directory, target_files, version, expiration_date, delegations=None, write_consistent_targets=False, - use_existing_fileinfo=False, storage_backend=None): + use_existing_fileinfo=False, storage_backend=None, + repository_name='default'): """ Generate the targets metadata object. The targets in 'target_files' must @@ -1360,6 +1339,10 @@ def generate_targets_metadata(targets_directory, target_files, version, An object which implements securesystemslib.storage.StorageBackendInterface. + repository_name: + The name of the repository. If not supplied, 'default' repository + is used. + securesystemslib.exceptions.FormatError, if an error occurred trying to generate the targets metadata object. @@ -1404,6 +1387,23 @@ def generate_targets_metadata(targets_directory, target_files, version, if delegations is not None: tuf.formats.DELEGATIONS_SCHEMA.check_match(delegations) + # If targets role has delegations, collect the up-to-date 'keyids' and + # 'threshold' for each role. Update the delegations keys dictionary. + delegations_keys = [] + # Update 'keyids' and 'threshold' for each delegated role + for role in delegations['roles']: + role['keyids'] = tuf.roledb.get_role_keyids(role['name'], + repository_name) + role['threshold'] = tuf.roledb.get_role_threshold(role['name'], + repository_name) + + # Collect all delegations keys for generating the delegations keydict + for keyid in role['keyids']: + key = tuf.keydb.get_key(keyid, repository_name=repository_name) + delegations_keys.append(key) + + _, delegations['keys'] = keys_to_keydict(delegations_keys) + # Store the file attributes of targets in 'target_files'. 'filedict', # conformant to 'tuf.formats.FILEDICT_SCHEMA', is added to the @@ -2253,9 +2253,42 @@ def disable_console_log_messages(): +def keys_to_keydict(keys): + """ + + Iterate over a list of keys and return a list of keyids and a dict mapping + keyid to key metadata + + + keys: + A list of key objects conforming to + securesystemslib.formats.ANYKEYLIST_SCHEMA. + + + keyids: + A list of keyids conforming to securesystemslib.formats.KEYID_SCHEMA + keydict: + A dictionary conforming to securesystemslib.formats.KEYDICT_SCHEMA + """ + keyids = [] + keydict = {} + + for key in keys: + keyid = key['keyid'] + key_metadata_format = securesystemslib.keys.format_keyval_to_metadata( + key['keytype'], key['scheme'], key['keyval']) + + new_keydict = {keyid: key_metadata_format} + keydict.update(new_keydict) + keyids.append(keyid) + return keyids, keydict + + + + if __name__ == '__main__': # The interactive sessions of the documentation strings can - # be tested by running repository_tool.py as a standalone module: + # be tested by running repository_lib.py as a standalone module: # $ python repository_lib.py. import doctest doctest.testmod() diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 3a2f4dce28..ef0e38c1d4 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -2377,7 +2377,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1, # Keep track of the valid keyids (added to the new Targets object) and # their keydicts (added to this Targets delegations). - keyids, keydict = _keys_to_keydict(public_keys) + keyids, keydict = repo_lib.keys_to_keydict(public_keys) # Ensure the paths of 'list_of_targets' are located in the repository's # targets directory. @@ -2612,7 +2612,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, hash_prefix = repo_lib.get_target_hash(target_path)[:prefix_length] ordered_roles[int(hash_prefix, 16) // bin_size]["target_paths"].append(target_path) - keyids, keydict = _keys_to_keydict(keys_of_hashed_bins) + keyids, keydict = repo_lib.keys_to_keydict(keys_of_hashed_bins) # A queue of roleinfo's that need to be updated in the roledb delegated_roleinfos = [] @@ -2857,30 +2857,6 @@ def _check_path(self, pathname): - -def _keys_to_keydict(keys): - """ - Iterate over a list of keys and return a list of keyids and a dict mapping - keyid to key metadata - """ - keyids = [] - keydict = {} - - for key in keys: - keyid = key['keyid'] - key_metadata_format = securesystemslib.keys.format_keyval_to_metadata( - key['keytype'], key['scheme'], key['keyval']) - - new_keydict = {keyid: key_metadata_format} - keydict.update(new_keydict) - keyids.append(keyid) - - return keyids, keydict - - - - - def create_new_repository(repository_directory, repository_name='default', storage_backend=None, use_timestamp_length=True, use_timestamp_hashes=True, use_snapshot_length=False, use_snapshot_hashes=False): @@ -3111,12 +3087,14 @@ def load_repository(repository_directory, repository_name='default', # [('role1', 'targets'), ('role2', 'targets'), ... ] roleinfo = tuf.roledb.get_roleinfo('targets', repository_name) for role in roleinfo['delegations']['roles']: - delegations.append((role['name'], 'targets')) + delegations.append((role, 'targets')) # Traverse the graph by appending the next delegation to the deque and # 'pop'-ing and loading the left-most element. while delegations: - rolename, delegating_role = delegations.popleft() + delegation_info, delegating_role = delegations.popleft() + + rolename = delegation_info['name'] if (rolename, delegating_role) in loaded_delegations: logger.warning('Detected cycle in the delegation graph: ' + repr(delegating_role) + ' -> ' + @@ -3156,6 +3134,8 @@ def load_repository(repository_directory, repository_name='default', roleinfo['expires'] = metadata_object['expires'] roleinfo['paths'] = metadata_object['targets'] roleinfo['delegations'] = metadata_object['delegations'] + roleinfo['threshold'] = delegation_info['threshold'] + roleinfo['keyids'] = delegation_info['keyids'] # Generate the Targets object of the delegated role, # add it to the top-level 'targets' object and to its @@ -3173,7 +3153,7 @@ def load_repository(repository_directory, repository_name='default', # Append the next level delegations to the deque: # the 'delegated' role becomes the 'delegating' for delegation in metadata_object['delegations']['roles']: - delegations.append((delegation['name'], rolename)) + delegations.append((delegation, rolename)) # Extract the keys specified in the delegations field of the Targets # role. Add 'key_object' to the list of recognized keys. Keys may be