Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements for hashed bin delegation #1007

Merged
merged 17 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/test_repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def test_generate_targets_metadata(self):
datetime_object = datetime.datetime(2030, 1, 1, 12, 0)
expiration_date = datetime_object.isoformat() + 'Z'
file_permissions = oct(os.stat(file1_path).st_mode)[4:]
target_files = {'file.txt': {'file_permission': file_permissions}}
target_files = {'file.txt': {'custom': {'file_permission': file_permissions}}}

delegations = {"keys": {
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
Expand Down
240 changes: 162 additions & 78 deletions tests/test_repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,108 @@ def test_writeall(self):
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3)


def test_writeall_no_files(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to give kudos for adding a test case that closely mirrors the current work on Warehouse ❤️

# Test writeall() when using pre-supplied fileinfo

repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
targets_directory = os.path.join(repository_directory,
repo_tool.TARGETS_DIRECTORY_NAME)

repository = repo_tool.create_new_repository(repository_directory, repository_name)

# (1) Load the public and private keys of the top-level roles, and one
# delegated role.
keystore_directory = os.path.join('repository_data', 'keystore')

# Load the public keys.
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')

root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
targets_pubkey = \
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
snapshot_pubkey = \
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
timestamp_pubkey = \
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)

# Load the private keys.
root_privkey_path = os.path.join(keystore_directory, 'root_key')
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')

root_privkey = \
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
targets_privkey = \
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
'password')
snapshot_privkey = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
'password')
timestamp_privkey = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
'password')


# (2) Add top-level verification keys.
repository.root.add_verification_key(root_pubkey)
repository.targets.add_verification_key(targets_pubkey)
repository.snapshot.add_verification_key(snapshot_pubkey)

# Verify that repository.writeall() fails for insufficient threshold
# of signatures (default threshold = 1).
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)

repository.timestamp.add_verification_key(timestamp_pubkey)


# (3) Load top-level signing keys.
repository.status()
repository.root.load_signing_key(root_privkey)
repository.status()
repository.targets.load_signing_key(targets_privkey)
repository.status()
repository.snapshot.load_signing_key(snapshot_privkey)
repository.status()

# Verify that repository.writeall() fails for insufficient threshold
# of signatures (default threshold = 1).
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)

repository.timestamp.load_signing_key(timestamp_privkey)

# Add target fileinfo
target1_hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes)
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
target1 = os.path.join(targets_directory, 'file1.txt')
target2 = os.path.join(targets_directory, 'file2.txt')
repository.targets.add_target(target1, fileinfo=target1_fileinfo)
repository.targets.add_target(target2, fileinfo=target2_fileinfo)

repository.writeall(use_existing_fileinfo=True)

# Verify that the expected metadata is written.
metadata_directory = os.path.join(repository_directory,
repo_tool.METADATA_STAGED_DIRECTORY_NAME)

for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
role_filepath = os.path.join(metadata_directory, role)
role_signable = securesystemslib.util.load_json_file(role_filepath)

# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
# an invalid signable.
tuf.formats.check_signable_object_format(role_signable)

self.assertTrue(os.path.exists(role_filepath))



def test_get_filepaths_in_directory(self):
# Test normal case.
Expand Down Expand Up @@ -1076,14 +1178,14 @@ def test_add_target(self):

self.assertEqual(len(self.targets_object.target_files), 2)
self.assertTrue('file2.txt' in self.targets_object.target_files)
self.assertEqual(self.targets_object.target_files['file2.txt'],
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
custom_file_permissions)

# Attempt to replace target that has already been added.
octal_file_permissions2 = oct(os.stat(target_filepath).st_mode)[4:]
custom_file_permissions2 = {'file_permissions': octal_file_permissions}
self.targets_object.add_target(target2_filepath, custom_file_permissions2)
self.assertEqual(self.targets_object.target_files['file2.txt'],
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
custom_file_permissions2)

# Test improperly formatted arguments.
Expand All @@ -1094,18 +1196,6 @@ def test_add_target(self):
target_filepath, 3)


# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target,
'non-existent.txt')

# Not under the repository's targets directory.
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target,
self.temporary_directory)

# Not a file (i.e., a valid path, but a directory.)
test_directory = os.path.join(self.targets_directory, 'test_directory')
os.mkdir(test_directory)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target, test_directory)



Expand Down Expand Up @@ -1134,17 +1224,6 @@ def test_add_targets(self):
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_targets, 3)

# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
['non-existent.txt'])
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[target1_filepath, target2_filepath, 'non-existent.txt'])
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[self.temporary_directory])
temp_directory = os.path.join(self.targets_directory, 'temp')
os.mkdir(temp_directory)
self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_targets,
[temp_directory])



Expand Down Expand Up @@ -1288,6 +1367,32 @@ def test_delegate_hashed_bins(self):
public_keys = [public_key]
list_of_targets = [target1_filepath]


# A helper function to check that the range of prefixes the role is
# delegated for, specified in path_hash_prefixes, matches the range
# implied by the bin, or delegation role, name.
def check_prefixes_match_range():
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
'test_repository')
have_prefixes = False

for delegated_role in roleinfo['delegations']['roles']:
if len(delegated_role['path_hash_prefixes']) > 0:
joshuagl marked this conversation as resolved.
Show resolved Hide resolved
rolename = delegated_role['name']
prefixes = delegated_role['path_hash_prefixes']
have_prefixes = True

if len(prefixes) > 1:
prefix_range = "{}-{}".format(prefixes[0], prefixes[-1])
else:
prefix_range = prefixes[0]

self.assertEqual(rolename, prefix_range)

# We expect at least one delegation with some path_hash_prefixes
self.assertTrue(have_prefixes)


