Skip to content

Commit

Permalink
Merge pull request #1007 from joshuagl/joshuagl/hashed-bins
Browse files Browse the repository at this point in the history
Enhancements for hashed bin delegation
  • Loading branch information
lukpueh authored Apr 1, 2020
2 parents c63b7f3 + 62e4364 commit e095112
Show file tree
Hide file tree
Showing 6 changed files with 503 additions and 362 deletions.
2 changes: 1 addition & 1 deletion tests/test_repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def test_generate_targets_metadata(self):
datetime_object = datetime.datetime(2030, 1, 1, 12, 0)
expiration_date = datetime_object.isoformat() + 'Z'
file_permissions = oct(os.stat(file1_path).st_mode)[4:]
target_files = {'file.txt': {'file_permission': file_permissions}}
target_files = {'file.txt': {'custom': {'file_permission': file_permissions}}}

delegations = {"keys": {
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
Expand Down
240 changes: 162 additions & 78 deletions tests/test_repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,108 @@ def test_writeall(self):
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3)


def test_writeall_no_files(self):
# Test writeall() when using pre-supplied fileinfo

repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
targets_directory = os.path.join(repository_directory,
repo_tool.TARGETS_DIRECTORY_NAME)

repository = repo_tool.create_new_repository(repository_directory, repository_name)

# (1) Load the public and private keys of the top-level roles, and one
# delegated role.
keystore_directory = os.path.join('repository_data', 'keystore')

# Load the public keys.
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')

root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
targets_pubkey = \
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
snapshot_pubkey = \
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
timestamp_pubkey = \
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)

# Load the private keys.
root_privkey_path = os.path.join(keystore_directory, 'root_key')
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')

root_privkey = \
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
targets_privkey = \
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
'password')
snapshot_privkey = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
'password')
timestamp_privkey = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
'password')


# (2) Add top-level verification keys.
repository.root.add_verification_key(root_pubkey)
repository.targets.add_verification_key(targets_pubkey)
repository.snapshot.add_verification_key(snapshot_pubkey)

# Verify that repository.writeall() fails for insufficient threshold
# of signatures (default threshold = 1).
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)

repository.timestamp.add_verification_key(timestamp_pubkey)


# (3) Load top-level signing keys.
repository.status()
repository.root.load_signing_key(root_privkey)
repository.status()
repository.targets.load_signing_key(targets_privkey)
repository.status()
repository.snapshot.load_signing_key(snapshot_privkey)
repository.status()

# Verify that repository.writeall() fails for insufficient threshold
# of signatures (default threshold = 1).
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)

repository.timestamp.load_signing_key(timestamp_privkey)

# Add target fileinfo
target1_hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes)
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
target1 = os.path.join(targets_directory, 'file1.txt')
target2 = os.path.join(targets_directory, 'file2.txt')
repository.targets.add_target(target1, fileinfo=target1_fileinfo)
repository.targets.add_target(target2, fileinfo=target2_fileinfo)

repository.writeall(use_existing_fileinfo=True)

# Verify that the expected metadata is written.
metadata_directory = os.path.join(repository_directory,
repo_tool.METADATA_STAGED_DIRECTORY_NAME)

for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
role_filepath = os.path.join(metadata_directory, role)
role_signable = securesystemslib.util.load_json_file(role_filepath)

# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
# an invalid signable.
tuf.formats.check_signable_object_format(role_signable)

self.assertTrue(os.path.exists(role_filepath))



def test_get_filepaths_in_directory(self):
# Test normal case.
Expand Down Expand Up @@ -1076,14 +1178,14 @@ def test_add_target(self):

self.assertEqual(len(self.targets_object.target_files), 2)
self.assertTrue('file2.txt' in self.targets_object.target_files)
self.assertEqual(self.targets_object.target_files['file2.txt'],
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
custom_file_permissions)

# Attempt to replace target that has already been added.
octal_file_permissions2 = oct(os.stat(target_filepath).st_mode)[4:]
custom_file_permissions2 = {'file_permissions': octal_file_permissions}
self.targets_object.add_target(target2_filepath, custom_file_permissions2)
self.assertEqual(self.targets_object.target_files['file2.txt'],
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
custom_file_permissions2)

# Test improperly formatted arguments.
Expand All @@ -1094,18 +1196,6 @@ def test_add_target(self):
target_filepath, 3)


# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target,
'non-existent.txt')

# Not under the repository's targets directory.
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target,
self.temporary_directory)

# Not a file (i.e., a valid path, but a directory.)
test_directory = os.path.join(self.targets_directory, 'test_directory')
os.mkdir(test_directory)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target, test_directory)



Expand Down Expand Up @@ -1134,17 +1224,6 @@ def test_add_targets(self):
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_targets, 3)

# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
['non-existent.txt'])
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[target1_filepath, target2_filepath, 'non-existent.txt'])
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[self.temporary_directory])
temp_directory = os.path.join(self.targets_directory, 'temp')
os.mkdir(temp_directory)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[temp_directory])



Expand Down Expand Up @@ -1288,6 +1367,32 @@ def test_delegate_hashed_bins(self):
public_keys = [public_key]
list_of_targets = [target1_filepath]


# A helper function to check that the range of prefixes the role is
# delegated for, specified in path_hash_prefixes, matches the range
# implied by the bin, or delegation role, name.
def check_prefixes_match_range():
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
'test_repository')
have_prefixes = False

for delegated_role in roleinfo['delegations']['roles']:
if len(delegated_role['path_hash_prefixes']) > 0:
rolename = delegated_role['name']
prefixes = delegated_role['path_hash_prefixes']
have_prefixes = True

if len(prefixes) > 1:
prefix_range = "{}-{}".format(prefixes[0], prefixes[-1])
else:
prefix_range = prefixes[0]

