Skip to content

Commit

Permalink
Apply top-level rolenames constants in tests
Browse files Browse the repository at this point in the history
This applies the use of constants of top-level rolenames in the
tests instead of the previously hardcoded strings.
Fixes #1648

Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
  • Loading branch information
Ivana Atanasova committed Dec 2, 2021
1 parent d7c6534 commit 00589f0
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 192 deletions.
26 changes: 13 additions & 13 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def targets(self) -> Targets:

def all_targets(self) -> Iterator[Tuple[str, Targets]]:
"""Yield role name and signed portion of targets one by one."""
yield "targets", self.md_targets.signed
yield Targets.type, self.md_targets.signed
for role, md in self.md_delegates.items():
yield role, md.signed

Expand Down Expand Up @@ -181,7 +181,7 @@ def _initialize(self) -> None:
def publish_root(self) -> None:
"""Sign and store a new serialized version of root."""
self.md_root.signatures.clear()
for signer in self.signers["root"].values():
for signer in self.signers[Root.type].values():
self.md_root.sign(signer, append=True)

self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
Expand All @@ -197,8 +197,8 @@ def fetch(self, url: str) -> Iterator[bytes]:
ver_and_name = path[len("/metadata/") :][: -len(".json")]
version_str, _, role = ver_and_name.partition(".")
# root is always version-prefixed while timestamp is always NOT
if role == "root" or (
self.root.consistent_snapshot and ver_and_name != "timestamp"
if role == Root.type or (
self.root.consistent_snapshot and ver_and_name != Timestamp.type
):
version: Optional[int] = int(version_str)
else:
Expand Down Expand Up @@ -248,7 +248,7 @@ def _fetch_metadata(
If version is None, non-versioned metadata is being requested.
"""
if role == "root":
if role == Root.type:
# return a version previously serialized in publish_root()
if version is None or version > len(self.signed_roots):
raise FetcherHTTPError(f"Unknown root version {version}", 404)
Expand All @@ -257,11 +257,11 @@ def _fetch_metadata(

# sign and serialize the requested metadata
md: Optional[Metadata]
if role == "timestamp":
if role == Timestamp.type:
md = self.md_timestamp
elif role == "snapshot":
elif role == Snapshot.type:
md = self.md_snapshot
elif role == "targets":
elif role == Targets.type:
md = self.md_targets
else:
md = self.md_delegates.get(role)
Expand Down Expand Up @@ -297,7 +297,7 @@ def update_timestamp(self) -> None:
self.timestamp.snapshot_meta.version = self.snapshot.version

if self.compute_metafile_hashes_length:
hashes, length = self._compute_hashes_and_length("snapshot")
hashes, length = self._compute_hashes_and_length(Snapshot.type)
self.timestamp.snapshot_meta.hashes = hashes
self.timestamp.snapshot_meta.length = length

Expand All @@ -320,7 +320,7 @@ def update_snapshot(self) -> None:

def add_target(self, role: str, data: bytes, path: str) -> None:
"""Create a target from data and add it to the target_files."""
if role == "targets":
if role == Targets.type:
targets = self.targets
else:
targets = self.md_delegates[role].signed
Expand All @@ -339,7 +339,7 @@ def add_delegation(
hash_prefixes: Optional[List[str]],
) -> None:
"""Add delegated target role to the repository."""
if delegator_name == "targets":
if delegator_name == Targets.type:
delegator = self.targets
else:
delegator = self.md_delegates[delegator_name].signed
Expand Down Expand Up @@ -375,9 +375,9 @@ def write(self) -> None:

for ver in range(1, len(self.signed_roots) + 1):
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
f.write(self._fetch_metadata("root", ver))
f.write(self._fetch_metadata(Root.type, ver))

for role in ["timestamp", "snapshot", "targets"]:
for role in [Timestamp.type, Snapshot.type, Targets.type]:
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
f.write(self._fetch_metadata(role))

Expand Down
88 changes: 44 additions & 44 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def setUpClass(cls) -> None:

# Load keys into memory
cls.keystore = {}
for role in ["delegation", "snapshot", "targets", "timestamp"]:
for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]:
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + "_key"),
password="password",
Expand All @@ -92,10 +92,10 @@ def tearDownClass(cls) -> None:

def test_generic_read(self) -> None:
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
("timestamp", Timestamp),
("targets", Targets),
(Root.type, Root),
(Snapshot.type, Snapshot),
(Timestamp.type, Timestamp),
(Targets.type, Targets),
]:

# Load JSON-formatted metdata of each supported type from file
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_compact_json(self) -> None:
)

def test_read_write_read_compare(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)

Expand All @@ -148,7 +148,7 @@ def test_read_write_read_compare(self) -> None:
os.remove(path_2)

def test_to_from_bytes(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
metadata_bytes = f.read()
Expand All @@ -169,11 +169,11 @@ def test_sign_verify(self) -> None:
root = Metadata[Root].from_file(root_path).signed

# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_keyid = next(iter(root.roles[Targets.type].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (targets) and assert ...
Expand All @@ -192,7 +192,7 @@ def test_sign_verify(self) -> None:
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type]

sslib_signer = SSlibSigner(self.keystore["snapshot"])
sslib_signer = SSlibSigner(self.keystore[Snapshot.type])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
Expand All @@ -203,7 +203,7 @@ def test_sign_verify(self) -> None:
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)

sslib_signer = SSlibSigner(self.keystore["timestamp"])
sslib_signer = SSlibSigner(self.keystore[Timestamp.type])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
Expand All @@ -218,7 +218,7 @@ def test_verify_failures(self) -> None:
root = Metadata[Root].from_file(root_path).signed

# Locate the timestamp public key we need from root
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (timestamp)
Expand Down Expand Up @@ -369,20 +369,20 @@ def test_metadata_verify_delegate(self) -> None:
role2 = Metadata[Targets].from_file(role2_path)

# test the expected delegation tree
root.verify_delegate("root", root)
root.verify_delegate("snapshot", snapshot)
root.verify_delegate("targets", targets)
root.verify_delegate(Root.type, root)
root.verify_delegate(Snapshot.type, snapshot)
root.verify_delegate(Targets.type, targets)
targets.verify_delegate("role1", role1)
role1.verify_delegate("role2", role2)

# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate("snapshot", snapshot)
snapshot.verify_delegate(Snapshot.type, snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate("role1", role1)
with self.assertRaises(ValueError):
targets.verify_delegate("targets", targets)
targets.verify_delegate(Targets.type, targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate("role1", role1)
Expand All @@ -391,31 +391,31 @@ def test_metadata_verify_delegate(self) -> None:
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)
snapshot.signed.expires = expires

# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("timestamp", snapshot)
root.verify_delegate(Timestamp.type, snapshot)

# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids))
root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)

# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)

# verify fails if threshold of signatures is not reached
root.signed.roles["snapshot"].threshold = 2
root.signed.roles[Snapshot.type].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)

# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)
snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True)
root.verify_delegate(Snapshot.type, snapshot)

def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
Expand All @@ -441,44 +441,44 @@ def test_root_add_key_and_remove_key(self) -> None:
)

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key("root", key_metadata)
root.signed.add_key(Root.type, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.roles[Root.type].keyids)
self.assertIn(keyid, root.signed.keys)

# Confirm that the newly added key does not break
# the object serialization
root.to_dict()

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles["root"].keyids.copy()
root.signed.add_key("root", key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
pre_add_keyid = root.signed.roles[Root.type].keyids.copy()
root.signed.add_key(Root.type, key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids)

# Add the same key to targets role as well
root.signed.add_key("targets", key_metadata)
root.signed.add_key(Targets.type, key_metadata)

# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)

# Remove the key from root role (targets role still uses it)
root.signed.remove_key("root", keyid)
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
root.signed.remove_key(Root.type, keyid)
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
self.assertIn(keyid, root.signed.keys)

# Remove the key from targets as well
root.signed.remove_key("targets", keyid)
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
root.signed.remove_key(Targets.type, keyid)
self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids)
self.assertNotIn(keyid, root.signed.keys)

with self.assertRaises(ValueError):
root.signed.remove_key("root", "nosuchkey")
root.signed.remove_key(Root.type, "nosuchkey")
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)

Expand Down Expand Up @@ -670,7 +670,7 @@ def test_length_and_hash_validation(self) -> None:
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
file1_targetfile = targets.signed.targets["file1.txt"]
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt")

with open(filepath, "rb") as file1:
file1_targetfile.verify_length_and_hashes(file1)
Expand All @@ -688,7 +688,7 @@ def test_length_and_hash_validation(self) -> None:

def test_targetfile_from_file(self) -> None:
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ["sha256"]
)
Expand All @@ -697,20 +697,20 @@ def test_targetfile_from_file(self) -> None:
targetfile_from_file.verify_length_and_hashes(file)

# Test with a non-existing file
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
TargetFile.from_file(file_path, file_path, ["123"])

def test_targetfile_from_data(self) -> None:
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")

# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(
Expand Down
Loading

0 comments on commit 00589f0

Please sign in to comment.