From df35943da0c8b350e708137d42f82e4bb37ca7ce Mon Sep 17 00:00:00 2001 From: Ivana Atanasova Date: Fri, 12 Nov 2021 16:42:58 +0200 Subject: [PATCH] Create constants for top-level rolenames This is a change in the metadata API to remove hardcoded rolenames and use constants instead. Addresses #164 Signed-off-by: Ivana Atanasova --- tuf/api/metadata.py | 36 ++++++---- .../_internal/trusted_metadata_set.py | 65 +++++++++++-------- tuf/ngclient/updater.py | 28 ++++---- 3 files changed, 78 insertions(+), 51 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 25f14fe772..9fe78e5621 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -31,6 +31,7 @@ import tempfile from collections import OrderedDict from datetime import datetime, timedelta +from enum import Enum from typing import ( IO, Any, @@ -61,6 +62,18 @@ SignedSerializer, ) + +class RoleNames(Enum): + ROOT: str = "root" + SNAPSHOT: str = "snapshot" + TARGETS: str = "targets" + TIMESTAMP: str = "timestamp" + + @staticmethod + def all() -> set: + return set(map(lambda r: r.value, RoleNames)) + + # pylint: disable=too-many-lines logger = logging.getLogger(__name__) @@ -68,7 +81,6 @@ # We aim to support SPECIFICATION_VERSION and require the input metadata # files to have the same major version (the first number) as ours. SPECIFICATION_VERSION = ["1", "0", "19"] -TOP_LEVEL_ROLE_NAMES = {"root", "timestamp", "snapshot", "targets"} # T is a Generic type constraint for Metadata.signed T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") @@ -130,13 +142,13 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": # Dispatch to contained metadata class on metadata _type field. _type = metadata["signed"]["_type"] - if _type == "targets": + if _type == RoleNames.TARGETS.value: inner_cls: Type[Signed] = Targets - elif _type == "snapshot": + elif _type == RoleNames.SNAPSHOT.value: inner_cls = Snapshot - elif _type == "timestamp": + elif _type == RoleNames.TIMESTAMP.value: inner_cls = Timestamp - elif _type == "root": + elif _type == RoleNames.ROOT.value: inner_cls = Root else: raise ValueError(f'unrecognized metadata type "{_type}"') @@ -712,7 +724,7 @@ class Root(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - _signed_type = "root" + _signed_type = RoleNames.ROOT.value # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -729,7 +741,7 @@ def __init__( super().__init__(version, spec_version, expires, unrecognized_fields) self.consistent_snapshot = consistent_snapshot self.keys = keys - if set(roles) != TOP_LEVEL_ROLE_NAMES: + if set(roles) != RoleNames.all(): raise ValueError("Role names must be the top-level metadata roles") self.roles = roles @@ -965,7 +977,7 @@ class Timestamp(Signed): snapshot_meta: Meta information for snapshot metadata. """ - _signed_type = "timestamp" + _signed_type = RoleNames.TIMESTAMP.value def __init__( self, @@ -1015,7 +1027,7 @@ class Snapshot(Signed): meta: A dictionary of target metadata filenames to MetaFile objects. """ - _signed_type = "snapshot" + _signed_type = RoleNames.SNAPSHOT.value def __init__( self, @@ -1402,7 +1414,7 @@ class Targets(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - _signed_type = "targets" + _signed_type = RoleNames.TARGETS.value # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -1423,7 +1435,7 @@ def __init__( def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": """Creates Targets object from its dict representation.""" common_args = cls._common_fields_from_dict(signed_dict) - targets = signed_dict.pop("targets") + targets = signed_dict.pop(RoleNames.TARGETS.value) try: delegations_dict = signed_dict.pop("delegations") except KeyError: @@ -1444,7 +1456,7 @@ def to_dict(self) -> Dict[str, Any]: targets = {} for target_path, target_file_obj in self.targets.items(): targets[target_path] = target_file_obj.to_dict() - targets_dict["targets"] = targets + targets_dict[RoleNames.TARGETS.value] = targets if self.delegations is not None: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 59fe32a8f6..74405c3e6f 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -10,7 +10,7 @@ network IO, which are not handled here. Loaded metadata can be accessed via index access with rolename as key -(trusted_set["root"]) or, in the case of top-level metadata, using the helper +(trusted_set[RoleNames.ROOT.value]) or, in the case of top-level metadata, using the helper properties (trusted_set.root). The rules that TrustedMetadataSet follows for top-level metadata are @@ -35,7 +35,7 @@ >>> trusted_set = TrustedMetadataSet(f.read()) >>> >>> # update root from remote until no more are available ->>> with download("root", trusted_set.root.signed.version + 1) as f: +>>> with download(RoleNames.ROOT.value, trusted_set.root.signed.version + 1) as f: >>> trusted_set.update_root(f.read()) >>> >>> # load local timestamp, then update from remote @@ -45,7 +45,7 @@ >>> except (RepositoryError, OSError): >>> pass # failure to load a local file is ok >>> ->>> with download("timestamp") as f: +>>> with download(RoleNames.TIMESTAMP.value) as f: >>> trusted_set.update_timestamp(f.read()) >>> >>> # load local snapshot, then update from remote if needed @@ -55,7 +55,7 @@ >>> except (RepositoryError, OSError): >>> # local snapshot is not valid, load from remote >>> # (RepositoryErrors here stop the update) ->>> with download("snapshot", version) as f: +>>> with download(RoleNames.SNAPSHOT.value, version) as f: >>> trusted_set.update_snapshot(f.read()) TODO: @@ -73,7 +73,14 @@ from typing import Dict, Iterator, Optional from tuf import exceptions -from tuf.api.metadata import Metadata, Root, Snapshot, Targets, Timestamp +from tuf.api.metadata import ( + Metadata, + RoleNames, + Root, + Snapshot, + Targets, + Timestamp, +) from tuf.api.serialization import DeserializationError logger = logging.getLogger(__name__) @@ -123,22 +130,22 @@ def __iter__(self) -> Iterator[Metadata]: @property def root(self) -> Metadata[Root]: """Current root Metadata""" - return self._trusted_set["root"] + return self._trusted_set[RoleNames.ROOT.value] @property def timestamp(self) -> Optional[Metadata[Timestamp]]: """Current timestamp Metadata or None""" - return self._trusted_set.get("timestamp") + return self._trusted_set.get(RoleNames.TIMESTAMP.value) @property def snapshot(self) -> Optional[Metadata[Snapshot]]: """Current snapshot Metadata or None""" - return self._trusted_set.get("snapshot") + return self._trusted_set.get(RoleNames.SNAPSHOT.value) @property def targets(self) -> Optional[Metadata[Targets]]: """Current targets Metadata or None""" - return self._trusted_set.get("targets") + return self._trusted_set.get(RoleNames.TARGETS.value) # Methods for updating metadata def update_root(self, data: bytes) -> None: @@ -163,23 +170,25 @@ def update_root(self, data: bytes) -> None: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != "root": + if new_root.signed.type != RoleNames.ROOT.value: raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) # Verify that new root is signed by trusted root - self.root.verify_delegate("root", new_root) + self.root.verify_delegate(RoleNames.ROOT.value, new_root) if new_root.signed.version != self.root.signed.version + 1: raise exceptions.ReplayedMetadataError( - "root", new_root.signed.version, self.root.signed.version + RoleNames.ROOT.value, + new_root.signed.version, + self.root.signed.version, ) # Verify that new root is signed by itself - new_root.verify_delegate("root", new_root) + new_root.verify_delegate(RoleNames.ROOT.value, new_root) - self._trusted_set["root"] = new_root + self._trusted_set[RoleNames.ROOT.value] = new_root logger.info("Updated root v%d", new_root.signed.version) def update_timestamp(self, data: bytes) -> None: @@ -214,12 +223,12 @@ def update_timestamp(self, data: bytes) -> None: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load timestamp") from e - if new_timestamp.signed.type != "timestamp": + if new_timestamp.signed.type != RoleNames.TIMESTAMP.value: raise exceptions.RepositoryError( f"Expected 'timestamp', got '{new_timestamp.signed.type}'" ) - self.root.verify_delegate("timestamp", new_timestamp) + self.root.verify_delegate(RoleNames.TIMESTAMP.value, new_timestamp) # If an existing trusted timestamp is updated, # check for a rollback attack @@ -227,7 +236,7 @@ def update_timestamp(self, data: bytes) -> None: # Prevent rolling back timestamp version if new_timestamp.signed.version < self.timestamp.signed.version: raise exceptions.ReplayedMetadataError( - "timestamp", + RoleNames.TIMESTAMP.value, new_timestamp.signed.version, self.timestamp.signed.version, ) @@ -237,7 +246,7 @@ def update_timestamp(self, data: bytes) -> None: < self.timestamp.signed.snapshot_meta.version ): raise exceptions.ReplayedMetadataError( - "snapshot", + RoleNames.SNAPSHOT.value, new_timestamp.signed.snapshot_meta.version, self.timestamp.signed.snapshot_meta.version, ) @@ -245,7 +254,7 @@ def update_timestamp(self, data: bytes) -> None: # expiry not checked to allow old timestamp to be used for rollback # protection of new timestamp: expiry is checked in update_snapshot() - self._trusted_set["timestamp"] = new_timestamp + self._trusted_set[RoleNames.TIMESTAMP.value] = new_timestamp logger.info("Updated timestamp v%d", new_timestamp.signed.version) # timestamp is loaded: raise if it is not valid _final_ timestamp @@ -310,12 +319,12 @@ def update_snapshot( except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e - if new_snapshot.signed.type != "snapshot": + if new_snapshot.signed.type != RoleNames.SNAPSHOT.value: raise exceptions.RepositoryError( f"Expected 'snapshot', got '{new_snapshot.signed.type}'" ) - self.root.verify_delegate("snapshot", new_snapshot) + self.root.verify_delegate(RoleNames.SNAPSHOT.value, new_snapshot) # version not checked against meta version to allow old snapshot to be # used in rollback protection: it is checked when targets is updated @@ -341,7 +350,7 @@ def update_snapshot( # expiry not checked to allow old snapshot to be used for rollback # protection of new snapshot: it is checked when targets is updated - self._trusted_set["snapshot"] = new_snapshot + self._trusted_set[RoleNames.SNAPSHOT.value] = new_snapshot logger.info("Updated snapshot v%d", new_snapshot.signed.version) # snapshot is loaded, but we raise if it's not valid _final_ snapshot @@ -371,7 +380,9 @@ def update_targets(self, data: bytes) -> None: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - self.update_delegated_targets(data, "targets", "root") + self.update_delegated_targets( + data, RoleNames.TARGETS.value, RoleNames.ROOT.value + ) def update_delegated_targets( self, data: bytes, role_name: str, delegator_name: str @@ -419,7 +430,7 @@ def update_delegated_targets( except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e - if new_delegate.signed.type != "targets": + if new_delegate.signed.type != RoleNames.TARGETS.value: raise exceptions.RepositoryError( f"Expected 'targets', got '{new_delegate.signed.type}'" ) @@ -449,12 +460,12 @@ def _load_trusted_root(self, data: bytes) -> None: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != "root": + if new_root.signed.type != RoleNames.ROOT.value: raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) - new_root.verify_delegate("root", new_root) + new_root.verify_delegate(RoleNames.ROOT.value, new_root) - self._trusted_set["root"] = new_root + self._trusted_set[RoleNames.ROOT.value] = new_root logger.info("Loaded trusted root v%d", new_root.signed.version) diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 5857be13d4..1edc54a862 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -72,7 +72,7 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import TargetFile, Targets +from tuf.api.metadata import RoleNames, TargetFile, Targets from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -118,7 +118,7 @@ def __init__( self._target_base_url = _ensure_trailing_slash(target_base_url) # Read trusted local root metadata - data = self._load_local_metadata("root") + data = self._load_local_metadata(RoleNames.ROOT.value) self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() @@ -145,7 +145,7 @@ def refresh(self) -> None: self._load_root() self._load_timestamp() self._load_snapshot() - self._load_targets("targets", "root") + self._load_targets(RoleNames.TARGETS.value, RoleNames.ROOT.value) def _generate_target_file_path(self, targetinfo: TargetFile) -> str: if self.target_dir is None: @@ -320,10 +320,12 @@ def _load_root(self) -> None: for next_version in range(lower_bound, upper_bound): try: data = self._download_metadata( - "root", self.config.root_max_length, next_version + RoleNames.ROOT.value, + self.config.root_max_length, + next_version, ) self._trusted_set.update_root(data) - self._persist_metadata("root", data) + self._persist_metadata(RoleNames.ROOT.value, data) except exceptions.FetcherHTTPError as exception: if exception.status_code not in {403, 404}: @@ -334,7 +336,7 @@ def _load_root(self) -> None: def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: - data = self._load_local_metadata("timestamp") + data = self._load_local_metadata(RoleNames.TIMESTAMP.value) self._trusted_set.update_timestamp(data) except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid @@ -342,15 +344,15 @@ def _load_timestamp(self) -> None: # Load from remote (whether local load succeeded or not) data = self._download_metadata( - "timestamp", self.config.timestamp_max_length + RoleNames.TIMESTAMP.value, self.config.timestamp_max_length ) self._trusted_set.update_timestamp(data) - self._persist_metadata("timestamp", data) + self._persist_metadata(RoleNames.TIMESTAMP.value, data) def _load_snapshot(self) -> None: """Load local (and if needed remote) snapshot metadata""" try: - data = self._load_local_metadata("snapshot") + data = self._load_local_metadata(RoleNames.SNAPSHOT.value) self._trusted_set.update_snapshot(data, trusted=True) logger.debug("Local snapshot is valid: not downloading new one") except (OSError, exceptions.RepositoryError) as e: @@ -364,9 +366,11 @@ def _load_snapshot(self) -> None: if self._trusted_set.root.signed.consistent_snapshot: version = snapshot_meta.version - data = self._download_metadata("snapshot", length, version) + data = self._download_metadata( + RoleNames.SNAPSHOT.value, length, version + ) self._trusted_set.update_snapshot(data) - self._persist_metadata("snapshot", data) + self._persist_metadata(RoleNames.SNAPSHOT.value, data) def _load_targets(self, role: str, parent_role: str) -> None: """Load local (and if needed remote) metadata for 'role'.""" @@ -400,7 +404,7 @@ def _preorder_depth_first_walk( # List of delegations to be interrogated. A (role, parent role) pair # is needed to load and verify the delegated targets metadata. - delegations_to_visit = [("targets", "root")] + delegations_to_visit = [(RoleNames.TARGETS.value, RoleNames.ROOT.value)] visited_role_names: Set[Tuple[str, str]] = set() number_of_delegations = self.config.max_delegations