From ce14e937c59b56a5a41e02b41c196c990bbc2412 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 22 Jun 2021 16:01:13 +0300 Subject: [PATCH] Rename MetadataBundle to TrustedMetadataSet TrustedMetadataSet is a long name but * it better describes the main feature * the name isn't used in too many places Change the variable names "bundle" -> "trusted_set" Signed-off-by: Jussi Kukkonen --- ...bundle.py => test_trusted_metadata_set.py} | 56 ++++++++-------- ...data_bundle.py => trusted_metadata_set.py} | 66 +++++++++---------- tuf/ngclient/updater.py | 36 +++++----- 3 files changed, 81 insertions(+), 77 deletions(-) rename tests/{test_metadata_bundle.py => test_trusted_metadata_set.py} (65%) rename tuf/ngclient/_internal/{metadata_bundle.py => trusted_metadata_set.py} (89%) diff --git a/tests/test_metadata_bundle.py b/tests/test_trusted_metadata_set.py similarity index 65% rename from tests/test_metadata_bundle.py rename to tests/test_trusted_metadata_set.py index a988b8d3f5..b59e9de78b 100644 --- a/tests/test_metadata_bundle.py +++ b/tests/test_trusted_metadata_set.py @@ -8,31 +8,31 @@ from tuf import exceptions from tuf.api.metadata import Metadata -from tuf.ngclient._internal.metadata_bundle import MetadataBundle +from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from tests import utils logger = logging.getLogger(__name__) -class TestMetadataBundle(unittest.TestCase): +class TestTrustedMetadataSet(unittest.TestCase): def test_update(self): repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') with open(os.path.join(repo_dir, "root.json"), "rb") as f: - bundle = MetadataBundle(f.read()) - bundle.root_update_finished() + trusted_set = TrustedMetadataSet(f.read()) + trusted_set.root_update_finished() with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f: - bundle.update_timestamp(f.read()) + trusted_set.update_timestamp(f.read()) with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f: - bundle.update_snapshot(f.read()) + trusted_set.update_snapshot(f.read()) with open(os.path.join(repo_dir, "targets.json"), "rb") as f: - bundle.update_targets(f.read()) + trusted_set.update_targets(f.read()) with open(os.path.join(repo_dir, "role1.json"), "rb") as f: - bundle.update_delegated_targets(f.read(), "role1", "targets") + trusted_set.update_delegated_targets(f.read(), "role1", "targets") with open(os.path.join(repo_dir, "role2.json"), "rb") as f: - bundle.update_delegated_targets(f.read(), "role2", "role1") + trusted_set.update_delegated_targets(f.read(), "role2", "role1") def test_out_of_order_ops(self): repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') @@ -41,38 +41,38 @@ def test_out_of_order_ops(self): with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: data[md] = f.read() - bundle = MetadataBundle(data["root"]) + trusted_set = TrustedMetadataSet(data["root"]) # Update timestamp before root is finished with self.assertRaises(RuntimeError): - bundle.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(data["timestamp"]) - bundle.root_update_finished() + trusted_set.root_update_finished() with self.assertRaises(RuntimeError): - bundle.root_update_finished() + trusted_set.root_update_finished() # Update snapshot before timestamp with self.assertRaises(RuntimeError): - bundle.update_snapshot(data["snapshot"]) + trusted_set.update_snapshot(data["snapshot"]) - bundle.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(data["timestamp"]) # Update targets before snapshot with self.assertRaises(RuntimeError): - bundle.update_targets(data["targets"]) + trusted_set.update_targets(data["targets"]) - bundle.update_snapshot(data["snapshot"]) + trusted_set.update_snapshot(data["snapshot"]) #update timestamp after snapshot with self.assertRaises(RuntimeError): - bundle.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(data["timestamp"]) # Update delegated targets before targets with self.assertRaises(RuntimeError): - bundle.update_delegated_targets(data["role1"], "role1", "targets") + trusted_set.update_delegated_targets(data["role1"], "role1", "targets") - bundle.update_targets(data["targets"]) - bundle.update_delegated_targets(data["role1"], "role1", "targets") + trusted_set.update_targets(data["targets"]) + trusted_set.update_delegated_targets(data["role1"], "role1", "targets") def test_update_with_invalid_json(self): repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') @@ -83,20 +83,20 @@ def test_update_with_invalid_json(self): # root.json not a json file at all with self.assertRaises(exceptions.RepositoryError): - MetadataBundle(b"") + TrustedMetadataSet(b"") # root.json is invalid root = Metadata.from_bytes(data["root"]) root.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): - MetadataBundle(json.dumps(root.to_dict()).encode()) + TrustedMetadataSet(json.dumps(root.to_dict()).encode()) - bundle = MetadataBundle(data["root"]) - bundle.root_update_finished() + trusted_set = TrustedMetadataSet(data["root"]) + trusted_set.root_update_finished() top_level_md = [ - (data["timestamp"], bundle.update_timestamp), - (data["snapshot"], bundle.update_snapshot), - (data["targets"], bundle.update_targets), + (data["timestamp"], trusted_set.update_timestamp), + (data["snapshot"], trusted_set.update_snapshot), + (data["targets"], trusted_set.update_targets), ] for metadata, update_func in top_level_md: # metadata is not json diff --git a/tuf/ngclient/_internal/metadata_bundle.py b/tuf/ngclient/_internal/trusted_metadata_set.py similarity index 89% rename from tuf/ngclient/_internal/metadata_bundle.py rename to tuf/ngclient/_internal/trusted_metadata_set.py index ec7322ad05..d5674d8b10 100644 --- a/tuf/ngclient/_internal/metadata_bundle.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -1,17 +1,17 @@ # Copyright the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 -"""TUF client bundle-of-metadata +"""Trusted collection of client-side TUF Metadata -MetadataBundle keeps track of current valid set of metadata for the client, +TrustedMetadataSet keeps track of current valid set of metadata for the client, and handles almost every step of the "Detailed client workflow" ( https://theupdateframework.github.io/specification/latest#detailed-client-workflow) in the TUF specification: the remaining steps are related to filesystem and network IO which is not handled here. Loaded metadata can be accessed via the index access with rolename as key -(bundle["root"]) or, in the case of top-level metadata using the helper -properties (bundle.root). +(trusted_set["root"]) or, in the case of top-level metadata using the helper +properties (trusted_set.root). The rules for top-level metadata are * Metadata is loadable only if metadata it depends on is loaded @@ -26,33 +26,33 @@ >>> # Load local root (RepositoryErrors here stop the update) >>> with open(root_path, "rb") as f: ->>> bundle = MetadataBundle(f.read()) +>>> trusted_set = TrustedMetadataSet(f.read()) >>> >>> # update root from remote until no more are available ->>> with download("root", bundle.root.signed.version + 1) as f: ->>> bundle.update_root(f.read()) +>>> with download("root", trusted_set.root.signed.version + 1) as f: +>>> trusted_set.update_root(f.read()) >>> # ... ->>> bundle.root_update_finished() +>>> trusted_set.root_update_finished() >>> >>> # load local timestamp, then update from remote >>> try: >>> with open(timestamp_path, "rb") as f: ->>> bundle.update_timestamp(f.read()) +>>> trusted_set.update_timestamp(f.read()) >>> except (RepositoryError, OSError): >>> pass # failure to load a local file is ok >>> >>> with download("timestamp") as f: ->>> bundle.update_timestamp(f.read()) +>>> trusted_set.update_timestamp(f.read()) >>> >>> # load local snapshot, then update from remote if needed >>> try: >>> with open(snapshot_path, "rb") as f: ->>> bundle.update_snapshot(f.read()) +>>> trusted_set.update_snapshot(f.read()) >>> except (RepositoryError, OSError): >>> # local snapshot is not valid, load from remote >>> # (RepositoryErrors here stop the update) >>> with download("snapshot", version) as f: ->>> bundle.update_snapshot(f.read()) +>>> trusted_set.update_snapshot(f.read()) TODO: * exceptions are not final: the idea is that client could just handle @@ -116,27 +116,27 @@ def verify_with_threshold( return len(unique_keys) >= role.threshold -class MetadataBundle(abc.Mapping): - """Internal class to keep track of valid metadata in Updater +class TrustedMetadataSet(abc.Mapping): + """Internal class to keep track of trusted metadata in Updater - MetadataBundle ensures that the collection of metadata in the bundle is - valid. It provides easy ways to update the metadata with the caller making - decisions on what is updated. + TrustedMetadataSet ensures that the collection of metadata in it is valid + and trusted through the whole client update workflow. It provides easy ways + to update the metadata with the caller making decisions on what is updated. """ def __init__(self, root_data: bytes): - """Initialize bundle by loading trusted root metadata + """Initialize TrustedMetadataSet by loading trusted root metadata Args: root_data: Trusted root metadata as bytes. Note that this metadata will only be verified by itself: it is the source of trust for - all metadata in the bundle. + all metadata in the TrustedMetadataSet Raises: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - self._bundle = {} # type: Dict[str: Metadata] + self._trusted_set = {} # type: Dict[str: Metadata] self.reference_time = datetime.utcnow() self._root_update_finished = False @@ -147,36 +147,36 @@ def __init__(self, root_data: bytes): def __getitem__(self, role: str) -> Metadata: """Returns current Metadata for 'role'""" - return self._bundle[role] + return self._trusted_set[role] def __len__(self) -> int: - """Returns number of Metadata objects in bundle""" - return len(self._bundle) + """Returns number of Metadata objects in TrustedMetadataSet""" + return len(self._trusted_set) def __iter__(self) -> Iterator[Metadata]: - """Returns iterator over all Metadata objects in bundle""" - return iter(self._bundle) + """Returns iterator over all Metadata objects in TrustedMetadataSet""" + return iter(self._trusted_set) # Helper properties for top level metadata @property def root(self) -> Optional[Metadata]: """Current root Metadata or None""" - return self._bundle.get("root") + return self._trusted_set.get("root") @property def timestamp(self) -> Optional[Metadata]: """Current timestamp Metadata or None""" - return self._bundle.get("timestamp") + return self._trusted_set.get("timestamp") @property def snapshot(self) -> Optional[Metadata]: """Current snapshot Metadata or None""" - return self._bundle.get("snapshot") + return self._trusted_set.get("snapshot") @property def targets(self) -> Optional[Metadata]: """Current targets Metadata or None""" - return self._bundle.get("targets") + return self._trusted_set.get("targets") # Methods for updating metadata def update_root(self, data: bytes): @@ -225,7 +225,7 @@ def update_root(self, data: bytes): "New root is not signed by itself", new_root.signed ) - self._bundle["root"] = new_root + self._trusted_set["root"] = new_root logger.debug("Updated root") def root_update_finished(self): @@ -302,7 +302,7 @@ def update_timestamp(self, data: bytes): if new_timestamp.signed.is_expired(self.reference_time): raise exceptions.ExpiredMetadataError("New timestamp is expired") - self._bundle["timestamp"] = new_timestamp + self._trusted_set["timestamp"] = new_timestamp logger.debug("Updated timestamp") # TODO: remove pylint disable once the hash verification is in metadata.py @@ -381,7 +381,7 @@ def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches if new_snapshot.signed.is_expired(self.reference_time): raise exceptions.ExpiredMetadataError("New snapshot is expired") - self._bundle["snapshot"] = new_snapshot + self._trusted_set["snapshot"] = new_snapshot logger.debug("Updated snapshot") def update_targets(self, data: bytes): @@ -460,5 +460,5 @@ def update_delegated_targets( if new_delegate.signed.is_expired(self.reference_time): raise exceptions.ExpiredMetadataError(f"New {role_name} is expired") - self._bundle[role_name] = new_delegate + self._trusted_set[role_name] = new_delegate logger.debug("Updated %s delegated by %s", role_name, delegator_name) diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 8b2b8a2a30..a21b292869 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -15,7 +15,11 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.ngclient._internal import download, metadata_bundle, requests_fetcher +from tuf.ngclient._internal import ( + download, + requests_fetcher, + trusted_metadata_set, +) from tuf.ngclient.fetcher import FetcherInterface # Globals @@ -65,7 +69,7 @@ def __init__( # Read trusted local root metadata data = self._load_local_metadata("root") - self._bundle = metadata_bundle.MetadataBundle(data) + self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) if fetcher is None: self._fetcher = requests_fetcher.RequestsFetcher() @@ -243,7 +247,7 @@ def _load_root(self) -> None: """ # Update the root role - lower_bound = self._bundle.root.signed.version + 1 + lower_bound = self._trusted_set.root.signed.version + 1 upper_bound = lower_bound + MAX_ROOT_ROTATIONS for next_version in range(lower_bound, upper_bound): @@ -251,7 +255,7 @@ def _load_root(self) -> None: data = self._download_metadata( "root", DEFAULT_ROOT_MAX_LENGTH, next_version ) - self._bundle.update_root(data) + self._trusted_set.update_root(data) self._persist_metadata("root", data) except exceptions.FetcherHTTPError as exception: @@ -261,13 +265,13 @@ def _load_root(self) -> None: break # Verify final root - self._bundle.root_update_finished() + self._trusted_set.root_update_finished() def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: data = self._load_local_metadata("timestamp") - self._bundle.update_timestamp(data) + self._trusted_set.update_timestamp(data) except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid logger.debug("Failed to load local timestamp %s", e) @@ -276,47 +280,47 @@ def _load_timestamp(self) -> None: data = self._download_metadata( "timestamp", DEFAULT_TIMESTAMP_MAX_LENGTH ) - self._bundle.update_timestamp(data) + self._trusted_set.update_timestamp(data) self._persist_metadata("timestamp", data) def _load_snapshot(self) -> None: """Load local (and if needed remote) snapshot metadata""" try: data = self._load_local_metadata("snapshot") - self._bundle.update_snapshot(data) + self._trusted_set.update_snapshot(data) logger.debug("Local snapshot is valid: not downloading new one") except (OSError, exceptions.RepositoryError) as e: # Local snapshot does not exist or is invalid: update from remote logger.debug("Failed to load local snapshot %s", e) - metainfo = self._bundle.timestamp.signed.meta["snapshot.json"] + metainfo = self._trusted_set.timestamp.signed.meta["snapshot.json"] length = metainfo.length or DEFAULT_SNAPSHOT_MAX_LENGTH version = None - if self._bundle.root.signed.consistent_snapshot: + if self._trusted_set.root.signed.consistent_snapshot: version = metainfo.version data = self._download_metadata("snapshot", length, version) - self._bundle.update_snapshot(data) + self._trusted_set.update_snapshot(data) self._persist_metadata("snapshot", data) def _load_targets(self, role: str, parent_role: str) -> None: """Load local (and if needed remote) metadata for 'role'.""" try: data = self._load_local_metadata(role) - self._bundle.update_delegated_targets(data, role, parent_role) + self._trusted_set.update_delegated_targets(data, role, parent_role) logger.debug("Local %s is valid: not downloading new one", role) except (OSError, exceptions.RepositoryError) as e: # Local 'role' does not exist or is invalid: update from remote logger.debug("Failed to load local %s: %s", role, e) - metainfo = self._bundle.snapshot.signed.meta[f"{role}.json"] + metainfo = self._trusted_set.snapshot.signed.meta[f"{role}.json"] length = metainfo.length or DEFAULT_TARGETS_MAX_LENGTH version = None - if self._bundle.root.signed.consistent_snapshot: + if self._trusted_set.root.signed.consistent_snapshot: version = metainfo.version data = self._download_metadata(role, length, version) - self._bundle.update_delegated_targets(data, role, parent_role) + self._trusted_set.update_delegated_targets(data, role, parent_role) self._persist_metadata(role, data) def _preorder_depth_first_walk(self, target_filepath) -> Dict: @@ -348,7 +352,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # The metadata for 'role_name' must be downloaded/updated before # its targets, delegations, and child roles can be inspected. - role_metadata = self._bundle[role_name].signed + role_metadata = self._trusted_set[role_name].signed target = role_metadata.targets.get(target_filepath) # After preorder check, add current role to set of visited roles.