diff --git a/tests/test_api.py b/tests/test_api.py index f309635376..f80faae2f3 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -39,6 +39,7 @@ ) from tuf.api.serialization import DeserializationError from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer +from tuf.api.metadata import Rolename logger = logging.getLogger(__name__) @@ -68,7 +69,7 @@ def setUpClass(cls): # Load keys into memory cls.keystore = {} - for role in ["delegation", "snapshot", "targets", "timestamp"]: + for role in ["delegation", Rolename.snapshot, Rolename.targets, Rolename.timestamp]: cls.keystore[role] = import_ed25519_privatekey_from_file( os.path.join(cls.keystore_dir, role + "_key"), password="password", @@ -82,10 +83,10 @@ def tearDownClass(cls): def test_generic_read(self): for metadata, inner_metadata_cls in [ - ("root", Root), - ("snapshot", Snapshot), - ("timestamp", Timestamp), - ("targets", Targets), + (Rolename.root, Root), + (Rolename.snapshot, Snapshot), + (Rolename.timestamp, Timestamp), + (Rolename.targets, Targets), ]: # Load JSON-formatted metdata of each supported type from file @@ -126,7 +127,7 @@ def test_compact_json(self): ) def test_read_write_read_compare(self): - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Rolename.root, Rolename.snapshot, Rolename.timestamp, Rolename.targets]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") md_obj = Metadata.from_file(path) @@ -138,7 +139,7 @@ def test_read_write_read_compare(self): os.remove(path_2) def test_to_from_bytes(self): - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Rolename.root, Rolename.snapshot, Rolename.timestamp, Rolename.targets]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() @@ -159,11 +160,11 @@ def test_sign_verify(self): root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root - targets_keyid = next(iter(root.roles["targets"].keyids)) + targets_keyid = next(iter(root.roles[Rolename.targets].keyids)) targets_key = root.keys[targets_keyid] - snapshot_keyid = next(iter(root.roles["snapshot"].keyids)) + snapshot_keyid = next(iter(root.roles[Rolename.snapshot].keyids)) snapshot_key = root.keys[snapshot_keyid] - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Rolename.timestamp].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... @@ -182,7 +183,7 @@ def test_sign_verify(self): with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(md_obj, JSONSerializer()) - sslib_signer = SSlibSigner(self.keystore["snapshot"]) + sslib_signer = SSlibSigner(self.keystore[Rolename.snapshot]) # Append a new signature with the unrelated key and assert that ... sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -193,7 +194,7 @@ def test_sign_verify(self): # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore["timestamp"]) + sslib_signer = SSlibSigner(self.keystore[Rolename.timestamp]) # Create and assign (don't append) a new signature and assert that ... md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, @@ -208,7 +209,7 @@ def test_verify_failures(self): root = Metadata[Root].from_file(root_path).signed # Locate the timestamp public key we need from root - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Rolename.timestamp].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (timestamp) @@ -359,20 +360,20 @@ def test_metadata_verify_delegate(self): role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate("root", root) - root.verify_delegate("snapshot", snapshot) - root.verify_delegate("targets", targets) + root.verify_delegate(Rolename.root, root) + root.verify_delegate(Rolename.snapshot, snapshot) + root.verify_delegate(Rolename.targets, targets) targets.verify_delegate("role1", role1) role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate("snapshot", snapshot) + snapshot.verify_delegate(Rolename.snapshot, snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate("targets", targets) + targets.verify_delegate(Rolename.targets, targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): role2.verify_delegate("role1", role1) @@ -381,31 +382,31 @@ def test_metadata_verify_delegate(self): expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Rolename.snapshot, snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("timestamp", snapshot) + root.verify_delegate(Rolename.timestamp, snapshot) # Add a key to snapshot role, make sure the new sig fails to verify - ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) - root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) + ts_keyid = next(iter(root.signed.roles[Rolename.timestamp].keyids)) + root.signed.add_key(Rolename.snapshot, root.signed.keys[ts_keyid]) snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Rolename.snapshot, snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles["snapshot"].threshold = 2 + root.signed.roles[Rolename.snapshot].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Rolename.snapshot, snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) - root.verify_delegate("snapshot", snapshot) + snapshot.sign(SSlibSigner(self.keystore[Rolename.timestamp]), append=True) + root.verify_delegate(Rolename.snapshot, snapshot) def test_key_class(self): # Test if from_securesystemslib_key removes the private key from keyval @@ -431,14 +432,14 @@ def test_root_add_key_and_remove_key(self): ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + self.assertNotIn(keyid, root.signed.roles[Rolename.root].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key("root", key_metadata) + root.signed.add_key(Rolename.root, key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles["root"].keyids) + self.assertIn(keyid, root.signed.roles[Rolename.root].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -446,29 +447,29 @@ def test_root_add_key_and_remove_key(self): root.to_dict() # Try adding the same key again and assert its ignored. - pre_add_keyid = root.signed.roles["root"].keyids.copy() - root.signed.add_key("root", key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) + pre_add_keyid = root.signed.roles[Rolename.root].keyids.copy() + root.signed.add_key(Rolename.root, key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles[Rolename.root].keyids) # Add the same key to targets role as well - root.signed.add_key("targets", key_metadata) + root.signed.add_key(Rolename.targets, key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key("root", keyid) - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + root.signed.remove_key(Rolename.root, keyid) + self.assertNotIn(keyid, root.signed.roles[Rolename.root].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key("targets", keyid) - self.assertNotIn(keyid, root.signed.roles["targets"].keyids) + root.signed.remove_key(Rolename.targets, keyid) + self.assertNotIn(keyid, root.signed.roles[Rolename.targets].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key("root", "nosuchkey") + root.signed.remove_key(Rolename.root, "nosuchkey") with self.assertRaises(ValueError): root.signed.remove_key("nosuchrole", keyid) @@ -670,7 +671,7 @@ def test_length_and_hash_validation(self): targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) file1_targetfile = targets.signed.targets["file1.txt"] - filepath = os.path.join(self.repo_dir, "targets", "file1.txt") + filepath = os.path.join(self.repo_dir, Rolename.targets, "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -694,7 +695,7 @@ def test_length_and_hash_validation(self): def test_targetfile_from_file(self): # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Rolename.targets, "file1.txt") targetfile_from_file = TargetFile.from_file( file_path, file_path, ["sha256"] ) @@ -703,7 +704,7 @@ def test_targetfile_from_file(self): targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, "targets", "file123.txt") + file_path = os.path.join(self.repo_dir, Rolename.targets, "file123.txt") self.assertRaises( FileNotFoundError, TargetFile.from_file, @@ -713,7 +714,7 @@ def test_targetfile_from_file(self): ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Rolename.targets, "file1.txt") self.assertRaises( exceptions.UnsupportedAlgorithmError, TargetFile.from_file, @@ -724,7 +725,7 @@ def test_targetfile_from_file(self): def test_targetfile_from_data(self): data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + target_file_path = os.path.join(self.repo_dir, Rolename.targets, "file1.txt") # Test with a valid hash algorithm targetfile_from_data = TargetFile.from_data( diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index be0333c351..f881d0db37 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -35,6 +35,7 @@ import tuf.roledb import tuf.keydb import tuf.repository_tool as repo_tool +from tuf.api.metadata import Rolename from tests import utils @@ -722,7 +723,7 @@ def test_signature_order(self): "password")) # Write root metadata with two signatures - repo.write("root") + repo.write(Rolename.root) # Load signed and written json metadata back into memory root_metadata_path = os.path.join( diff --git a/tests/test_sig.py b/tests/test_sig.py index a49c59c21c..9d32dd1152 100755 --- a/tests/test_sig.py +++ b/tests/test_sig.py @@ -33,6 +33,7 @@ import tuf.roledb import tuf.sig import tuf.exceptions +from tuf.api.metadata import Rolename from tests import utils @@ -399,12 +400,12 @@ def test_verify_must_not_count_duplicate_keyids_towards_threshold(self): # Assert that 'get_signature_status' returns two good signatures ... status = tuf.sig.get_signature_status( - signable, "root", keyids=[KEYS[0]["keyid"]], threshold=2) + signable, Rolename.root, keyids=[KEYS[0]["keyid"]], threshold=2) self.assertTrue(len(status["good_sigs"]) == 2) # ... but only one counts towards the threshold self.assertFalse( - tuf.sig.verify(signable, "root", keyids=[KEYS[0]["keyid"]], threshold=2)) + tuf.sig.verify(signable, Rolename.root, keyids=[KEYS[0]["keyid"]], threshold=2)) # Clean-up keydb tuf.keydb.remove_key(KEYS[0]["keyid"]) @@ -434,7 +435,7 @@ def test_verify_count_different_keyids_for_same_key_towards_threshold(self): # Assert that the key only counts toward the threshold once keyids = [key_sha256["keyid"], key_sha512["keyid"]] self.assertFalse( - tuf.sig.verify(signable, "root", keyids=keyids, threshold=2)) + tuf.sig.verify(signable, Rolename.root, keyids=keyids, threshold=2)) # Clean-up keydb tuf.keydb.remove_key(key_sha256["keyid"]) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 52a28ebb84..f2377c0019 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -21,6 +21,7 @@ Snapshot, Targets, Timestamp, + Rolename, ) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet @@ -51,10 +52,10 @@ def setUpClass(cls): ) cls.metadata = {} for md in [ - "root", - "timestamp", - "snapshot", - "targets", + Rolename.root, + Rolename.timestamp, + Rolename.snapshot, + Rolename.targets, "role1", "role2", ]: @@ -64,10 +65,10 @@ def setUpClass(cls): keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + "_key"), password="password" + os.path.join(keystore_dir, Rolename.root + "_key"), password="password" ) - cls.keystore["root"] = SSlibSigner(root_key_dict) - for role in ["delegation", "snapshot", "targets", "timestamp"]: + cls.keystore[Rolename.root] = SSlibSigner(root_key_dict) + for role in ["delegation", Rolename.snapshot, Rolename.targets, Rolename.timestamp]: key_dict = import_ed25519_privatekey_from_file( os.path.join(keystore_dir, role + "_key"), password="password" ) @@ -77,12 +78,12 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.hashes = None timestamp.snapshot_meta.length = None - cls.metadata["timestamp"] = cls.modify_metadata( - cls, "timestamp", hashes_length_modifier + cls.metadata[Rolename.timestamp] = cls.modify_metadata( + cls, Rolename.timestamp, hashes_length_modifier ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata["root"]) + self.trusted_set = TrustedMetadataSet(self.metadata[Rolename.root]) def _update_all_besides_targets( self, @@ -94,24 +95,24 @@ def _update_all_besides_targets( Args: timestamp_bytes: Bytes used when calling trusted_set.update_timestamp(). - Default self.metadata["timestamp"]. + Default self.metadata[Rolename.timestamp]. snapshot_bytes: Bytes used when calling trusted_set.update_snapshot(). - Default self.metadata["snapshot"]. + Default self.metadata[Rolename.snapshot]. """ - timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] + timestamp_bytes = timestamp_bytes or self.metadata[Rolename.timestamp] self.trusted_set.update_timestamp(timestamp_bytes) - snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] + snapshot_bytes = snapshot_bytes or self.metadata[Rolename.snapshot] self.trusted_set.update_snapshot(snapshot_bytes) def test_update(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Rolename.targets ) self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" @@ -129,38 +130,38 @@ def test_update(self): def test_out_of_order_ops(self): # Update snapshot before timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) # Update root after timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Rolename.root]) # Update targets before snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) # update timestamp after snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) # Update delegated targets before targets with self.assertRaises(RuntimeError): self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Rolename.targets ) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) # Update snapshot after sucessful targets update with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Rolename.targets ) def test_root_with_invalid_json(self): @@ -171,20 +172,20 @@ def test_root_with_invalid_json(self): test_func(b"") # root is invalid - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Rolename.root]) root.signed.version += 1 with self.assertRaises(exceptions.UnsignedMetadataError): test_func(root.to_bytes()) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata["snapshot"]) + test_func(self.metadata[Rolename.snapshot]) def test_top_level_md_with_invalid_json(self): top_level_md = [ - (self.metadata["timestamp"], self.trusted_set.update_timestamp), - (self.metadata["snapshot"], self.trusted_set.update_snapshot), - (self.metadata["targets"], self.trusted_set.update_targets), + (self.metadata[Rolename.timestamp], self.trusted_set.update_timestamp), + (self.metadata[Rolename.snapshot], self.trusted_set.update_snapshot), + (self.metadata[Rolename.targets], self.trusted_set.update_targets), ] for metadata, update_func in top_level_md: md = Metadata.from_bytes(metadata) @@ -199,7 +200,7 @@ def test_top_level_md_with_invalid_json(self): # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(self.metadata["root"]) + update_func(self.metadata[Rolename.root]) update_func(metadata) @@ -208,53 +209,53 @@ def test_update_root_new_root(self): def root_new_version_modifier(root: Root) -> None: root.version += 1 - root = self.modify_metadata("root", root_new_version_modifier) + root = self.modify_metadata(Rolename.root, root_new_version_modifier) self.trusted_set.update_root(root) def test_update_root_new_root_cannot_be_verified_with_threshold(self): # new_root data with threshold which cannot be verified. - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Rolename.root]) # remove root role keyids representing root signatures - root.signed.roles["root"].keyids = [] + root.signed.roles[Rolename.root].keyids = [] with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_root(root.to_bytes()) def test_update_root_new_root_ver_same_as_trusted_root_ver(self): with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Rolename.root]) def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) # intermediate root can be expired - root = self.modify_metadata("root", root_expired_modifier) + root = self.modify_metadata(Rolename.root, root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) + tmp_trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - timestamp = self.modify_metadata("timestamp", version_modifier) + timestamp = self.modify_metadata(Rolename.timestamp, version_modifier) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) def test_update_timestamp_snapshot_ver_below_current(self): def bump_snapshot_version(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 # set current known snapshot.json version to 2 - timestamp = self.modify_metadata("timestamp", bump_snapshot_version) + timestamp = self.modify_metadata(Rolename.timestamp, bump_snapshot_version) self.trusted_set.update_timestamp(timestamp) # newtimestamp.meta.version < trusted_timestamp.meta.version with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) def test_update_timestamp_expired(self): # new_timestamp has expired @@ -263,29 +264,29 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: # expired intermediate timestamp is loaded but raises timestamp = self.modify_metadata( - "timestamp", timestamp_expired_modifier + Rolename.timestamp, timestamp_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) # snapshot update does start but fails because timestamp is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) def test_update_snapshot_length_or_hash_mismatch(self): def modify_snapshot_length(timestamp: Timestamp) -> None: timestamp.snapshot_meta.length = 1 # set known snapshot.json length to 1 - timestamp = self.modify_metadata("timestamp", modify_snapshot_length) + timestamp = self.modify_metadata(Rolename.timestamp, modify_snapshot_length) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) + snapshot = Metadata.from_bytes(self.metadata[Rolename.snapshot]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) @@ -295,55 +296,55 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 timestamp = self.modify_metadata( - "timestamp", timestamp_version_modifier + Rolename.timestamp, timestamp_version_modifier ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) # targets update starts but fails if snapshot version does not match with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_snapshot_file_removed_from_meta(self): - self._update_all_besides_targets(self.metadata["timestamp"]) + self._update_all_besides_targets(self.metadata[Rolename.timestamp]) def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] # Test removing a meta_file in new_snapshot compared to the old snapshot - snapshot = self.modify_metadata("snapshot", remove_file_from_meta) + snapshot = self.modify_metadata(Rolename.snapshot, remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_meta_version_decreases(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) def version_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta["targets.json"].version += 1 - snapshot = self.modify_metadata("snapshot", version_meta_modifier) + snapshot = self.modify_metadata(Rolename.snapshot, version_meta_modifier) self.trusted_set.update_snapshot(snapshot) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) def test_update_snapshot_expired_new_snapshot(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) # expired intermediate snapshot is loaded but will raise - snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) + snapshot = self.modify_metadata(Rolename.snapshot, snapshot_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(snapshot) # targets update does start but fails because snapshot is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_snapshot_successful_rollback_checks(self): def meta_version_bump(timestamp: Timestamp) -> None: @@ -353,51 +354,51 @@ def version_bump(snapshot: Snapshot) -> None: snapshot.version += 1 # load a "local" timestamp, then update to newer one: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - new_timestamp = self.modify_metadata("timestamp", meta_version_bump) + self.trusted_set.update_timestamp(self.metadata[Rolename.timestamp]) + new_timestamp = self.modify_metadata(Rolename.timestamp, meta_version_bump) self.trusted_set.update_timestamp(new_timestamp) # load a "local" snapshot with mismatching version (loading happens but # BadVersionNumberError is raised), then update to newer one: with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - new_snapshot = self.modify_metadata("snapshot", version_bump) + self.trusted_set.update_snapshot(self.metadata[Rolename.snapshot]) + new_snapshot = self.modify_metadata(Rolename.snapshot, version_bump) self.trusted_set.update_snapshot(new_snapshot) # update targets to trigger final snapshot meta version check - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_targets_no_meta_in_snapshot(self): def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} - snapshot = self.modify_metadata("snapshot", no_meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Rolename.snapshot, no_meta_modifier) + self._update_all_besides_targets(self.metadata[Rolename.timestamp], snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) - snapshot = self.modify_metadata("snapshot", meta_length_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Rolename.snapshot, meta_length_modifier) + self._update_all_besides_targets(self.metadata[Rolename.timestamp], snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_targets_version_different_snapshot_meta_version(self): def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) - snapshot = self.modify_metadata("snapshot", meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Rolename.snapshot, meta_modifier) + self._update_all_besides_targets(self.metadata[Rolename.timestamp], snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Rolename.targets]) def test_update_targets_expired_new_target(self): self._update_all_besides_targets() @@ -405,7 +406,7 @@ def test_update_targets_expired_new_target(self): def target_expired_modifier(target: Targets) -> None: target.expires = datetime(1970, 1, 1) - targets = self.modify_metadata("targets", target_expired_modifier) + targets = self.modify_metadata(Rolename.targets, target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(targets) diff --git a/tests/test_updater.py b/tests/test_updater.py index 9f79fd8b12..85ccd70d1e 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -63,6 +63,7 @@ import tuf.repository_lib as repo_lib import tuf.unittest_toolbox as unittest_toolbox import tuf.client.updater as updater +from tuf.api.metadata import Rolename from tests import utils @@ -417,7 +418,7 @@ def test_1__refresh_must_not_count_duplicate_keyids_towards_threshold(self): # same key and write to disk '2.root.json' and '3.root.json'. for version in [2, 3]: repository.root.version = version - info = tuf.roledb.get_roleinfo("root") + info = tuf.roledb.get_roleinfo(Rolename.root) metadata = repo_lib.generate_root_metadata( info["version"], info["expires"], False) signed_metadata = repo_lib.sign_metadata( @@ -447,7 +448,7 @@ def test_1__refresh_must_not_count_duplicate_keyids_towards_threshold(self): securesystemslib.exceptions.BadSignatureError)) self.assertEqual( str(mirror_errors[0]), - repr("root") + " metadata has bad signature.") + repr(Rolename.root) + " metadata has bad signature.") else: self.fail( diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index c1cc7f4ba9..7321f686ce 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -17,7 +17,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset -from tuf.api.metadata import Key +from tuf.api.metadata import Key, Rolename from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import Updater @@ -174,14 +174,14 @@ def test_root_rotation(self, root_versions: List[RootVersion]) -> None: # Publish all remote root versions defined in root_versions for rootver in root_versions: # clear root keys, signers - self.sim.root.roles["root"].keyids.clear() - self.sim.signers["root"].clear() + self.sim.root.roles[Rolename.root].keyids.clear() + self.sim.signers[Rolename.root].clear() - self.sim.root.roles["root"].threshold = rootver.threshold + self.sim.root.roles[Rolename.root].threshold = rootver.threshold for i in rootver.keys: - self.sim.root.add_key("root", self.keys[i]) + self.sim.root.add_key(Rolename.root, self.keys[i]) for i in rootver.signatures: - self.sim.add_signer("root", self.signers[i]) + self.sim.add_signer(Rolename.root, self.signers[i]) self.sim.root.version += 1 self.sim.publish_root() diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index a96ad9e6ba..9b867b1293 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -20,7 +20,7 @@ import tuf.unittest_toolbox as unittest_toolbox from tests import utils from tuf import exceptions, ngclient -from tuf.api.metadata import Metadata, TargetFile +from tuf.api.metadata import Metadata, TargetFile, Rolename logger = logging.getLogger(__name__) @@ -142,10 +142,10 @@ def _create_consistent_target( """ consistent_target_name = f"{target_hash}.{targetname}" source_path = os.path.join( - self.repository_directory, "targets", targetname + self.repository_directory, Rolename.targets, targetname ) destination_path = os.path.join( - self.repository_directory, "targets", consistent_target_name + self.repository_directory, Rolename.targets, consistent_target_name ) shutil.copy(source_path, destination_path) @@ -237,15 +237,15 @@ def test_refresh_and_download(self): # top-level metadata is in local directory already self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Rolename.root, Rolename.snapshot, Rolename.targets, Rolename.timestamp]) # Get targetinfos, assert that cache does not contain files info1 = self.updater.get_targetinfo("file1.txt") - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Rolename.root, Rolename.snapshot, Rolename.targets, Rolename.timestamp]) # Get targetinfo for 'file3.txt' listed in the delegated role1 info3 = self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Rolename.root, Rolename.snapshot, Rolename.targets, Rolename.timestamp] self._assert_files(expected_files) self.assertIsNone(self.updater.find_cached_target(info1)) self.assertIsNone(self.updater.find_cached_target(info3)) @@ -269,14 +269,14 @@ def test_refresh_with_only_local_root(self): os.remove(os.path.join(self.client_directory, "role1.json")) os.remove(os.path.join(self.client_directory, "role2.json")) os.remove(os.path.join(self.client_directory, "1.root.json")) - self._assert_files(["root"]) + self._assert_files([Rolename.root]) self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Rolename.root, Rolename.snapshot, Rolename.targets, Rolename.timestamp]) # Get targetinfo for 'file3.txt' listed in the delegated role1 targetinfo3 = self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Rolename.root, Rolename.snapshot, Rolename.targets, Rolename.timestamp] self._assert_files(expected_files) def test_both_target_urls_not_set(self): diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 2dbb973979..2dbd410e4f 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -14,7 +14,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata +from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata, Rolename from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -34,7 +34,7 @@ class TestRefresh(unittest.TestCase): def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") - self.targets_dir = os.path.join(self.temp_dir.name, "targets") + self.targets_dir = os.path.join(self.temp_dir.name, Rolename.targets) os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) @@ -92,7 +92,7 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None: def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - self._assert_files_exist(["root"]) + self._assert_files_exist([Rolename.root]) # Add one more root version to repository so that # refresh() updates from local trusted root (v1) to @@ -104,7 +104,7 @@ def test_first_time_refresh(self) -> None: self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) for role in TOP_LEVEL_ROLE_NAMES: - version = 2 if role == "root" else None + version = 2 if role == Rolename.root else None self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: @@ -127,8 +127,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -141,7 +141,7 @@ def test_trusted_root_expired(self) -> None: # Root is successfully updated to latest version self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Rolename.root, 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -154,7 +154,7 @@ def test_trusted_root_unsigned(self) -> None: self._run_refresh() # The update failed, no changes in metadata - self._assert_files_exist(["root"]) + self._assert_files_exist([Rolename.root]) md_root_after = Metadata.from_file(root_path) self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) @@ -179,7 +179,7 @@ def test_max_root_rotations(self) -> None: # Assert that root version was increased with no more # than 'max_root_rotations' self._assert_version_equals( - "root", initial_root_version + updater.config.max_root_rotations + Rolename.root, initial_root_version + updater.config.max_root_rotations ) def test_intermediate_root_incorrectly_signed(self) -> None: @@ -187,13 +187,13 @@ def test_intermediate_root_incorrectly_signed(self) -> None: # Intermediate root v2 is unsigned self.sim.root.version += 1 - root_signers = self.sim.signers["root"].copy() - self.sim.signers["root"].clear() + root_signers = self.sim.signers[Rolename.root].copy() + self.sim.signers[Rolename.root].clear() self.sim.publish_root() # Final root v3 is correctly signed self.sim.root.version += 1 - self.sim.signers["root"] = root_signers + self.sim.signers[Rolename.root] = root_signers self.sim.publish_root() # Incorrectly signed intermediate root is detected @@ -201,8 +201,8 @@ def test_intermediate_root_incorrectly_signed(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 1) def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file @@ -222,20 +222,20 @@ def test_intermediate_root_expired(self) -> None: # Successfully updated to root v3 self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Rolename.root, 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack self.sim.root.version += 1 # root v2 - self.sim.signers["root"].clear() + self.sim.signers[Rolename.root].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 1) def test_new_root_same_version(self) -> None: # Check for a rollback_attack @@ -245,8 +245,8 @@ def test_new_root_same_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 1) def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive root version @@ -256,8 +256,8 @@ def test_new_root_nonconsecutive_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 1) def test_final_root_expired(self) -> None: # Check for a freeze attack @@ -270,16 +270,16 @@ def test_final_root_expired(self) -> None: self._run_refresh() # The update failed but final root is persisted on the file system - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Rolename.root]) + self._assert_content_equals(Rolename.root, 2) def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["timestamp"].clear() + self.sim.signers[Rolename.timestamp].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Rolename.root]) def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack @@ -290,7 +290,7 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Rolename.timestamp, 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. @@ -305,7 +305,7 @@ def test_new_timestamp_snapshot_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Rolename.timestamp, 2) def test_new_timestamp_expired(self) -> None: # Check for a freeze attack @@ -315,7 +315,7 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Rolename.root]) def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -336,16 +336,16 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("timestamp", 3) - self._assert_version_equals("snapshot", 1) + self._assert_version_equals(Rolename.timestamp, 3) + self._assert_version_equals(Rolename.snapshot, 1) def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["snapshot"].clear() + self.sim.signers[Rolename.snapshot].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp]) def test_new_snapshot_version_mismatch(self): # Check against timestamp role’s snapshot version @@ -355,7 +355,7 @@ def test_new_snapshot_version_mismatch(self): with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp]) def test_new_snapshot_version_rollback(self) -> None: # Check for a rollback attack @@ -369,7 +369,7 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("snapshot", 2) + self._assert_version_equals(Rolename.snapshot, 2) def test_new_snapshot_expired(self) -> None: # Check for a freeze attack @@ -379,7 +379,7 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp]) def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -401,16 +401,16 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("snapshot", 3) - self._assert_version_equals("targets", 1) + self._assert_version_equals(Rolename.snapshot, 3) + self._assert_version_equals(Rolename.targets, 1) def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["targets"].clear() + self.sim.signers[Rolename.targets].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp, Rolename.snapshot]) def test_new_targets_version_mismatch(self): # Check against snapshot role’s targets version @@ -420,7 +420,7 @@ def test_new_targets_version_mismatch(self): with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp, Rolename.snapshot]) def test_new_targets_expired(self) -> None: # Check for a freeze attack. @@ -430,7 +430,7 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Rolename.root, Rolename.timestamp, Rolename.snapshot]) if __name__ == "__main__":