diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index a763fc053d..bb86b59cb7 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -139,7 +139,7 @@ def targets(self) -> Targets: def all_targets(self) -> Iterator[Tuple[str, Targets]]: """Yield role name and signed portion of targets one by one.""" - yield "targets", self.md_targets.signed + yield Targets.type, self.md_targets.signed for role, md in self.md_delegates.items(): yield role, md.signed @@ -181,7 +181,7 @@ def _initialize(self) -> None: def publish_root(self) -> None: """Sign and store a new serialized version of root.""" self.md_root.signatures.clear() - for signer in self.signers["root"].values(): + for signer in self.signers[Root.type].values(): self.md_root.sign(signer, append=True) self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) @@ -197,8 +197,8 @@ def fetch(self, url: str) -> Iterator[bytes]: ver_and_name = path[len("/metadata/") :][: -len(".json")] version_str, _, role = ver_and_name.partition(".") # root is always version-prefixed while timestamp is always NOT - if role == "root" or ( - self.root.consistent_snapshot and ver_and_name != "timestamp" + if role == Root.type or ( + self.root.consistent_snapshot and ver_and_name != Timestamp.type ): version: Optional[int] = int(version_str) else: @@ -248,7 +248,7 @@ def _fetch_metadata( If version is None, non-versioned metadata is being requested. """ - if role == "root": + if role == Root.type: # return a version previously serialized in publish_root() if version is None or version > len(self.signed_roots): raise FetcherHTTPError(f"Unknown root version {version}", 404) @@ -257,11 +257,11 @@ def _fetch_metadata( # sign and serialize the requested metadata md: Optional[Metadata] - if role == "timestamp": + if role == Timestamp.type: md = self.md_timestamp - elif role == "snapshot": + elif role == Snapshot.type: md = self.md_snapshot - elif role == "targets": + elif role == Targets.type: md = self.md_targets else: md = self.md_delegates.get(role) @@ -297,7 +297,7 @@ def update_timestamp(self) -> None: self.timestamp.snapshot_meta.version = self.snapshot.version if self.compute_metafile_hashes_length: - hashes, length = self._compute_hashes_and_length("snapshot") + hashes, length = self._compute_hashes_and_length(Snapshot.type) self.timestamp.snapshot_meta.hashes = hashes self.timestamp.snapshot_meta.length = length @@ -320,7 +320,7 @@ def update_snapshot(self) -> None: def add_target(self, role: str, data: bytes, path: str) -> None: """Create a target from data and add it to the target_files.""" - if role == "targets": + if role == Targets.type: targets = self.targets else: targets = self.md_delegates[role].signed @@ -339,7 +339,7 @@ def add_delegation( hash_prefixes: Optional[List[str]], ) -> None: """Add delegated target role to the repository.""" - if delegator_name == "targets": + if delegator_name == Targets.type: delegator = self.targets else: delegator = self.md_delegates[delegator_name].signed @@ -375,9 +375,9 @@ def write(self) -> None: for ver in range(1, len(self.signed_roots) + 1): with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f: - f.write(self._fetch_metadata("root", ver)) + f.write(self._fetch_metadata(Root.type, ver)) - for role in ["timestamp", "snapshot", "targets"]: + for role in [Timestamp.type, Snapshot.type, Targets.type]: with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) diff --git a/tests/test_api.py b/tests/test_api.py 
index 02c6521725..8bd69c9b32 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -78,7 +78,7 @@ def setUpClass(cls) -> None: # Load keys into memory cls.keystore = {} - for role in ["delegation", "snapshot", "targets", "timestamp"]: + for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: cls.keystore[role] = import_ed25519_privatekey_from_file( os.path.join(cls.keystore_dir, role + "_key"), password="password", @@ -92,10 +92,10 @@ def tearDownClass(cls) -> None: def test_generic_read(self) -> None: for metadata, inner_metadata_cls in [ - ("root", Root), - ("snapshot", Snapshot), - ("timestamp", Timestamp), - ("targets", Targets), + (Root.type, Root), + (Snapshot.type, Snapshot), + (Timestamp.type, Timestamp), + (Targets.type, Targets), ]: # Load JSON-formatted metdata of each supported type from file @@ -136,7 +136,7 @@ def test_compact_json(self) -> None: ) def test_read_write_read_compare(self) -> None: - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") md_obj = Metadata.from_file(path) @@ -148,7 +148,7 @@ def test_read_write_read_compare(self) -> None: os.remove(path_2) def test_to_from_bytes(self) -> None: - for metadata in ["root", "snapshot", "timestamp", "targets"]: + for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() @@ -169,11 +169,11 @@ def test_sign_verify(self) -> None: root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root - targets_keyid = next(iter(root.roles["targets"].keyids)) + targets_keyid = next(iter(root.roles[Targets.type].keyids)) targets_key = root.keys[targets_keyid] - snapshot_keyid = next(iter(root.roles["snapshot"].keyids)) + snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids)) snapshot_key = root.keys[snapshot_keyid] - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... @@ -192,7 +192,7 @@ def test_sign_verify(self) -> None: with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type] - sslib_signer = SSlibSigner(self.keystore["snapshot"]) + sslib_signer = SSlibSigner(self.keystore[Snapshot.type]) # Append a new signature with the unrelated key and assert that ... sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -203,7 +203,7 @@ def test_sign_verify(self) -> None: # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore["timestamp"]) + sslib_signer = SSlibSigner(self.keystore[Timestamp.type]) # Create and assign (don't append) a new signature and assert that ... md_obj.sign(sslib_signer, append=False) # ... 
there now is only one signature, @@ -218,7 +218,7 @@ def test_verify_failures(self) -> None: root = Metadata[Root].from_file(root_path).signed # Locate the timestamp public key we need from root - timestamp_keyid = next(iter(root.roles["timestamp"].keyids)) + timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids)) timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (timestamp) @@ -369,20 +369,20 @@ def test_metadata_verify_delegate(self) -> None: role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate("root", root) - root.verify_delegate("snapshot", snapshot) - root.verify_delegate("targets", targets) + root.verify_delegate(Root.type, root) + root.verify_delegate(Snapshot.type, snapshot) + root.verify_delegate(Targets.type, targets) targets.verify_delegate("role1", role1) role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate("snapshot", snapshot) + snapshot.verify_delegate(Snapshot.type, snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate("targets", targets) + targets.verify_delegate(Targets.type, targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): role2.verify_delegate("role1", role1) @@ -391,31 +391,31 @@ def test_metadata_verify_delegate(self) -> None: expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("timestamp", snapshot) + root.verify_delegate(Timestamp.type, snapshot) # Add a key to snapshot role, make sure the new sig fails to verify - ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) - root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) + ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids)) + root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid]) snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles["snapshot"].threshold = 2 + root.signed.roles[Snapshot.type].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate("snapshot", snapshot) + root.verify_delegate(Snapshot.type, snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) - root.verify_delegate("snapshot", snapshot) + snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True) + root.verify_delegate(Snapshot.type, snapshot) def test_key_class(self) -> None: # Test if from_securesystemslib_key removes the private key from keyval @@ -441,14 +441,14 @@ def test_root_add_key_and_remove_key(self) -> None: ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) 
self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key("root", key_metadata) + root.signed.add_key(Root.type, key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles["root"].keyids) + self.assertIn(keyid, root.signed.roles[Root.type].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -456,29 +456,29 @@ def test_root_add_key_and_remove_key(self) -> None: root.to_dict() # Try adding the same key again and assert its ignored. - pre_add_keyid = root.signed.roles["root"].keyids.copy() - root.signed.add_key("root", key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) + pre_add_keyid = root.signed.roles[Root.type].keyids.copy() + root.signed.add_key(Root.type, key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids) # Add the same key to targets role as well - root.signed.add_key("targets", key_metadata) + root.signed.add_key(Targets.type, key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key("root", keyid) - self.assertNotIn(keyid, root.signed.roles["root"].keyids) + root.signed.remove_key(Root.type, keyid) + self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key("targets", keyid) - self.assertNotIn(keyid, root.signed.roles["targets"].keyids) + root.signed.remove_key(Targets.type, keyid) + self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key("root", "nosuchkey") + root.signed.remove_key(Root.type, "nosuchkey") with self.assertRaises(ValueError): root.signed.remove_key("nosuchrole", keyid) @@ -670,7 +670,7 @@ def test_length_and_hash_validation(self) -> None: targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) file1_targetfile = targets.signed.targets["file1.txt"] - filepath = os.path.join(self.repo_dir, "targets", "file1.txt") + filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -688,7 +688,7 @@ def test_length_and_hash_validation(self) -> None: def test_targetfile_from_file(self) -> None: # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") targetfile_from_file = TargetFile.from_file( file_path, file_path, ["sha256"] ) @@ -697,20 +697,20 @@ def test_targetfile_from_file(self) -> None: targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, "targets", "file123.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt") with self.assertRaises(FileNotFoundError): TargetFile.from_file( file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM] ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") with self.assertRaises(exceptions.UnsupportedAlgorithmError): TargetFile.from_file(file_path, file_path, ["123"]) def test_targetfile_from_data(self) -> 
None: data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") # Test with a valid hash algorithm targetfile_from_data = TargetFile.from_data( diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 9dfacf1a1d..6426370e74 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -15,6 +15,7 @@ from tests import utils from tuf import exceptions from tuf.api.metadata import ( + T, Metadata, MetaFile, Root, @@ -58,10 +59,10 @@ def setUpClass(cls) -> None: ) cls.metadata = {} for md in [ - "root", - "timestamp", - "snapshot", - "targets", + Root.type, + Timestamp.type, + Snapshot.type, + Targets.type, "role1", "role2", ]: @@ -71,10 +72,10 @@ def setUpClass(cls) -> None: keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + "_key"), password="password" + os.path.join(keystore_dir, Root.type + "_key"), password="password" ) - cls.keystore["root"] = SSlibSigner(root_key_dict) - for role in ["delegation", "snapshot", "targets", "timestamp"]: + cls.keystore[Root.type] = SSlibSigner(root_key_dict) + for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]: key_dict = import_ed25519_privatekey_from_file( os.path.join(keystore_dir, role + "_key"), password="password" ) @@ -84,12 +85,12 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.hashes = None timestamp.snapshot_meta.length = None - cls.metadata["timestamp"] = cls.modify_metadata( - "timestamp", hashes_length_modifier + cls.metadata[Timestamp.type] = cls.modify_metadata( + Timestamp.type, hashes_length_modifier ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata["root"]) + self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) def _update_all_besides_targets( self, @@ -101,24 +102,24 @@ def _update_all_besides_targets( Args: timestamp_bytes: Bytes used when calling trusted_set.update_timestamp(). - Default self.metadata["timestamp"]. + Default self.metadata[Timestamp.type]. snapshot_bytes: Bytes used when calling trusted_set.update_snapshot(). - Default self.metadata["snapshot"]. + Default self.metadata[Snapshot.type]. 
""" - timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] + timestamp_bytes = timestamp_bytes or self.metadata[Timestamp.type] self.trusted_set.update_timestamp(timestamp_bytes) - snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] + snapshot_bytes = snapshot_bytes or self.metadata[Snapshot.type] self.trusted_set.update_snapshot(snapshot_bytes) def test_update(self) -> None: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_targets(self.metadata[Targets.type]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" @@ -154,38 +155,38 @@ def test_update_metadata_output(self) -> None: def test_out_of_order_ops(self) -> None: # Update snapshot before timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) # Update root after timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Root.type]) # Update targets before snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) # update timestamp after snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) # Update delegated targets before targets with self.assertRaises(RuntimeError): self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) # Update snapshot after sucessful targets update with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", Targets.type ) def test_root_with_invalid_json(self) -> None: @@ -196,20 +197,20 @@ def test_root_with_invalid_json(self) -> None: test_func(b"") # root is invalid - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Root.type]) root.signed.version += 1 with self.assertRaises(exceptions.UnsignedMetadataError): test_func(root.to_bytes()) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata["snapshot"]) + test_func(self.metadata[Snapshot.type]) def test_top_level_md_with_invalid_json(self) -> None: top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [ - (self.metadata["timestamp"], self.trusted_set.update_timestamp), 
- (self.metadata["snapshot"], self.trusted_set.update_snapshot), - (self.metadata["targets"], self.trusted_set.update_targets), + (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), + (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), + (self.metadata[Targets.type], self.trusted_set.update_targets), ] for metadata, update_func in top_level_md: md = Metadata.from_bytes(metadata) @@ -224,7 +225,7 @@ def test_top_level_md_with_invalid_json(self) -> None: # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(self.metadata["root"]) + update_func(self.metadata[Root.type]) update_func(metadata) @@ -233,53 +234,53 @@ def test_update_root_new_root(self) -> None: def root_new_version_modifier(root: Root) -> None: root.version += 1 - root = self.modify_metadata("root", root_new_version_modifier) + root = self.modify_metadata(Root.type, root_new_version_modifier) self.trusted_set.update_root(root) def test_update_root_new_root_fail_threshold_verification(self) -> None: # new_root data with threshold which cannot be verified. - root = Metadata.from_bytes(self.metadata["root"]) + root = Metadata.from_bytes(self.metadata[Root.type]) # remove root role keyids representing root signatures - root.signed.roles["root"].keyids = set() + root.signed.roles[Root.type].keyids = set() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_root(root.to_bytes()) def test_update_root_new_root_ver_same_as_trusted_root_ver(self) -> None: with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_root(self.metadata["root"]) + self.trusted_set.update_root(self.metadata[Root.type]) def test_root_expired_final_root(self) -> None: def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) # intermediate root can be expired - root = self.modify_metadata("root", root_expired_modifier) + root = self.modify_metadata(Root.type, root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) + tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self) -> None: # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - timestamp = self.modify_metadata("timestamp", version_modifier) + timestamp = self.modify_metadata(Timestamp.type, version_modifier) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_snapshot_ver_below_current(self) -> None: def bump_snapshot_version(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 # set current known snapshot.json version to 2 - timestamp = self.modify_metadata("timestamp", bump_snapshot_version) + timestamp = self.modify_metadata(Timestamp.type, bump_snapshot_version) self.trusted_set.update_timestamp(timestamp) # newtimestamp.meta.version < trusted_timestamp.meta.version with self.assertRaises(exceptions.ReplayedMetadataError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_expired(self) -> None: # 
new_timestamp has expired @@ -288,29 +289,29 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: # expired intermediate timestamp is loaded but raises timestamp = self.modify_metadata( - "timestamp", timestamp_expired_modifier + Timestamp.type, timestamp_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) # snapshot update does start but fails because timestamp is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_length_or_hash_mismatch(self) -> None: def modify_snapshot_length(timestamp: Timestamp) -> None: timestamp.snapshot_meta.length = 1 # set known snapshot.json length to 1 - timestamp = self.modify_metadata("timestamp", modify_snapshot_length) + timestamp = self.modify_metadata(Timestamp.type, modify_snapshot_length) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_fail_threshold_verification(self) -> None: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) @@ -322,55 +323,55 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 timestamp = self.modify_metadata( - "timestamp", timestamp_version_modifier + Timestamp.type, timestamp_version_modifier ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) # targets update starts but fails if snapshot version does not match with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_snapshot_file_removed_from_meta(self) -> None: - self._update_all_besides_targets(self.metadata["timestamp"]) + self._update_all_besides_targets(self.metadata[Timestamp.type]) def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] # Test removing a meta_file in new_snapshot compared to the old snapshot - snapshot = self.modify_metadata("snapshot", remove_file_from_meta) + snapshot = self.modify_metadata(Snapshot.type, remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_meta_version_decreases(self) -> None: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def version_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta["targets.json"].version += 1 - snapshot = self.modify_metadata("snapshot", version_meta_modifier) + snapshot = self.modify_metadata(Snapshot.type, version_meta_modifier) self.trusted_set.update_snapshot(snapshot) with 
self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_expired_new_snapshot(self) -> None: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) # expired intermediate snapshot is loaded but will raise - snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) + snapshot = self.modify_metadata(Snapshot.type, snapshot_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(snapshot) # targets update does start but fails because snapshot is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_snapshot_successful_rollback_checks(self) -> None: def meta_version_bump(timestamp: Timestamp) -> None: @@ -380,51 +381,51 @@ def version_bump(snapshot: Snapshot) -> None: snapshot.version += 1 # load a "local" timestamp, then update to newer one: - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - new_timestamp = self.modify_metadata("timestamp", meta_version_bump) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + new_timestamp = self.modify_metadata(Timestamp.type, meta_version_bump) self.trusted_set.update_timestamp(new_timestamp) # load a "local" snapshot with mismatching version (loading happens but # BadVersionNumberError is raised), then update to newer one: with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) - new_snapshot = self.modify_metadata("snapshot", version_bump) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + new_snapshot = self.modify_metadata(Snapshot.type, version_bump) self.trusted_set.update_snapshot(new_snapshot) # update targets to trigger final snapshot meta version check - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_no_meta_in_snapshot(self) -> None: def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} - snapshot = self.modify_metadata("snapshot", no_meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, no_meta_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_hash_diverge_from_snapshot_meta_hash(self) -> None: def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) - snapshot = self.modify_metadata("snapshot", meta_length_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, meta_length_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # observed_hash != stored hash in snapshot meta for targets with 
self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_version_diverge_snapshot_meta_version(self) -> None: def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) - snapshot = self.modify_metadata("snapshot", meta_modifier) - self._update_all_besides_targets(self.metadata["timestamp"], snapshot) + snapshot = self.modify_metadata(Snapshot.type, meta_modifier) + self._update_all_besides_targets(self.metadata[Timestamp.type], snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_expired_new_target(self) -> None: self._update_all_besides_targets() @@ -432,7 +433,7 @@ def test_update_targets_expired_new_target(self) -> None: def target_expired_modifier(target: Targets) -> None: target.expires = datetime(1970, 1, 1) - targets = self.modify_metadata("targets", target_expired_modifier) + targets = self.modify_metadata(Targets.type, target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(targets) diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index 9855c7f492..2aa93b33a7 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -17,7 +17,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset -from tuf.api.metadata import Key +from tuf.api.metadata import Key, Root from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import Updater @@ -176,14 +176,14 @@ def test_root_rotation(self, root_versions: List[RootVersion]) -> None: # Publish all remote root versions defined in root_versions for rootver in root_versions: # clear root keys, signers - self.sim.root.roles["root"].keyids.clear() - self.sim.signers["root"].clear() + self.sim.root.roles[Root.type].keyids.clear() + self.sim.signers[Root.type].clear() - self.sim.root.roles["root"].threshold = rootver.threshold + self.sim.root.roles[Root.type].threshold = rootver.threshold for i in rootver.keys: - self.sim.root.add_key("root", self.keys[i]) + self.sim.root.add_key(Root.type, self.keys[i]) for i in rootver.sigs: - self.sim.add_signer("root", self.signers[i]) + self.sim.add_signer(Root.type, self.signers[i]) self.sim.root.version += 1 self.sim.publish_root() diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 3616e649ec..5cf9f4200f 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -19,7 +19,7 @@ from tests import utils from tuf import exceptions, ngclient, unittest_toolbox -from tuf.api.metadata import Metadata, Root, TargetFile +from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp logger = logging.getLogger(__name__) @@ -180,17 +180,17 @@ def test_refresh_and_download(self) -> None: # top-level metadata is in local directory already self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfos, assert that cache does not contain files info1 = self.updater.get_targetinfo("file1.txt") assert 
isinstance(info1, TargetFile) - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfo for 'file3.txt' listed in the delegated role1 info3 = self.updater.get_targetinfo("file3.txt") assert isinstance(info3, TargetFile) - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Root.type, Snapshot.type, Targets.type, Timestamp.type] self._assert_files(expected_files) self.assertIsNone(self.updater.find_cached_target(info1)) self.assertIsNone(self.updater.find_cached_target(info3)) @@ -214,14 +214,14 @@ def test_refresh_with_only_local_root(self) -> None: os.remove(os.path.join(self.client_directory, "role1.json")) os.remove(os.path.join(self.client_directory, "role2.json")) os.remove(os.path.join(self.client_directory, "1.root.json")) - self._assert_files(["root"]) + self._assert_files([Root.type]) self.updater.refresh() - self._assert_files(["root", "snapshot", "targets", "timestamp"]) + self._assert_files([Root.type, Snapshot.type, Targets.type, Timestamp.type]) # Get targetinfo for 'file3.txt' listed in the delegated role1 self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = ["role1", Root.type, Snapshot.type, Targets.type, Timestamp.type] self._assert_files(expected_files) def test_implicit_refresh_with_only_local_root(self) -> None: diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index a5b511a60a..24a906bef6 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -14,7 +14,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata +from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata, Root, Snapshot, Targets, Timestamp from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -35,7 +35,7 @@ class TestRefresh(unittest.TestCase): def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") - self.targets_dir = os.path.join(self.temp_dir.name, "targets") + self.targets_dir = os.path.join(self.temp_dir.name, Targets.type) os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) @@ -94,7 +94,7 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None: def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) # Add one more root version to repository so that # refresh() updates from local trusted root (v1) to @@ -106,7 +106,7 @@ def test_first_time_refresh(self) -> None: self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) for role in TOP_LEVEL_ROLE_NAMES: - version = 2 if role == "root" else None + version = 2 if role == Root.type else None self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: @@ -129,8 +129,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -143,7 +143,7 @@ def test_trusted_root_expired(self) -> None: # 
Root is successfully updated to latest version self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Root.type, 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -156,7 +156,7 @@ def test_trusted_root_unsigned(self) -> None: self._run_refresh() # The update failed, no changes in metadata - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) md_root_after = Metadata.from_file(root_path) self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) @@ -181,7 +181,7 @@ def test_max_root_rotations(self) -> None: # Assert that root version was increased with no more # than 'max_root_rotations' self._assert_version_equals( - "root", initial_root_version + updater.config.max_root_rotations + Root.type, initial_root_version + updater.config.max_root_rotations ) def test_intermediate_root_incorrectly_signed(self) -> None: @@ -189,13 +189,13 @@ def test_intermediate_root_incorrectly_signed(self) -> None: # Intermediate root v2 is unsigned self.sim.root.version += 1 - root_signers = self.sim.signers["root"].copy() - self.sim.signers["root"].clear() + root_signers = self.sim.signers[Root.type].copy() + self.sim.signers[Root.type].clear() self.sim.publish_root() # Final root v3 is correctly signed self.sim.root.version += 1 - self.sim.signers["root"] = root_signers + self.sim.signers[Root.type] = root_signers self.sim.publish_root() # Incorrectly signed intermediate root is detected @@ -203,8 +203,8 @@ def test_intermediate_root_incorrectly_signed(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file @@ -224,20 +224,20 @@ def test_intermediate_root_expired(self) -> None: # Successfully updated to root v3 self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - self._assert_content_equals("root", 3) + self._assert_content_equals(Root.type, 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack self.sim.root.version += 1 # root v2 - self.sim.signers["root"].clear() + self.sim.signers[Root.type].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_new_root_same_version(self) -> None: # Check for a rollback_attack @@ -247,8 +247,8 @@ def test_new_root_same_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive root version @@ -258,8 +258,8 @@ def test_new_root_nonconsecutive_version(self) -> None: self._run_refresh() # The update failed, latest root version is v1 - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 1) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 1) def test_final_root_expired(self) -> None: # Check for a freeze attack @@ -272,16 +272,16 
@@ def test_final_root_expired(self) -> None: self._run_refresh() # The update failed but final root is persisted on the file system - self._assert_files_exist(["root"]) - self._assert_content_equals("root", 2) + self._assert_files_exist([Root.type]) + self._assert_content_equals(Root.type, 2) def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["timestamp"].clear() + self.sim.signers[Timestamp.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack @@ -292,7 +292,7 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Timestamp.type, 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. @@ -307,7 +307,7 @@ def test_new_timestamp_snapshot_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("timestamp", 2) + self._assert_version_equals(Timestamp.type, 2) def test_new_timestamp_expired(self) -> None: # Check for a freeze attack @@ -317,7 +317,7 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root"]) + self._assert_files_exist([Root.type]) def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -338,16 +338,16 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("timestamp", 3) - self._assert_version_equals("snapshot", 1) + self._assert_version_equals(Timestamp.type, 3) + self._assert_version_equals(Snapshot.type, 1) def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["snapshot"].clear() + self.sim.signers[Snapshot.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_snapshot_version_mismatch(self) -> None: # Check against timestamp role’s snapshot version @@ -357,7 +357,7 @@ def test_new_snapshot_version_mismatch(self) -> None: with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_snapshot_version_rollback(self) -> None: # Check for a rollback attack @@ -371,7 +371,7 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - self._assert_version_equals("snapshot", 2) + self._assert_version_equals(Snapshot.type, 2) def test_new_snapshot_expired(self) -> None: # Check for a freeze attack @@ -381,7 +381,7 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp"]) + self._assert_files_exist([Root.type, Timestamp.type]) def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -403,16 +403,16 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - self._assert_version_equals("snapshot", 3) 
- self._assert_version_equals("targets", 1) + self._assert_version_equals(Snapshot.type, 3) + self._assert_version_equals(Targets.type, 1) def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack - self.sim.signers["targets"].clear() + self.sim.signers[Targets.type].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) def test_new_targets_version_mismatch(self) -> None: # Check against snapshot role’s targets version @@ -422,7 +422,7 @@ def test_new_targets_version_mismatch(self) -> None: with self.assertRaises(BadVersionNumberError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) def test_new_targets_expired(self) -> None: # Check for a freeze attack. @@ -432,7 +432,7 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() - self._assert_files_exist(["root", "timestamp", "snapshot"]) + self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type]) if __name__ == "__main__": diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 23675045fe..955d930df6 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -68,7 +68,14 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp +from tuf.api.metadata import ( + Metadata, + Root, + Snapshot, + TargetFile, + Targets, + Timestamp, +) from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface
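
For reference, a minimal sketch of the pattern this diff applies throughout the tests: role names are taken from the `type` class attributes of the metadata classes in `tuf.api.metadata` rather than hard-coded strings. The constants compare equal to the familiar role-name strings (the diff itself relies on this, e.g. `role == Root.type` against a name parsed from a request path), so string-keyed lookups keep working. The `signers` dictionary below is only a stand-in for illustration, not the simulator's actual attribute.

# Hypothetical, self-contained illustration of the string-to-constant swap.
from tuf.api.metadata import Root, Snapshot, Targets, Timestamp

# The class-level constants still equal the familiar role-name strings:
assert Root.type == "root"
assert Snapshot.type == "snapshot"
assert Targets.type == "targets"
assert Timestamp.type == "timestamp"

# Example lookup, analogous to self.signers[Root.type] in repository_simulator.py;
# "signers" here is an illustrative dict, not the simulator's real attribute.
signers = {role: [] for role in (Root.type, Snapshot.type, Targets.type, Timestamp.type)}
root_signers = signers[Root.type]  # same entry as a lookup keyed by "root"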