self.assertEqual(rolename, prefix_range)

# We expect at least one delegation with some path_hash_prefixes
self.assertTrue(have_prefixes)


# Test delegate_hashed_bins() and verify that 16 hashed bins have
# been delegated in the parent's roleinfo.
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
Expand All @@ -1299,11 +1404,14 @@ def test_delegate_hashed_bins(self):

self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()),
sorted(delegated_rolenames))
check_prefixes_match_range()

# For testing / coverage purposes, try to create delegated bins that
# hold a range of hash prefixes (e.g., bin name: 000-003).
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
number_of_bins=512)
check_prefixes_match_range()

# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.delegate_hashed_bins, 3, public_keys,
Expand Down Expand Up @@ -1341,21 +1449,20 @@ def test_add_target_to_bin(self):

# Set needed arguments by delegate_hashed_bins().
public_keys = [public_key]
list_of_targets = [target1_filepath]

# Delegate to hashed bins. The target filepath to be tested is expected
# to contain a hash prefix of 'e', and should be available at:
# repository.targets('e').
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
self.targets_object.delegate_hashed_bins([], public_keys,
number_of_bins=16)

# Ensure each hashed bin initially contains zero targets.
for delegation in self.targets_object.delegations:
self.assertTrue(target1_filepath not in delegation.target_files)
self.assertEqual(delegation.target_files, {})

# Add 'target1_filepath' and verify that the relative path of
# 'target1_filepath' is added to the correct bin.
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == '5':
Expand All @@ -1364,55 +1471,37 @@ def test_add_target_to_bin(self):
else:
self.assertFalse('file1.txt' in delegation.target_files)

# Verify that 'path_hash_prefixes' must exist for hashed bin delegations.

roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
repository_name)

for delegated_role in roleinfo['delegations']['roles']:
delegated_role['path_hash_prefixes'] = []

tuf.roledb.update_roleinfo(self.targets_object.rolename, roleinfo,
repository_name=repository_name)
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, target1_filepath)

# Verify that an exception is raised if a target does not match with
# any of the 'path_hash_prefixes'.
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
repository_name)
delegated_role = roleinfo['delegations']['roles'][0]
delegated_role['path_hash_prefixes'] = ['faac']
delegated_roles = list()
delegated_roles.append(delegated_role)
roleinfo['delegations']['roles'] = delegated_roles
tuf.roledb.update_roleinfo(self.targets_object.rolename, roleinfo,
repository_name=repository_name)

self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, target1_filepath)

# Test for non-existent delegations and hashed bins.
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty',
repository_name=repository_name)

self.assertRaises(securesystemslib.exceptions.Error,
empty_targets_role.add_target_to_bin,
target1_filepath)
os.path.basename(target1_filepath), 16)

# Test for a required hashed bin that does not exist.
self.targets_object.revoke('e')
self.targets_object.revoke('5')
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin,
target1_filepath)
os.path.basename(target1_filepath), 16)

# Test adding a target with fileinfo
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')

self.targets_object.add_target_to_bin(os.path.basename(target2_filepath), 16, fileinfo=target2_fileinfo)

for delegation in self.targets_object.delegations:
if delegation.rolename == '0':
self.assertTrue('file2.txt' in delegation.target_files)

else:
self.assertFalse('file2.txt' in delegation.target_files)

# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.add_target_to_bin, 3)

# Invalid target file path argument.
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, '/non-existent')
self.targets_object.add_target_to_bin, 3, 'foo')



Expand All @@ -1426,48 +1515,43 @@ def test_remove_target_from_bin(self):

# Set needed arguments by delegate_hashed_bins().
public_keys = [public_key]
list_of_targets = [os.path.basename(target1_filepath)]

# Delegate to hashed bins. The target filepath to be tested is expected
# to contain a hash prefix of 'e', and can be accessed as:
# repository.targets('e').
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
self.targets_object.delegate_hashed_bins([], public_keys,
number_of_bins=16)

# Ensure each hashed bin initially contains zero targets.
for delegation in self.targets_object.delegations:
self.assertTrue(os.path.basename(target1_filepath) not in delegation.target_files)
self.assertEqual(delegation.target_files, {})

# Add 'target1_filepath' and verify that the relative path of
# 'target1_filepath' is added to the correct bin.
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == '5':
self.assertTrue('file1.txt' in delegation.target_files)

self.assertTrue(len(delegation.target_files) == 1)
else:
self.assertTrue('file1.txt' not in delegation.target_files)

# Test the remove_target_from_bin() method. Verify that 'target1_filepath'
# has been removed.
self.targets_object.remove_target_from_bin(os.path.basename(target1_filepath))
self.targets_object.remove_target_from_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == 'e':
self.assertTrue('file1.txt' not in delegation.target_files)

else:
self.assertTrue('file1.txt' not in delegation.target_files)
self.assertTrue('file1.txt' not in delegation.target_files)


# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.remove_target_from_bin, 3)
self.targets_object.remove_target_from_bin, 3, 'foo')

# Invalid target file path argument.
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.remove_target_from_bin, '/non-existent')
self.targets_object.remove_target_from_bin, '/non-existent', 16)



Expand Down
8 changes: 4 additions & 4 deletions tests/test_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ def test_tutorial(self):
targets, [public_unclaimed_key], 32)

self.assertListEqual([
"Creating hashed bin delegations.",
"1 total targets.",
"32 hashed bins.",
"256 total hash prefixes.",
"Creating hashed bin delegations.\n"
"1 total targets.\n"
"32 hashed bins.\n"
"256 total hash prefixes.\n"
"Each bin ranges over 8 hash prefixes."
] + ["Adding a verification key that has already been used."] * 32,
[
Expand Down
Loading

0 comments on commit e095112

Please sign in to comment.