Skip to content

Commit

Permalink
Apply top-level rolenames constants in tests
Browse files Browse the repository at this point in the history
This replaces previously hardcoded role-name strings in the
tests with the top-level rolename constants.
Addresses #1648

Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
  • Loading branch information
Ivana Atanasova committed Nov 12, 2021
1 parent 7f094c5 commit 010dcc8
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 196 deletions.
27 changes: 14 additions & 13 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
from tuf.api.metadata import Rolename
from tuf.exceptions import FetcherHTTPError, RepositoryError
from tuf.ngclient.fetcher import FetcherInterface

Expand Down Expand Up @@ -136,7 +137,7 @@ def targets(self) -> Targets:
return self.md_targets.signed

def all_targets(self) -> Iterator[Tuple[str, Targets]]:
yield "targets", self.md_targets.signed
yield Rolename.TARGETS, self.md_targets.signed
for role, md in self.md_delegates.items():
yield role, md.signed

Expand Down Expand Up @@ -178,7 +179,7 @@ def _initialize(self):
def publish_root(self):
"""Sign and store a new serialized version of root"""
self.md_root.signatures.clear()
for signer in self.signers["root"].values():
for signer in self.signers[Rolename.ROOT].values():
self.md_root.sign(signer, append=True)

self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
Expand All @@ -191,9 +192,9 @@ def fetch(self, url: str) -> Iterator[bytes]:
if path.startswith("/metadata/") and path.endswith(".json"):
ver_and_name = path[len("/metadata/") :][: -len(".json")]
# only consistent_snapshot supported ATM: timestamp is special case
if ver_and_name == "timestamp":
if ver_and_name == Rolename.TIMESTAMP:
version = None
role = "timestamp"
role = Rolename.TIMESTAMP
else:
version, _, role = ver_and_name.partition(".")
version = int(version)
Expand Down Expand Up @@ -230,19 +231,19 @@ def _fetch_metadata(
If version is None, non-versioned metadata is being requested
"""
if role == "root":
if role == Rolename.ROOT:
# return a version previously serialized in publish_root()
if version is None or version > len(self.signed_roots):
raise FetcherHTTPError(f"Unknown root version {version}", 404)
logger.debug("fetched root version %d", role, version)
return self.signed_roots[version - 1]
else:
# sign and serialize the requested metadata
if role == "timestamp":
if role == Rolename.TIMESTAMP:
md: Metadata = self.md_timestamp
elif role == "snapshot":
elif role == Rolename.SNAPSHOT:
md = self.md_snapshot
elif role == "targets":
elif role == Rolename.TARGETS:
md = self.md_targets
else:
md = self.md_delegates[role]
Expand Down Expand Up @@ -275,7 +276,7 @@ def update_timestamp(self):
self.timestamp.snapshot_meta.version = self.snapshot.version

if self.compute_metafile_hashes_length:
hashes, length = self._compute_hashes_and_length("snapshot")
hashes, length = self._compute_hashes_and_length(Rolename.SNAPSHOT)
self.timestamp.snapshot_meta.hashes = hashes
self.timestamp.snapshot_meta.length = length

Expand All @@ -296,7 +297,7 @@ def update_snapshot(self):
self.update_timestamp()

def add_target(self, role: str, data: bytes, path: str):
if role == "targets":
if role == Rolename.TARGETS:
targets = self.targets
else:
targets = self.md_delegates[role].signed
Expand All @@ -314,7 +315,7 @@ def add_delegation(
paths: Optional[List[str]],
hash_prefixes: Optional[List[str]],
):
if delegator_name == "targets":
if delegator_name == Rolename.TARGETS:
delegator = self.targets
else:
delegator = self.md_delegates[delegator_name].signed
Expand Down Expand Up @@ -350,9 +351,9 @@ def write(self):

for ver in range(1, len(self.signed_roots) + 1):
with open(os.path.join(dir, f"{ver}.root.json"), "wb") as f:
f.write(self._fetch_metadata("root", ver))
f.write(self._fetch_metadata(Rolename.ROOT, ver))

for role in ["timestamp", "snapshot", "targets"]:
for role in [Rolename.TIMESTAMP, Rolename.SNAPSHOT, Rolename.TARGETS]:
with open(os.path.join(dir, f"{role}.json"), "wb") as f:
f.write(self._fetch_metadata(role))

Expand Down
89 changes: 45 additions & 44 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from tuf.api.serialization import DeserializationError
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer
from tuf.api.metadata import Rolename

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,7 +69,7 @@ def setUpClass(cls):

# Load keys into memory
cls.keystore = {}
for role in ["delegation", "snapshot", "targets", "timestamp"]:
for role in ["delegation", Rolename.SNAPSHOT, Rolename.TARGETS, Rolename.TIMESTAMP]:
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + "_key"),
password="password",
Expand All @@ -82,10 +83,10 @@ def tearDownClass(cls):

def test_generic_read(self):
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
("timestamp", Timestamp),
("targets", Targets),
(Rolename.ROOT, Root),
(Rolename.SNAPSHOT, Snapshot),
(Rolename.TIMESTAMP, Timestamp),
(Rolename.TARGETS, Targets),
]:

# Load JSON-formatted metadata of each supported type from file
Expand Down Expand Up @@ -126,7 +127,7 @@ def test_compact_json(self):
)

def test_read_write_read_compare(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Rolename.ROOT, Rolename.SNAPSHOT, Rolename.TIMESTAMP, Rolename.TARGETS]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)

Expand All @@ -138,7 +139,7 @@ def test_read_write_read_compare(self):
os.remove(path_2)

def test_to_from_bytes(self):
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Rolename.ROOT, Rolename.SNAPSHOT, Rolename.TIMESTAMP, Rolename.TARGETS]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
metadata_bytes = f.read()
Expand All @@ -159,11 +160,11 @@ def test_sign_verify(self):
root = Metadata[Root].from_file(root_path).signed

# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_keyid = next(iter(root.roles[Rolename.TARGETS].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_keyid = next(iter(root.roles[Rolename.SNAPSHOT].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Rolename.TIMESTAMP].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (targets) and assert ...
Expand All @@ -182,7 +183,7 @@ def test_sign_verify(self):
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer())

sslib_signer = SSlibSigner(self.keystore["snapshot"])
sslib_signer = SSlibSigner(self.keystore[Rolename.SNAPSHOT])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
Expand All @@ -193,7 +194,7 @@ def test_sign_verify(self):
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)

sslib_signer = SSlibSigner(self.keystore["timestamp"])
sslib_signer = SSlibSigner(self.keystore[Rolename.TIMESTAMP])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
Expand All @@ -208,7 +209,7 @@ def test_verify_failures(self):
root = Metadata[Root].from_file(root_path).signed

# Locate the timestamp public key we need from root
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Rolename.TIMESTAMP].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (timestamp)
Expand Down Expand Up @@ -359,20 +360,20 @@ def test_metadata_verify_delegate(self):
role2 = Metadata[Targets].from_file(role2_path)

# test the expected delegation tree
root.verify_delegate("root", root)
root.verify_delegate("snapshot", snapshot)
root.verify_delegate("targets", targets)
root.verify_delegate(Rolename.ROOT, root)
root.verify_delegate(Rolename.SNAPSHOT, snapshot)
root.verify_delegate(Rolename.TARGETS, targets)
targets.verify_delegate("role1", role1)
role1.verify_delegate("role2", role2)

# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate("snapshot", snapshot)
snapshot.verify_delegate(Rolename.SNAPSHOT, snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate("role1", role1)
with self.assertRaises(ValueError):
targets.verify_delegate("targets", targets)
targets.verify_delegate(Rolename.TARGETS, targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate("role1", role1)
Expand All @@ -381,31 +382,31 @@ def test_metadata_verify_delegate(self):
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Rolename.SNAPSHOT, snapshot)
snapshot.signed.expires = expires

# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("timestamp", snapshot)
root.verify_delegate(Rolename.TIMESTAMP, snapshot)

# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
ts_keyid = next(iter(root.signed.roles[Rolename.TIMESTAMP].keyids))
root.signed.add_key(Rolename.SNAPSHOT, root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)

# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Rolename.SNAPSHOT, snapshot)

# verify fails if threshold of signatures is not reached
root.signed.roles["snapshot"].threshold = 2
root.signed.roles[Rolename.SNAPSHOT].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Rolename.SNAPSHOT, snapshot)

# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)
snapshot.sign(SSlibSigner(self.keystore[Rolename.TIMESTAMP]), append=True)
root.verify_delegate(Rolename.SNAPSHOT, snapshot)

def test_key_class(self):
# Test if from_securesystemslib_key removes the private key from keyval
Expand All @@ -431,44 +432,44 @@ def test_root_add_key_and_remove_key(self):
)

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertNotIn(keyid, root.signed.roles[Rolename.ROOT].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key("root", key_metadata)
root.signed.add_key(Rolename.ROOT, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.roles[Rolename.ROOT].keyids)
self.assertIn(keyid, root.signed.keys)

# Confirm that the newly added key does not break
# the object serialization
root.to_dict()

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles["root"].keyids.copy()
root.signed.add_key("root", key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
pre_add_keyid = root.signed.roles[Rolename.ROOT].keyids.copy()
root.signed.add_key(Rolename.ROOT, key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles[Rolename.ROOT].keyids)

# Add the same key to targets role as well
root.signed.add_key("targets", key_metadata)
root.signed.add_key(Rolename.TARGETS, key_metadata)

# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)

# Remove the key from root role (targets role still uses it)
root.signed.remove_key("root", keyid)
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
root.signed.remove_key(Rolename.ROOT, keyid)
self.assertNotIn(keyid, root.signed.roles[Rolename.ROOT].keyids)
self.assertIn(keyid, root.signed.keys)

# Remove the key from targets as well
root.signed.remove_key("targets", keyid)
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
root.signed.remove_key(Rolename.TARGETS, keyid)
self.assertNotIn(keyid, root.signed.roles[Rolename.TARGETS].keyids)
self.assertNotIn(keyid, root.signed.keys)

with self.assertRaises(ValueError):
root.signed.remove_key("root", "nosuchkey")
root.signed.remove_key(Rolename.ROOT, "nosuchkey")
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)

Expand Down Expand Up @@ -670,7 +671,7 @@ def test_length_and_hash_validation(self):
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
file1_targetfile = targets.signed.targets["file1.txt"]
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
filepath = os.path.join(self.repo_dir, Rolename.TARGETS, "file1.txt")

with open(filepath, "rb") as file1:
file1_targetfile.verify_length_and_hashes(file1)
Expand All @@ -694,7 +695,7 @@ def test_length_and_hash_validation(self):

def test_targetfile_from_file(self):
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Rolename.TARGETS, "file1.txt")
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ["sha256"]
)
Expand All @@ -703,7 +704,7 @@ def test_targetfile_from_file(self):
targetfile_from_file.verify_length_and_hashes(file)

# Test with a non-existing file
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
file_path = os.path.join(self.repo_dir, Rolename.TARGETS, "file123.txt")
self.assertRaises(
FileNotFoundError,
TargetFile.from_file,
Expand All @@ -713,7 +714,7 @@ def test_targetfile_from_file(self):
)

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Rolename.TARGETS, "file1.txt")
self.assertRaises(
exceptions.UnsupportedAlgorithmError,
TargetFile.from_file,
Expand All @@ -724,7 +725,7 @@ def test_targetfile_from_file(self):

def test_targetfile_from_data(self):
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
target_file_path = os.path.join(self.repo_dir, Rolename.TARGETS, "file1.txt")

# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tuf.roledb
import tuf.keydb
import tuf.repository_tool as repo_tool
from tuf.api.metadata import Rolename

from tests import utils

Expand Down Expand Up @@ -722,7 +723,7 @@ def test_signature_order(self):
"password"))

# Write root metadata with two signatures
repo.write("root")
repo.write(Rolename.ROOT)

# Load signed and written json metadata back into memory
root_metadata_path = os.path.join(
Expand Down
Loading

0 comments on commit 010dcc8

Please sign in to comment.