# Test delegate_hashed_bins() and verify that 16 hashed bins have
# been delegated in the parent's roleinfo.
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
Expand All @@ -1299,11 +1404,14 @@ def test_delegate_hashed_bins(self):

self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()),
sorted(delegated_rolenames))
check_prefixes_match_range()

# For testing / coverage purposes, try to create delegated bins that
# hold a range of hash prefixes (e.g., bin name: 000-003).
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
number_of_bins=512)
check_prefixes_match_range()

# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.delegate_hashed_bins, 3, public_keys,
Expand Down Expand Up @@ -1341,21 +1449,20 @@ def test_add_target_to_bin(self):

# Set needed arguments by delegate_hashed_bins().
public_keys = [public_key]
list_of_targets = [target1_filepath]

# Delegate to hashed bins. The target filepath to be tested is expected
# to contain a hash prefix of 'e', and should be available at:
# repository.targets('e').
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
self.targets_object.delegate_hashed_bins([], public_keys,
number_of_bins=16)

# Ensure each hashed bin initially contains zero targets.
for delegation in self.targets_object.delegations:
self.assertTrue(target1_filepath not in delegation.target_files)
self.assertEqual(delegation.target_files, {})

# Add 'target1_filepath' and verify that the relative path of
# 'target1_filepath' is added to the correct bin.
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == '5':
Expand All @@ -1364,55 +1471,37 @@ def test_add_target_to_bin(self):
else:
self.assertFalse('file1.txt' in delegation.target_files)

# Verify that 'path_hash_prefixes' must exist for hashed bin delegations.

roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
repository_name)

for delegated_role in roleinfo['delegations']['roles']:
delegated_role['path_hash_prefixes'] = []

tuf.roledb.update_roleinfo(self.targets_object.rolename, roleinfo,
repository_name=repository_name)
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, target1_filepath)

# Verify that an exception is raised if a target does not match with
# any of the 'path_hash_prefixes'.
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
repository_name)
delegated_role = roleinfo['delegations']['roles'][0]
delegated_role['path_hash_prefixes'] = ['faac']
delegated_roles = list()
delegated_roles.append(delegated_role)
roleinfo['delegations']['roles'] = delegated_roles
tuf.roledb.update_roleinfo(self.targets_object.rolename, roleinfo,
repository_name=repository_name)

self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, target1_filepath)

# Test for non-existent delegations and hashed bins.
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty',
repository_name=repository_name)

self.assertRaises(securesystemslib.exceptions.Error,
empty_targets_role.add_target_to_bin,
target1_filepath)
os.path.basename(target1_filepath), 16)

# Test for a required hashed bin that does not exist.
self.targets_object.revoke('e')
self.targets_object.revoke('5')
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin,
target1_filepath)
os.path.basename(target1_filepath), 16)

# Test adding a target with fileinfo
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')

self.targets_object.add_target_to_bin(os.path.basename(target2_filepath), 16, fileinfo=target2_fileinfo)

for delegation in self.targets_object.delegations:
if delegation.rolename == '0':
self.assertTrue('file2.txt' in delegation.target_files)

else:
self.assertFalse('file2.txt' in delegation.target_files)

# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.add_target_to_bin, 3)

# Invalid target file path argument.
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.add_target_to_bin, '/non-existent')
self.targets_object.add_target_to_bin, 3, 'foo')



Expand All @@ -1426,48 +1515,43 @@ def test_remove_target_from_bin(self):

# Set needed arguments by delegate_hashed_bins().
public_keys = [public_key]
list_of_targets = [os.path.basename(target1_filepath)]

# Delegate to hashed bins. The target filepath to be tested is expected
# to contain a hash prefix of 'e', and can be accessed as:
# repository.targets('e').
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
self.targets_object.delegate_hashed_bins([], public_keys,
number_of_bins=16)

# Ensure each hashed bin initially contains zero targets.
for delegation in self.targets_object.delegations:
self.assertTrue(os.path.basename(target1_filepath) not in delegation.target_files)
self.assertEqual(delegation.target_files, {})

# Add 'target1_filepath' and verify that the relative path of
# 'target1_filepath' is added to the correct bin.
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == '5':
self.assertTrue('file1.txt' in delegation.target_files)

self.assertTrue(len(delegation.target_files) == 1)
else:
self.assertTrue('file1.txt' not in delegation.target_files)

# Test the remove_target_from_bin() method. Verify that 'target1_filepath'
# has been removed.
self.targets_object.remove_target_from_bin(os.path.basename(target1_filepath))
self.targets_object.remove_target_from_bin(os.path.basename(target1_filepath), 16)

for delegation in self.targets_object.delegations:
if delegation.rolename == 'e':
self.assertTrue('file1.txt' not in delegation.target_files)

else:
self.assertTrue('file1.txt' not in delegation.target_files)
self.assertTrue('file1.txt' not in delegation.target_files)


# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
self.targets_object.remove_target_from_bin, 3)
self.targets_object.remove_target_from_bin, 3, 'foo')

# Invalid target file path argument.
self.assertRaises(securesystemslib.exceptions.Error,
self.targets_object.remove_target_from_bin, '/non-existent')
self.targets_object.remove_target_from_bin, '/non-existent', 16)



Expand Down
8 changes: 4 additions & 4 deletions tests/test_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ def test_tutorial(self):
targets, [public_unclaimed_key], 32)

self.assertListEqual([
"Creating hashed bin delegations.",
"1 total targets.",
"32 hashed bins.",
"256 total hash prefixes.",
"Creating hashed bin delegations.\n"
"1 total targets.\n"
"32 hashed bins.\n"
"256 total hash prefixes.\n"
"Each bin ranges over 8 hash prefixes."
] + ["Adding a verification key that has already been used."] * 32,
[
Expand Down
Loading