diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index ec8a899d5f..cf29b16aae 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -143,7 +143,7 @@ def targets(self) -> Targets: def all_targets(self) -> Iterator[Tuple[str, Targets]]: """Yield role name and signed portion of targets one by one.""" - yield "targets", self.md_targets.signed + yield Targets.type, self.md_targets.signed for role, md in self.md_delegates.items(): yield role, md.signed @@ -185,7 +185,7 @@ def _initialize(self): def publish_root(self): """Sign and store a new serialized version of root.""" self.md_root.signatures.clear() - for signer in self.signers["root"].values(): + for signer in self.signers[Root.type].values(): self.md_root.sign(signer, append=True) self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) @@ -201,8 +201,8 @@ def fetch(self, url: str) -> Iterator[bytes]: ver_and_name = path[len("/metadata/") :][: -len(".json")] version, _, role = ver_and_name.partition(".") # root is always version-prefixed while timestamp is always NOT - if role == "root" or ( - self.root.consistent_snapshot and ver_and_name != "timestamp" + if role == Root.type or ( + self.root.consistent_snapshot and ver_and_name != Timestamp.type ): version = int(version) else: @@ -253,7 +253,7 @@ def _fetch_metadata( If version is None, non-versioned metadata is being requested. """ - if role == "root": + if role == Root.type: # return a version previously serialized in publish_root() if version is None or version > len(self.signed_roots): raise FetcherHTTPError(f"Unknown root version {version}", 404) @@ -261,11 +261,11 @@ def _fetch_metadata( return self.signed_roots[version - 1] # sign and serialize the requested metadata - if role == "timestamp": + if role == Timestamp.type: md: Metadata = self.md_timestamp - elif role == "snapshot": + elif role == Snapshot.type: md = self.md_snapshot - elif role == "targets": + elif role == Targets.type: md = self.md_targets else: md = self.md_delegates.get(role) @@ -301,7 +301,7 @@ def update_timestamp(self): self.timestamp.snapshot_meta.version = self.snapshot.version if self.compute_metafile_hashes_length: - hashes, length = self._compute_hashes_and_length("snapshot") + hashes, length = self._compute_hashes_and_length(Snapshot.type) self.timestamp.snapshot_meta.hashes = hashes self.timestamp.snapshot_meta.length = length @@ -324,7 +324,7 @@ def update_snapshot(self): def add_target(self, role: str, data: bytes, path: str): """Create a target from data and add it to the target_files.""" - if role == "targets": + if role == Targets.type: targets = self.targets else: targets = self.md_delegates[role].signed @@ -343,7 +343,7 @@ def add_delegation( hash_prefixes: Optional[List[str]], ): """Add delegated target role to the repository.""" - if delegator_name == "targets": + if delegator_name == Targets.type: delegator = self.targets else: delegator = self.md_delegates[delegator_name].signed @@ -379,9 +379,9 @@ def write(self): for ver in range(1, len(self.signed_roots) + 1): with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f: - f.write(self._fetch_metadata("root", ver)) + f.write(self._fetch_metadata(Root.type, ver)) - for role in ["timestamp", "snapshot", "targets"]: + for role in [Timestamp.type, Snapshot.type, Targets.type]: with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) diff --git a/tests/test_api.py b/tests/test_api.py index 401d061867..2c0682c70a 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -71,7 +71,7 @@ def setUpClass(cls): # Load keys into memory cls.keystore = {} - for role in ["delegation", "snapshot", "targets", "timestamp"]: + for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: cls.keystore[role] = import_ed25519_privatekey_from_file( os.path.join(cls.keystore_dir, role + "_key"), password="password", @@ -85,10 +85,10 @@ def tearDownClass(cls): def test_generic_read(self): for metadata, inner_metadata_cls in [ - ("root", Root), - ("snapshot", Snapshot), - ("timestamp", Timestamp), - ("targets", Targets), + (Root.type, Root), + (Snapshot.type, Snapshot), + (Timestamp.type, Timestamp), + (Targets.type, Targets), ]: # Load JSON-formatted metdata of each supported type from file @@ -129,7 +129,7 @@ def test_compact_json(self): ) def test_read_write_read_compare(self): - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") md_obj = Metadata.from_file(path) @@ -141,7 +141,7 @@ def test_read_write_read_compare(self): os.remove(path_2) def test_to_from_bytes(self): - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() @@ -162,11 +162,11 @@ def test_sign_verify(self): root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root - targets_keyid = next(iter(root.roles["targets"].keyids)) + targets_keyid = next(iter(root.roles[Targets.type].keyids)) targets_key = root.keys[targets_keyid] - snapshot_keyid = next(iter(root.roles["snapshot"].keyids)) + snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids)) snapshot_key = root.keys[snapshot_keyid] - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... @@ -185,7 +185,7 @@ def test_sign_verify(self): with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(md_obj, JSONSerializer()) - sslib_signer = SSlibSigner(self.keystore["snapshot"]) + sslib_signer = SSlibSigner(self.keystore[Snapshot.type]) # Append a new signature with the unrelated key and assert that ... sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -196,7 +196,7 @@ def test_sign_verify(self): # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore["timestamp"]) + sslib_signer = SSlibSigner(self.keystore[Timestamp.type]) # Create and assign (don't append) a new signature and assert that ... md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, @@ -211,7 +211,7 @@ def test_verify_failures(self): root = Metadata[Root].from_file(root_path).signed # Locate the timestamp public key we need from root - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (timestamp) @@ -362,20 +362,20 @@ def test_metadata_verify_delegate(self): role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate("root", root) - root.verify_delegate("snapshot", snapshot) - root.verify_delegate("targets", targets) + root.verify_delegate(Root.type, root) + root.verify_delegate(Snapshot.type, snapshot) + root.verify_delegate(Targets.type, targets) targets.verify_delegate("role1", role1) role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate("snapshot", snapshot) + snapshot.verify_delegate(Snapshot.type, snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate("targets", targets) + targets.verify_delegate(Targets.type, targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): role2.verify_delegate("role1", role1) @@ -384,31 +384,31 @@ def test_metadata_verify_delegate(self): expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("timestamp", snapshot) + root.verify_delegate(Timestamp.type, snapshot) # Add a key to snapshot role, make sure the new sig fails to verify - ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) - root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) + ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids)) + root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid]) snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles["snapshot"].threshold = 2 + root.signed.roles[Snapshot.type].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) - root.verify_delegate("snapshot", snapshot) + snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True) + root.verify_delegate(Snapshot.type, snapshot) def test_key_class(self): # Test if from_securesystemslib_key removes the private key from keyval @@ -434,14 +434,14 @@ def test_root_add_key_and_remove_key(self): ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key("root", key_metadata) + root.signed.add_key(Root.type, key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles["root"].keyids) + self.assertIn(keyid, root.signed.roles[Root.type].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -449,29 +449,29 @@ def test_root_add_key_and_remove_key(self): root.to_dict() # Try adding the same key again and assert its ignored. - pre_add_keyid = root.signed.roles["root"].keyids.copy() - root.signed.add_key("root", key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) + pre_add_keyid = root.signed.roles[Root.type].keyids.copy() + root.signed.add_key(Root.type, key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids) # Add the same key to targets role as well - root.signed.add_key("targets", key_metadata) + root.signed.add_key(Targets.type, key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key("root", keyid) - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + root.signed.remove_key(Root.type, keyid) + self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key("targets", keyid) - self.assertNotIn(keyid, root.signed.roles["targets"].keyids) + root.signed.remove_key(Targets.type, keyid) + self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key("root", "nosuchkey") + root.signed.remove_key(Root.type, "nosuchkey") with self.assertRaises(ValueError): root.signed.remove_key("nosuchrole", keyid) @@ -662,7 +662,7 @@ def test_length_and_hash_validation(self): targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) file1_targetfile = targets.signed.targets["file1.txt"] - filepath = os.path.join(self.repo_dir, "targets", "file1.txt") + filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -680,7 +680,7 @@ def test_length_and_hash_validation(self): def test_targetfile_from_file(self): # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") targetfile_from_file = TargetFile.from_file( file_path, file_path, ["sha256"] ) @@ -689,20 +689,20 @@ def test_targetfile_from_file(self): targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, "targets", "file123.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt") with self.assertRaises(FileNotFoundError): TargetFile.from_file( file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM] ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") with self.assertRaises(exceptions.UnsupportedAlgorithmError): TargetFile.from_file(file_path, file_path, ["123"]) def test_targetfile_from_data(self): data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") # Test with a valid hash algorithm targetfile_from_data = TargetFile.from_data( diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index bdc2c78cc3..d94d3bba26 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -54,10 +54,10 @@ def setUpClass(cls): ) cls.metadata = {} for md in [ - "root", - "timestamp", - "snapshot", - "targets", + Root.type, + Timestamp.type, + Snapshot.type, + Targets.type, "role1", "role2", ]: @@ -67,10 +67,10 @@ def setUpClass(cls): keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + "_key"), password="password" + os.path.join(keystore_dir, Root.type + "_key"), password="password" ) - cls.keystore["root"] = SSlibSigner(root_key_dict) - for role in ["delegation", "snapshot", "targets", "timestamp"]: + cls.keystore[Root.type] = SSlibSigner(root_key_dict) + for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: key_dict = import_ed25519_privatekey_from_file( os.path.join(keystore_dir, role + "_key"), password="password" ) @@ -80,12 +80,12 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.hashes = None timestamp.snapshot_meta.length = None - cls.metadata["timestamp"] = cls.modify_metadata( - cls, "timestamp", hashes_length_modifier + cls.metadata[Timestamp.type] = cls.modify_metadata( + cls, Timestamp.type, hashes_length_modifier ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata["root"]) + self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) def _update_all_besides_targets( self, @@ -97,24 +97,24 @@ def _update_all_besides_targets( Args: timestamp_bytes: Bytes used when calling trusted_set.update_timestamp(). - Default self.metadata["timestamp"]. + Default self.metadata[Timestamp.type]. snapshot_bytes: Bytes used when calling trusted_set.update_snapshot(). - Default self.metadata["snapshot"]. + Default self.metadata[Snapshot.type]. """ - timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] + timestamp_bytes = timestamp_bytes or self.metadata[Timestamp.type] self.trusted_set.update_timestamp(timestamp_bytes) - snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] + snapshot_bytes = snapshot_bytes or self.metadata[Snapshot.type] self.trusted_set.update_snapshot(snapshot_bytes) def test_update(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_targets(self.metadata[Targets.type]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" @@ -132,38 +132,38 @@ def test_update(self): def test_out_of_order_ops(self): # Update snapshot before timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) # Update root after timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Root.type]) # Update targets before snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) # update timestamp after snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) # Update delegated targets before targets with self.assertRaises(RuntimeError): self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) # Update snapshot after sucessful targets update with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) def test_root_with_invalid_json(self): @@ -174,20 +174,20 @@ def test_root_with_invalid_json(self): test_func(b"") # root is invalid - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Root.type]) root.signed.version += 1 with self.assertRaises(exceptions.UnsignedMetadataError): test_func(root.to_bytes()) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata["snapshot"]) + test_func(self.metadata[Snapshot.type]) def test_top_level_md_with_invalid_json(self): top_level_md = [ - (self.metadata["timestamp"], self.trusted_set.update_timestamp), - (self.metadata["snapshot"], self.trusted_set.update_snapshot), - (self.metadata["targets"], self.trusted_set.update_targets), + (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), + (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), + (self.metadata[Targets.type], self.trusted_set.update_targets), ] for metadata, update_func in top_level_md: md = Metadata.from_bytes(metadata) @@ -202,7 +202,7 @@ def test_top_level_md_with_invalid_json(self): # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(self.metadata["root"]) + update_func(self.metadata[Root.type]) update_func(metadata) @@ -211,53 +211,53 @@ def test_update_root_new_root(self): def root_new_version_modifier(root: Root) -> None: root.version += 1 - root = self.modify_metadata("root", root_new_version_modifier) + root = self.modify_metadata(Root.type, root_new_version_modifier) self.trusted_set.update_root(root) def test_update_root_new_root_cannot_be_verified_with_threshold(self): # new_root data with threshold which cannot be verified. - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Root.type]) # remove root role keyids representing root signatures - root.signed.roles["root"].keyids = [] + root.signed.roles[Root.type].keyids = [] with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_root(root.to_bytes()) def test_update_root_new_root_ver_same_as_trusted_root_ver(self): with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Root.type]) def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) # intermediate root can be expired - root = self.modify_metadata("root", root_expired_modifier) + root = self.modify_metadata(Root.type, root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) + tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - timestamp = self.modify_metadata("timestamp", version_modifier) + timestamp = self.modify_metadata(Timestamp.type, version_modifier) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_snapshot_ver_below_current(self): def bump_snapshot_version(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 # set current known snapshot.json version to 2 - timestamp = self.modify_metadata("timestamp", bump_snapshot_version) + timestamp = self.modify_metadata(Timestamp.type, bump_snapshot_version) self.trusted_set.update_timestamp(timestamp) # newtimestamp.meta.version < trusted_timestamp.meta.version with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_expired(self): # new_timestamp has expired @@ -266,29 +266,29 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: # expired intermediate timestamp is loaded but raises timestamp = self.modify_metadata( - "timestamp", timestamp_expired_modifier + Timestamp.type, timestamp_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) # snapshot update does start but fails because timestamp is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_length_or_hash_mismatch(self): def modify_snapshot_length(timestamp: Timestamp) -> None: timestamp.snapshot_meta.length = 1 # set known snapshot.json length to 1 - timestamp = self.modify_metadata("timestamp", modify_snapshot_length) + timestamp = self.modify_metadata(Timestamp.type, modify_snapshot_length) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) @@ -298,55 +298,55 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 timestamp = self.modify_metadata( - "timestamp", timestamp_version_modifier + Timestamp.type, timestamp_version_modifier ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) # targets update starts but fails if snapshot version does not match with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_snapshot_file_removed_from_meta(self): - self._update_all_besides_targets(self.metadata["timestamp"]) + self._update_all_besides_targets(self.metadata[Timestamp.type]) def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] # Test removing a meta_file in new_snapshot compared to the old snapshot - snapshot = self.modify_metadata("snapshot", remove_file_from_meta) + snapshot = self.modify_metadata(Snapshot.type, remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_meta_version_decreases(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def version_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta["targets.json"].version += 1 - snapshot = self.modify_metadata("snapshot", version_meta_modifier) + snapshot = self.modify_metadata(Snapshot.type, version_meta_modifier) self.trusted_set.update_snapshot(snapshot) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_expired_new_snapshot(self): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) # expired intermediate snapshot is loaded but will raise - snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) + snapshot = self.modify_metadata(Snapshot.type, snapshot_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(snapshot) # targets update does start but fails because snapshot is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_snapshot_successful_rollback_checks(self): def meta_version_bump(timestamp: Timestamp) -> None: @@ -356,51 +356,51 @@ def version_bump(snapshot: Snapshot) -> None: snapshot.version += 1 # load a "local" timestamp, then update to newer one: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - new_timestamp = self.modify_metadata("timestamp", meta_version_bump) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + new_timestamp = self.modify_metadata(Timestamp.type, meta_version_bump) self.trusted_set.update_timestamp(new_timestamp) # load a "local" snapshot with mismatching version (loading happens but # BadVersionNumberError is raised), then update to newer one: with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - new_snapshot = self.modify_metadata("snapshot", version_bump) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + new_snapshot = self.modify_metadata(Snapshot.type, version_bump) self.trusted_set.update_snapshot(new_snapshot) # update targets to trigger final snapshot meta version check - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_no_meta_in_snapshot(self): def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} - snapshot = self.modify_metadata("snapshot", no_meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, no_meta_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) - snapshot = self.modify_metadata("snapshot", meta_length_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, meta_length_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_version_different_snapshot_meta_version(self): def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) - snapshot = self.modify_metadata("snapshot", meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, meta_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_expired_new_target(self): self._update_all_besides_targets() @@ -408,7 +408,7 @@ def test_update_targets_expired_new_target(self): def target_expired_modifier(target: Targets) -> None: target.expires = datetime(1970, 1, 1) - targets = self.modify_metadata("targets", target_expired_modifier) + targets = self.modify_metadata(Targets.type, target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(targets) diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index 2a07fc6761..94517dad65 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -17,7 +17,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset -from tuf.api.metadata import Key +from tuf.api.metadata import Key, Root from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import Updater @@ -176,14 +176,14 @@ def test_root_rotation(self, root_versions: List[RootVersion]) -> None: # Publish all remote root versions defined in root_versions for rootver in root_versions: # clear root keys, signers - self.sim.root.roles["root"].keyids.clear() - self.sim.signers["root"].clear() + self.sim.root.roles[Root.type].keyids.clear() + self.sim.signers[Root.type].clear() - self.sim.root.roles["root"].threshold = rootver.threshold + self.sim.root.roles[Root.type].threshold = rootver.threshold for i in rootver.keys: - self.sim.root.add_key("root", self.keys[i]) + self.sim.root.add_key(Root.type, self.keys[i]) for i in rootver.sigs: - self.sim.add_signer("root", self.signers[i]) + self.sim.add_signer(Root.type, self.signers[i]) self.sim.root.version += 1 self.sim.publish_root() diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index e4dd7566dc..bdfe350db2 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -19,7 +19,7 @@ from tests import utils from tuf import exceptions, ngclient, unittest_toolbox -from tuf.api.metadata import Metadata, TargetFile +from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp logger = logging.getLogger(__name__) @@ -175,15 +175,15 @@ def test_refresh_and_download(self): # top-level metadata is in local directory already self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfos, assert that cache does not contain files info1 = self.updater.get_targetinfo("file1.txt") - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfo for 'file3.txt' listed in the delegated role1 info3 = self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Root.type, Snapshot.type, Targets.type, Timestamp.type] self._assert_files(expected_files) self.assertIsNone(self.updater.find_cached_target(info1)) self.assertIsNone(self.updater.find_cached_target(info3)) @@ -207,14 +207,14 @@ def test_refresh_with_only_local_root(self): os.remove(os.path.join(self.client_directory, "role1.json")) os.remove(os.path.join(self.client_directory, "role2.json")) os.remove(os.path.join(self.client_directory, "1.root.json")) - self._assert_files(["root"]) + self._assert_files([Root.type]) self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfo for 'file3.txt' listed in the delegated role1 self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Root.type, Snapshot.type, Targets.type, Timestamp.type] self._assert_files(expected_files) def test_implicit_refresh_with_only_local_root(self): diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index c2647e4978..cdeadedce4 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -14,7 +14,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata +from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata, Root, Snapshot, Targets, Timestamp from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -35,7 +35,7 @@ class TestRefresh(unittest.TestCase): def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") - self.targets_dir = os.path.join(self.temp_dir.name, "targets") + self.targets_dir = os.path.join(self.temp_dir.name, Targets.type) os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) @@ -94,7 +94,7 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None: def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) # Add one more root version to repository so that # refresh() updates from local trusted root (v1) to @@ -106,7 +106,7 @@ def test_first_time_refresh(self) -> None: self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) for role in TOP_LEVEL_ROLE_NAMES: - version = 2 if role == "root" else None + version = 2 if role == Root.type else None self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: @@ -129,8 +129,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -143,7 +143,7 @@ def test_trusted_root_expired(self) -> None: # Root is successfully updated to latest version self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Root.type, 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -156,7 +156,7 @@ def test_trusted_root_unsigned(self) -> None: self._run_refresh() # The update failed, no changes in metadata - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) md_root_after = Metadata.from_file(root_path) self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) @@ -181,7 +181,7 @@ def test_max_root_rotations(self) -> None: # Assert that root version was increased with no more # than 'max_root_rotations' self._assert_version_equals( - "root", initial_root_version + updater.config.max_root_rotations + Root.type, initial_root_version + updater.config.max_root_rotations ) def test_intermediate_root_incorrectly_signed(self) -> None: @@ -189,13 +189,13 @@ def test_intermediate_root_incorrectly_signed(self) -> None: # Intermediate root v2 is unsigned self.sim.root.version += 1 - root_signers = self.sim.signers["root"].copy() - self.sim.signers["root"].clear() + root_signers = self.sim.signers[Root.type].copy() + self.sim.signers[Root.type].clear() self.sim.publish_root() # Final root v3 is correctly signed self.sim.root.version += 1 - self.sim.signers["root"] = root_signers + self.sim.signers[Root.type] = root_signers self.sim.publish_root() # Incorrectly signed intermediate root is detected @@ -203,8 +203,8 @@ def test_intermediate_root_incorrectly_signed(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file @@ -224,20 +224,20 @@ def test_intermediate_root_expired(self) -> None: # Successfully updated to root v3 self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Root.type, 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack self.sim.root.version += 1 # root v2 - self.sim.signers["root"].clear() + self.sim.signers[Root.type].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_new_root_same_version(self) -> None: # Check for a rollback_attack @@ -247,8 +247,8 @@ def test_new_root_same_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive root version @@ -258,8 +258,8 @@ def test_new_root_nonconsecutive_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_final_root_expired(self) -> None: # Check for a freeze attack @@ -272,16 +272,16 @@ def test_final_root_expired(self) -> None: self._run_refresh() # The update failed but final root is persisted on the file system - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 2) def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["timestamp"].clear() + self.sim.signers[Timestamp.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack @@ -292,7 +292,7 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Timestamp.type, 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. @@ -307,7 +307,7 @@ def test_new_timestamp_snapshot_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Timestamp.type, 2) def test_new_timestamp_expired(self) -> None: # Check for a freeze attack @@ -317,7 +317,7 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -338,16 +338,16 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("timestamp", 3) - self._assert_version_equals("snapshot", 1) + self._assert_version_equals(Timestamp.type, 3) + self._assert_version_equals(Snapshot.type, 1) def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["snapshot"].clear() + self.sim.signers[Snapshot.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_snapshot_version_mismatch(self): # Check against timestamp role’s snapshot version @@ -357,7 +357,7 @@ def test_new_snapshot_version_mismatch(self): with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_snapshot_version_rollback(self) -> None: # Check for a rollback attack @@ -371,7 +371,7 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("snapshot", 2) + self._assert_version_equals(Snapshot.type, 2) def test_new_snapshot_expired(self) -> None: # Check for a freeze attack @@ -381,7 +381,7 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -403,16 +403,16 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("snapshot", 3) - self._assert_version_equals("targets", 1) + self._assert_version_equals(Snapshot.type, 3) + self._assert_version_equals(Targets.type, 1) def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["targets"].clear() + self.sim.signers[Targets.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) def test_new_targets_version_mismatch(self): # Check against snapshot role’s targets version @@ -422,7 +422,7 @@ def test_new_targets_version_mismatch(self): with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) def test_new_targets_expired(self) -> None: # Check for a freeze attack. @@ -432,7 +432,7 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) if __name__ == "__main__":