From b992bb9af4c96b7270cf23804567a40c77227c95 Mon Sep 17 00:00:00 2001 From: Ivana Atanasova Date: Wed, 17 Nov 2021 14:23:03 +0200 Subject: [PATCH] Create constants for top-level rolenames This is a change in the metadata API to remove hardcoded rolenames and use constants instead. Fixes #1648 Signed-off-by: Ivana Atanasova --- tuf/api/metadata.py | 42 +++++++-------- .../_internal/trusted_metadata_set.py | 54 ++++++++++--------- tuf/ngclient/updater.py | 26 ++++----- 3 files changed, 63 insertions(+), 59 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 567e3349a2..698d044339 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -61,6 +61,11 @@ SignedSerializer, ) +_ROOT: str = "root" +_SNAPSHOT: str = "snapshot" +_TARGETS: str = "targets" +_TIMESTAMP: str = "timestamp" + # pylint: disable=too-many-lines logger = logging.getLogger(__name__) @@ -68,7 +73,7 @@ # We aim to support SPECIFICATION_VERSION and require the input metadata # files to have the same major version (the first number) as ours. SPECIFICATION_VERSION = ["1", "0", "19"] -TOP_LEVEL_ROLE_NAMES = {"root", "timestamp", "snapshot", "targets"} +TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS} # T is a Generic type constraint for Metadata.signed T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") @@ -130,13 +135,13 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": # Dispatch to contained metadata class on metadata _type field. _type = metadata["signed"]["_type"] - if _type == "targets": + if _type == _TARGETS: inner_cls: Type[Signed] = Targets - elif _type == "snapshot": + elif _type == _SNAPSHOT: inner_cls = Snapshot - elif _type == "timestamp": + elif _type == _TIMESTAMP: inner_cls = Timestamp - elif _type == "root": + elif _type == _ROOT: inner_cls = Root else: raise ValueError(f'unrecognized metadata type "{_type}"') @@ -394,18 +399,13 @@ class Signed(metaclass=abc.ABCMeta): unrecognized_fields: Dictionary of all unrecognized fields. """ - # Signed implementations are expected to override this - _signed_type: ClassVar[str] = "signed" + # type is required for static reference without changing the API + type: ClassVar[str] = "signed" # _type and type are identical: 1st replicates file format, 2nd passes lint @property def _type(self) -> str: - return self._signed_type - - @property - def type(self) -> str: - """Metadata type as string.""" - return self._signed_type + return self.type # NOTE: Signed is a stupid name, because this might not be signed yet, but # we keep it to match spec terminology (I often refer to this as "payload", @@ -458,8 +458,8 @@ def _common_fields_from_dict( """ _type = signed_dict.pop("_type") - if _type != cls._signed_type: - raise ValueError(f"Expected type {cls._signed_type}, got {_type}") + if _type != cls.type: + raise ValueError(f"Expected type {cls.type}, got {_type}") version = signed_dict.pop("version") spec_version = signed_dict.pop("spec_version") @@ -712,7 +712,7 @@ class Root(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - _signed_type = "root" + type = _ROOT # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -965,7 +965,7 @@ class Timestamp(Signed): snapshot_meta: Meta information for snapshot metadata. """ - _signed_type = "timestamp" + type = _TIMESTAMP def __init__( self, @@ -1015,7 +1015,7 @@ class Snapshot(Signed): meta: A dictionary of target metadata filenames to MetaFile objects. 
""" - _signed_type = "snapshot" + type = _SNAPSHOT def __init__( self, @@ -1409,7 +1409,7 @@ class Targets(Signed): unrecognized_fields: Dictionary of all unrecognized fields. """ - _signed_type = "targets" + type = _TARGETS # TODO: determine an appropriate value for max-args # pylint: disable=too-many-arguments @@ -1430,7 +1430,7 @@ def __init__( def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": """Creates Targets object from its dict representation.""" common_args = cls._common_fields_from_dict(signed_dict) - targets = signed_dict.pop("targets") + targets = signed_dict.pop(_TARGETS) try: delegations_dict = signed_dict.pop("delegations") except KeyError: @@ -1451,7 +1451,7 @@ def to_dict(self) -> Dict[str, Any]: targets = {} for target_path, target_file_obj in self.targets.items(): targets[target_path] = target_file_obj.to_dict() - targets_dict["targets"] = targets + targets_dict[_TARGETS] = targets if self.delegations is not None: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index b7c831158c..e502609cd0 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -10,7 +10,7 @@ network IO, which are not handled here. Loaded metadata can be accessed via index access with rolename as key -(trusted_set["root"]) or, in the case of top-level metadata, using the helper +(trusted_set[Root.type]) or, in the case of top-level metadata, using the helper properties (trusted_set.root). The rules that TrustedMetadataSet follows for top-level metadata are @@ -35,7 +35,7 @@ >>> trusted_set = TrustedMetadataSet(f.read()) >>> >>> # update root from remote until no more are available ->>> with download("root", trusted_set.root.signed.version + 1) as f: +>>> with download(Root.type, trusted_set.root.signed.version + 1) as f: >>> trusted_set.update_root(f.read()) >>> >>> # load local timestamp, then update from remote @@ -45,7 +45,7 @@ >>> except (RepositoryError, OSError): >>> pass # failure to load a local file is ok >>> ->>> with download("timestamp") as f: +>>> with download(Timestamp.type) as f: >>> trusted_set.update_timestamp(f.read()) >>> >>> # load local snapshot, then update from remote if needed @@ -55,7 +55,7 @@ >>> except (RepositoryError, OSError): >>> # local snapshot is not valid, load from remote >>> # (RepositoryErrors here stop the update) ->>> with download("snapshot", version) as f: +>>> with download(Snapshot.type, version) as f: >>> trusted_set.update_snapshot(f.read()) TODO: @@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]: @property def root(self) -> Metadata[Root]: """Current root Metadata""" - return self._trusted_set["root"] + return self._trusted_set[Root.type] @property def timestamp(self) -> Optional[Metadata[Timestamp]]: """Current timestamp Metadata or None""" - return self._trusted_set.get("timestamp") + return self._trusted_set.get(Timestamp.type) @property def snapshot(self) -> Optional[Metadata[Snapshot]]: """Current snapshot Metadata or None""" - return self._trusted_set.get("snapshot") + return self._trusted_set.get(Snapshot.type) @property def targets(self) -> Optional[Metadata[Targets]]: """Current targets Metadata or None""" - return self._trusted_set.get("targets") + return self._trusted_set.get(Targets.type) # Methods for updating metadata def update_root(self, data: bytes) -> Metadata[Root]: @@ -166,23 +166,25 @@ def update_root(self, data: 
bytes) -> Metadata[Root]: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != "root": + if new_root.signed.type != Root.type: raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) # Verify that new root is signed by trusted root - self.root.verify_delegate("root", new_root) + self.root.verify_delegate(Root.type, new_root) if new_root.signed.version != self.root.signed.version + 1: raise exceptions.ReplayedMetadataError( - "root", new_root.signed.version, self.root.signed.version + Root.type, + new_root.signed.version, + self.root.signed.version, ) # Verify that new root is signed by itself - new_root.verify_delegate("root", new_root) + new_root.verify_delegate(Root.type, new_root) - self._trusted_set["root"] = new_root + self._trusted_set[Root.type] = new_root logger.info("Updated root v%d", new_root.signed.version) return new_root @@ -222,12 +224,12 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load timestamp") from e - if new_timestamp.signed.type != "timestamp": + if new_timestamp.signed.type != Timestamp.type: raise exceptions.RepositoryError( f"Expected 'timestamp', got '{new_timestamp.signed.type}'" ) - self.root.verify_delegate("timestamp", new_timestamp) + self.root.verify_delegate(Timestamp.type, new_timestamp) # If an existing trusted timestamp is updated, # check for a rollback attack @@ -235,7 +237,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: # Prevent rolling back timestamp version if new_timestamp.signed.version < self.timestamp.signed.version: raise exceptions.ReplayedMetadataError( - "timestamp", + Timestamp.type, new_timestamp.signed.version, self.timestamp.signed.version, ) @@ -245,7 +247,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: < self.timestamp.signed.snapshot_meta.version ): raise exceptions.ReplayedMetadataError( - "snapshot", + Snapshot.type, new_timestamp.signed.snapshot_meta.version, self.timestamp.signed.snapshot_meta.version, ) @@ -253,7 +255,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: # expiry not checked to allow old timestamp to be used for rollback # protection of new timestamp: expiry is checked in update_snapshot() - self._trusted_set["timestamp"] = new_timestamp + self._trusted_set[Timestamp.type] = new_timestamp logger.info("Updated timestamp v%d", new_timestamp.signed.version) # timestamp is loaded: raise if it is not valid _final_ timestamp @@ -323,12 +325,12 @@ def update_snapshot( except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e - if new_snapshot.signed.type != "snapshot": + if new_snapshot.signed.type != Snapshot.type: raise exceptions.RepositoryError( f"Expected 'snapshot', got '{new_snapshot.signed.type}'" ) - self.root.verify_delegate("snapshot", new_snapshot) + self.root.verify_delegate(Snapshot.type, new_snapshot) # version not checked against meta version to allow old snapshot to be # used in rollback protection: it is checked when targets is updated @@ -354,7 +356,7 @@ def update_snapshot( # expiry not checked to allow old snapshot to be used for rollback # protection of new snapshot: it is checked when targets is updated - self._trusted_set["snapshot"] = new_snapshot + self._trusted_set[Snapshot.type] = new_snapshot logger.info("Updated snapshot v%d", new_snapshot.signed.version) # snapshot is loaded, but we raise if 
it's not valid _final_ snapshot @@ -389,7 +391,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]: Returns: Deserialized and verified targets Metadata object """ - return self.update_delegated_targets(data, "targets", "root") + return self.update_delegated_targets(data, Targets.type, Root.type) def update_delegated_targets( self, data: bytes, role_name: str, delegator_name: str @@ -440,7 +442,7 @@ def update_delegated_targets( except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e - if new_delegate.signed.type != "targets": + if new_delegate.signed.type != Targets.type: raise exceptions.RepositoryError( f"Expected 'targets', got '{new_delegate.signed.type}'" ) @@ -472,12 +474,12 @@ def _load_trusted_root(self, data: bytes) -> None: except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e - if new_root.signed.type != "root": + if new_root.signed.type != Root.type: raise exceptions.RepositoryError( f"Expected 'root', got '{new_root.signed.type}'" ) - new_root.verify_delegate("root", new_root) + new_root.verify_delegate(Root.type, new_root) - self._trusted_set["root"] = new_root + self._trusted_set[Root.type] = new_root logger.info("Loaded trusted root v%d", new_root.signed.version) diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 649a2b4bc3..23675045fe 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -68,7 +68,7 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import Metadata, TargetFile, Targets +from tuf.api.metadata import Metadata, Root, Snapshot, TargetFile, Targets, Timestamp from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -114,7 +114,7 @@ def __init__( self._target_base_url = _ensure_trailing_slash(target_base_url) # Read trusted local root metadata - data = self._load_local_metadata("root") + data = self._load_local_metadata(Root.type) self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() @@ -146,7 +146,7 @@ def refresh(self) -> None: self._load_root() self._load_timestamp() self._load_snapshot() - self._load_targets("targets", "root") + self._load_targets(Targets.type, Root.type) def _generate_target_file_path(self, targetinfo: TargetFile) -> str: if self.target_dir is None: @@ -320,10 +320,12 @@ def _load_root(self) -> None: for next_version in range(lower_bound, upper_bound): try: data = self._download_metadata( - "root", self.config.root_max_length, next_version + Root.type, + self.config.root_max_length, + next_version, ) self._trusted_set.update_root(data) - self._persist_metadata("root", data) + self._persist_metadata(Root.type, data) except exceptions.FetcherHTTPError as exception: if exception.status_code not in {403, 404}: @@ -334,7 +336,7 @@ def _load_root(self) -> None: def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: - data = self._load_local_metadata("timestamp") + data = self._load_local_metadata(Timestamp.type) self._trusted_set.update_timestamp(data) except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid @@ -342,15 +344,15 @@ def _load_timestamp(self) -> None: # Load from remote (whether local load succeeded or not) data = self._download_metadata( - "timestamp", 
self.config.timestamp_max_length + Timestamp.type, self.config.timestamp_max_length ) self._trusted_set.update_timestamp(data) - self._persist_metadata("timestamp", data) + self._persist_metadata(Timestamp.type, data) def _load_snapshot(self) -> None: """Load local (and if needed remote) snapshot metadata""" try: - data = self._load_local_metadata("snapshot") + data = self._load_local_metadata(Snapshot.type) self._trusted_set.update_snapshot(data, trusted=True) logger.debug("Local snapshot is valid: not downloading new one") except (OSError, exceptions.RepositoryError) as e: @@ -364,9 +366,9 @@ def _load_snapshot(self) -> None: if self._trusted_set.root.signed.consistent_snapshot: version = snapshot_meta.version - data = self._download_metadata("snapshot", length, version) + data = self._download_metadata(Snapshot.type, length, version) self._trusted_set.update_snapshot(data) - self._persist_metadata("snapshot", data) + self._persist_metadata(Snapshot.type, data) def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: """Load local (and if needed remote) metadata for 'role'.""" @@ -412,7 +414,7 @@ def _preorder_depth_first_walk( # List of delegations to be interrogated. A (role, parent role) pair # is needed to load and verify the delegated targets metadata. - delegations_to_visit = [("targets", "root")] + delegations_to_visit = [(Targets.type, Root.type)] visited_role_names: Set[str] = set() number_of_delegations = self.config.max_delegations
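
Usage note (not part of the patch): a minimal sketch of how downstream code can refer to the top-level role names after this change, using only names the patch itself defines (`Root.type`, `Timestamp.type`, `Snapshot.type`, `Targets.type`, `TOP_LEVEL_ROLE_NAMES` in `tuf.api.metadata`). The `is_top_level` helper at the end is illustrative only and is not an API introduced by the patch.

    from tuf.api.metadata import (
        TOP_LEVEL_ROLE_NAMES,
        Root,
        Snapshot,
        Targets,
        Timestamp,
    )

    # The role-name strings are now exposed as class attributes of the
    # Signed subclasses, so callers no longer hardcode "root" etc.
    assert Root.type == "root"
    assert Timestamp.type == "timestamp"
    assert Snapshot.type == "snapshot"
    assert Targets.type == "targets"

    # TOP_LEVEL_ROLE_NAMES is built from the same constants.
    assert TOP_LEVEL_ROLE_NAMES == {
        Root.type,
        Timestamp.type,
        Snapshot.type,
        Targets.type,
    }

    # Illustrative helper (hypothetical, not part of the patch): check a
    # role name against the published set instead of string literals.
    def is_top_level(rolename: str) -> bool:
        return rolename in TOP_LEVEL_ROLE_NAMES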