Skip to content

Commit

Permalink
Apply top-level rolenames constants in tests
Browse files Browse the repository at this point in the history
This applies the use of constants of top-level rolenames in the
tests instead of the previously hardcoded strings.
Addresses #1648

Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
  • Loading branch information
Ivana Atanasova committed Nov 12, 2021
1 parent df35943 commit 3a7e025
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 206 deletions.
37 changes: 19 additions & 18 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@

from tuf.api.metadata import (
SPECIFICATION_VERSION,
TOP_LEVEL_ROLE_NAMES,
RoleNames,
DelegatedRole,
Delegations,
Key,
Expand All @@ -73,6 +73,7 @@
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
from tuf.api.metadata import RoleNames
from tuf.exceptions import FetcherHTTPError
from tuf.ngclient.fetcher import FetcherInterface

Expand Down Expand Up @@ -140,7 +141,7 @@ def targets(self) -> Targets:

def all_targets(self) -> Iterator[Tuple[str, Targets]]:
"""Yield role name and signed portion of targets one by one."""
yield "targets", self.md_targets.signed
yield RoleNames.TARGETS.value, self.md_targets.signed
for role, md in self.md_delegates.items():
yield role, md.signed

Expand Down Expand Up @@ -168,21 +169,21 @@ def _initialize(self):
timestamp = Timestamp(1, SPEC_VER, self.safe_expiry, snapshot_meta)
self.md_timestamp = Metadata(timestamp, OrderedDict())

roles = {role_name: Role([], 1) for role_name in TOP_LEVEL_ROLE_NAMES}
roles = {role_name.value: Role([], 1) for role_name in RoleNames}
root = Root(1, SPEC_VER, self.safe_expiry, {}, roles, True)

for role in TOP_LEVEL_ROLE_NAMES:
for role in RoleNames:
key, signer = self.create_key()
root.add_key(role, key)
self.add_signer(role, signer)
root.add_key(role.value, key)
self.add_signer(role.value, signer)

self.md_root = Metadata(root, OrderedDict())
self.publish_root()

def publish_root(self):
"""Sign and store a new serialized version of root."""
self.md_root.signatures.clear()
for signer in self.signers["root"].values():
for signer in self.signers[RoleNames.ROOT.value].values():
self.md_root.sign(signer, append=True)

self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
Expand All @@ -198,9 +199,9 @@ def fetch(self, url: str) -> Iterator[bytes]:
if path.startswith("/metadata/") and path.endswith(".json"):
ver_and_name = path[len("/metadata/") :][: -len(".json")]
# only consistent_snapshot supported ATM: timestamp is special case
if ver_and_name == "timestamp":
if ver_and_name == RoleNames.TIMESTAMP.value:
version = None
role = "timestamp"
role = RoleNames.TIMESTAMP.value
else:
version, _, role = ver_and_name.partition(".")
version = int(version)
Expand Down Expand Up @@ -242,19 +243,19 @@ def _fetch_metadata(
If version is None, non-versioned metadata is being requested.
"""
if role == "root":
if role == RoleNames.ROOT.value:
# return a version previously serialized in publish_root()
if version is None or version > len(self.signed_roots):
raise FetcherHTTPError(f"Unknown root version {version}", 404)
logger.debug("fetched root version %d", version)
return self.signed_roots[version - 1]

# sign and serialize the requested metadata
if role == "timestamp":
if role == RoleNames.TIMESTAMP.value:
md: Metadata = self.md_timestamp
elif role == "snapshot":
elif role == RoleNames.SNAPSHOT.value:
md = self.md_snapshot
elif role == "targets":
elif role == RoleNames.TARGETS.value:
md = self.md_targets
else:
md = self.md_delegates[role]
Expand Down Expand Up @@ -290,7 +291,7 @@ def update_timestamp(self):
self.timestamp.snapshot_meta.version = self.snapshot.version

if self.compute_metafile_hashes_length:
hashes, length = self._compute_hashes_and_length("snapshot")
hashes, length = self._compute_hashes_and_length(RoleNames.SNAPSHOT.value)
self.timestamp.snapshot_meta.hashes = hashes
self.timestamp.snapshot_meta.length = length

Expand All @@ -313,7 +314,7 @@ def update_snapshot(self):

def add_target(self, role: str, data: bytes, path: str):
"""Create a target from data and add it to the target_files."""
if role == "targets":
if role == RoleNames.TARGETS.value:
targets = self.targets
else:
targets = self.md_delegates[role].signed
Expand All @@ -332,7 +333,7 @@ def add_delegation(
hash_prefixes: Optional[List[str]],
):
"""Add delegated target role to the repository."""
if delegator_name == "targets":
if delegator_name == RoleNames.TARGETS.value:
delegator = self.targets
else:
delegator = self.md_delegates[delegator_name].signed
Expand Down Expand Up @@ -368,9 +369,9 @@ def write(self):

for ver in range(1, len(self.signed_roots) + 1):
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
f.write(self._fetch_metadata("root", ver))
f.write(self._fetch_metadata(RoleNames.ROOT.value, ver))

for role in ["timestamp", "snapshot", "targets"]:
for role in [RoleNames.TIMESTAMP.value, RoleNames.SNAPSHOT.value, RoleNames.TARGETS.value]:
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
f.write(self._fetch_metadata(role))

Expand Down
89 changes: 45 additions & 44 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from tuf.api.serialization import DeserializationError
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer
from tuf.api.metadata import RoleNames

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,7 +71,7 @@ def setUpClass(cls):

# Load keys into memory
cls.keystore = {}
for role in ["delegation", "snapshot", "targets", "timestamp"]:
for role in ["delegation", RoleNames.SNAPSHOT.value, RoleNames.TARGETS.value, RoleNames.TIMESTAMP.value]:
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + "_key"),
password="password",
Expand All @@ -84,10 +85,10 @@ def tearDownClass(cls):

def test_generic_read(self):
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
("timestamp", Timestamp),
("targets", Targets),
(RoleNames.ROOT.value, Root),
(RoleNames.SNAPSHOT.value, Snapshot),
(RoleNames.TIMESTAMP.value, Timestamp),
(RoleNames.TARGETS.value, Targets),
]:

# Load JSON-formatted metdata of each supported type from file
Expand Down Expand Up @@ -128,7 +129,7 @@ def test_compact_json(self):
)

def test_read_write_read_compare(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [RoleNames.ROOT.value, RoleNames.SNAPSHOT.value, RoleNames.TIMESTAMP.value, RoleNames.TARGETS.value]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)

Expand All @@ -140,7 +141,7 @@ def test_read_write_read_compare(self):
os.remove(path_2)

def test_to_from_bytes(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [RoleNames.ROOT.value, RoleNames.SNAPSHOT.value, RoleNames.TIMESTAMP.value, RoleNames.TARGETS.value]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
metadata_bytes = f.read()
Expand All @@ -161,11 +162,11 @@ def test_sign_verify(self):
root = Metadata[Root].from_file(root_path).signed

# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_keyid = next(iter(root.roles[RoleNames.TARGETS.value].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_keyid = next(iter(root.roles[RoleNames.SNAPSHOT.value].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[RoleNames.TIMESTAMP.value].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (targets) and assert ...
Expand All @@ -184,7 +185,7 @@ def test_sign_verify(self):
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer())

sslib_signer = SSlibSigner(self.keystore["snapshot"])
sslib_signer = SSlibSigner(self.keystore[RoleNames.SNAPSHOT.value])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
Expand All @@ -195,7 +196,7 @@ def test_sign_verify(self):
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)

sslib_signer = SSlibSigner(self.keystore["timestamp"])
sslib_signer = SSlibSigner(self.keystore[RoleNames.TIMESTAMP.value])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
Expand All @@ -210,7 +211,7 @@ def test_verify_failures(self):
root = Metadata[Root].from_file(root_path).signed

# Locate the timestamp public key we need from root
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[RoleNames.TIMESTAMP.value].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (timestamp)
Expand Down Expand Up @@ -361,20 +362,20 @@ def test_metadata_verify_delegate(self):
role2 = Metadata[Targets].from_file(role2_path)

# test the expected delegation tree
root.verify_delegate("root", root)
root.verify_delegate("snapshot", snapshot)
root.verify_delegate("targets", targets)
root.verify_delegate(RoleNames.ROOT.value, root)
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
root.verify_delegate(RoleNames.TARGETS.value, targets)
targets.verify_delegate("role1", role1)
role1.verify_delegate("role2", role2)

# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate("snapshot", snapshot)
snapshot.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate("role1", role1)
with self.assertRaises(ValueError):
targets.verify_delegate("targets", targets)
targets.verify_delegate(RoleNames.TARGETS.value, targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate("role1", role1)
Expand All @@ -383,31 +384,31 @@ def test_metadata_verify_delegate(self):
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
snapshot.signed.expires = expires

# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("timestamp", snapshot)
root.verify_delegate(RoleNames.TIMESTAMP.value, snapshot)

# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
ts_keyid = next(iter(root.signed.roles[RoleNames.TIMESTAMP.value].keyids))
root.signed.add_key(RoleNames.SNAPSHOT.value, root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)

# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)

# verify fails if threshold of signatures is not reached
root.signed.roles["snapshot"].threshold = 2
root.signed.roles[RoleNames.SNAPSHOT.value].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)

# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)
snapshot.sign(SSlibSigner(self.keystore[RoleNames.TIMESTAMP.value]), append=True)
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)

def test_key_class(self):
# Test if from_securesystemslib_key removes the private key from keyval
Expand All @@ -433,44 +434,44 @@ def test_root_add_key_and_remove_key(self):
)

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertNotIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key("root", key_metadata)
root.signed.add_key(RoleNames.ROOT.value, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
self.assertIn(keyid, root.signed.keys)

# Confirm that the newly added key does not break
# the object serialization
root.to_dict()

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles["root"].keyids.copy()
root.signed.add_key("root", key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
pre_add_keyid = root.signed.roles[RoleNames.ROOT.value].keyids.copy()
root.signed.add_key(RoleNames.ROOT.value, key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles[RoleNames.ROOT.value].keyids)

# Add the same key to targets role as well
root.signed.add_key("targets", key_metadata)
root.signed.add_key(RoleNames.TARGETS.value, key_metadata)

# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)

# Remove the key from root role (targets role still uses it)
root.signed.remove_key("root", keyid)
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
root.signed.remove_key(RoleNames.ROOT.value, keyid)
self.assertNotIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
self.assertIn(keyid, root.signed.keys)

# Remove the key from targets as well
root.signed.remove_key("targets", keyid)
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
root.signed.remove_key(RoleNames.TARGETS.value, keyid)
self.assertNotIn(keyid, root.signed.roles[RoleNames.TARGETS.value].keyids)
self.assertNotIn(keyid, root.signed.keys)

with self.assertRaises(ValueError):
root.signed.remove_key("root", "nosuchkey")
root.signed.remove_key(RoleNames.ROOT.value, "nosuchkey")
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)

Expand Down Expand Up @@ -661,7 +662,7 @@ def test_length_and_hash_validation(self):
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
file1_targetfile = targets.signed.targets["file1.txt"]
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
filepath = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")

with open(filepath, "rb") as file1:
file1_targetfile.verify_length_and_hashes(file1)
Expand All @@ -679,7 +680,7 @@ def test_length_and_hash_validation(self):

def test_targetfile_from_file(self):
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ["sha256"]
)
Expand All @@ -688,20 +689,20 @@ def test_targetfile_from_file(self):
targetfile_from_file.verify_length_and_hashes(file)

# Test with a non-existing file
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
TargetFile.from_file(file_path, file_path, ["123"])

def test_targetfile_from_data(self):
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
target_file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")

# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(
Expand Down
Loading

0 comments on commit 3a7e025

Please sign in to comment.