Skip to content

Commit

Permalink
Rename MetadataBundle to TrustedMetadataSet
Browse files Browse the repository at this point in the history
TrustedMetadataSet is a long name but
 * it better describes the main feature
 * the name isn't used in too many places

Change the variable names "bundle" -> "trusted_set"

Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
  • Loading branch information
Jussi Kukkonen committed Jun 22, 2021
1 parent 85f2219 commit ce14e93
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@

from tuf import exceptions
from tuf.api.metadata import Metadata
from tuf.ngclient._internal.metadata_bundle import MetadataBundle
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet

from tests import utils

logger = logging.getLogger(__name__)

class TestMetadataBundle(unittest.TestCase):
class TestTrustedMetadataSet(unittest.TestCase):

def test_update(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')

with open(os.path.join(repo_dir, "root.json"), "rb") as f:
bundle = MetadataBundle(f.read())
bundle.root_update_finished()
trusted_set = TrustedMetadataSet(f.read())
trusted_set.root_update_finished()

with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f:
bundle.update_timestamp(f.read())
trusted_set.update_timestamp(f.read())
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f:
bundle.update_snapshot(f.read())
trusted_set.update_snapshot(f.read())
with open(os.path.join(repo_dir, "targets.json"), "rb") as f:
bundle.update_targets(f.read())
trusted_set.update_targets(f.read())
with open(os.path.join(repo_dir, "role1.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role1", "targets")
trusted_set.update_delegated_targets(f.read(), "role1", "targets")
with open(os.path.join(repo_dir, "role2.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role2", "role1")
trusted_set.update_delegated_targets(f.read(), "role2", "role1")

def test_out_of_order_ops(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
Expand All @@ -41,38 +41,38 @@ def test_out_of_order_ops(self):
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
data[md] = f.read()

bundle = MetadataBundle(data["root"])
trusted_set = TrustedMetadataSet(data["root"])

# Update timestamp before root is finished
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])
trusted_set.update_timestamp(data["timestamp"])

bundle.root_update_finished()
trusted_set.root_update_finished()
with self.assertRaises(RuntimeError):
bundle.root_update_finished()
trusted_set.root_update_finished()

# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
bundle.update_snapshot(data["snapshot"])
trusted_set.update_snapshot(data["snapshot"])

bundle.update_timestamp(data["timestamp"])
trusted_set.update_timestamp(data["timestamp"])

# Update targets before snapshot
with self.assertRaises(RuntimeError):
bundle.update_targets(data["targets"])
trusted_set.update_targets(data["targets"])

bundle.update_snapshot(data["snapshot"])
trusted_set.update_snapshot(data["snapshot"])

#update timestamp after snapshot
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])
trusted_set.update_timestamp(data["timestamp"])

# Update delegated targets before targets
with self.assertRaises(RuntimeError):
bundle.update_delegated_targets(data["role1"], "role1", "targets")
trusted_set.update_delegated_targets(data["role1"], "role1", "targets")

bundle.update_targets(data["targets"])
bundle.update_delegated_targets(data["role1"], "role1", "targets")
trusted_set.update_targets(data["targets"])
trusted_set.update_delegated_targets(data["role1"], "role1", "targets")

def test_update_with_invalid_json(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
Expand All @@ -83,20 +83,20 @@ def test_update_with_invalid_json(self):

# root.json not a json file at all
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(b"")
TrustedMetadataSet(b"")
# root.json is invalid
root = Metadata.from_bytes(data["root"])
root.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(json.dumps(root.to_dict()).encode())
TrustedMetadataSet(json.dumps(root.to_dict()).encode())

bundle = MetadataBundle(data["root"])
bundle.root_update_finished()
trusted_set = TrustedMetadataSet(data["root"])
trusted_set.root_update_finished()

top_level_md = [
(data["timestamp"], bundle.update_timestamp),
(data["snapshot"], bundle.update_snapshot),
(data["targets"], bundle.update_targets),
(data["timestamp"], trusted_set.update_timestamp),
(data["snapshot"], trusted_set.update_snapshot),
(data["targets"], trusted_set.update_targets),
]
for metadata, update_func in top_level_md:
# metadata is not json
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# Copyright the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

"""TUF client bundle-of-metadata
"""Trusted collection of client-side TUF Metadata
MetadataBundle keeps track of current valid set of metadata for the client,
TrustedMetadataSet keeps track of current valid set of metadata for the client,
and handles almost every step of the "Detailed client workflow" (
https://theupdateframework.github.io/specification/latest#detailed-client-workflow)
in the TUF specification: the remaining steps are related to filesystem and
network IO which is not handled here.
Loaded metadata can be accessed via the index access with rolename as key
(bundle["root"]) or, in the case of top-level metadata using the helper
properties (bundle.root).
(trusted_set["root"]) or, in the case of top-level metadata using the helper
properties (trusted_set.root).
The rules for top-level metadata are
* Metadata is loadable only if metadata it depends on is loaded
Expand All @@ -26,33 +26,33 @@
>>> # Load local root (RepositoryErrors here stop the update)
>>> with open(root_path, "rb") as f:
>>> bundle = MetadataBundle(f.read())
>>> trusted_set = TrustedMetadataSet(f.read())
>>>
>>> # update root from remote until no more are available
>>> with download("root", bundle.root.signed.version + 1) as f:
>>> bundle.update_root(f.read())
>>> with download("root", trusted_set.root.signed.version + 1) as f:
>>> trusted_set.update_root(f.read())
>>> # ...
>>> bundle.root_update_finished()
>>> trusted_set.root_update_finished()
>>>
>>> # load local timestamp, then update from remote
>>> try:
>>> with open(timestamp_path, "rb") as f:
>>> bundle.update_timestamp(f.read())
>>> trusted_set.update_timestamp(f.read())
>>> except (RepositoryError, OSError):
>>> pass # failure to load a local file is ok
>>>
>>> with download("timestamp") as f:
>>> bundle.update_timestamp(f.read())
>>> trusted_set.update_timestamp(f.read())
>>>
>>> # load local snapshot, then update from remote if needed
>>> try:
>>> with open(snapshot_path, "rb") as f:
>>> bundle.update_snapshot(f.read())
>>> trusted_set.update_snapshot(f.read())
>>> except (RepositoryError, OSError):
>>> # local snapshot is not valid, load from remote
>>> # (RepositoryErrors here stop the update)
>>> with download("snapshot", version) as f:
>>> bundle.update_snapshot(f.read())
>>> trusted_set.update_snapshot(f.read())
TODO:
* exceptions are not final: the idea is that client could just handle
Expand Down Expand Up @@ -116,27 +116,27 @@ def verify_with_threshold(
return len(unique_keys) >= role.threshold


class MetadataBundle(abc.Mapping):
"""Internal class to keep track of valid metadata in Updater
class TrustedMetadataSet(abc.Mapping):
"""Internal class to keep track of trusted metadata in Updater
MetadataBundle ensures that the collection of metadata in the bundle is
valid. It provides easy ways to update the metadata with the caller making
decisions on what is updated.
TrustedMetadataSet ensures that the collection of metadata in it is valid
and trusted through the whole client update workflow. It provides easy ways
to update the metadata with the caller making decisions on what is updated.
"""

def __init__(self, root_data: bytes):
"""Initialize bundle by loading trusted root metadata
"""Initialize TrustedMetadataSet by loading trusted root metadata
Args:
root_data: Trusted root metadata as bytes. Note that this metadata
will only be verified by itself: it is the source of trust for
all metadata in the bundle.
all metadata in the TrustedMetadataSet
Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.
"""
self._bundle = {} # type: Dict[str: Metadata]
self._trusted_set = {} # type: Dict[str: Metadata]
self.reference_time = datetime.utcnow()
self._root_update_finished = False

Expand All @@ -147,36 +147,36 @@ def __init__(self, root_data: bytes):

def __getitem__(self, role: str) -> Metadata:
"""Returns current Metadata for 'role'"""
return self._bundle[role]
return self._trusted_set[role]

def __len__(self) -> int:
"""Returns number of Metadata objects in bundle"""
return len(self._bundle)
"""Returns number of Metadata objects in TrustedMetadataSet"""
return len(self._trusted_set)

def __iter__(self) -> Iterator[Metadata]:
"""Returns iterator over all Metadata objects in bundle"""
return iter(self._bundle)
"""Returns iterator over all Metadata objects in TrustedMetadataSet"""
return iter(self._trusted_set)

# Helper properties for top level metadata
@property
def root(self) -> Optional[Metadata]:
"""Current root Metadata or None"""
return self._bundle.get("root")
return self._trusted_set.get("root")

@property
def timestamp(self) -> Optional[Metadata]:
"""Current timestamp Metadata or None"""
return self._bundle.get("timestamp")
return self._trusted_set.get("timestamp")

@property
def snapshot(self) -> Optional[Metadata]:
"""Current snapshot Metadata or None"""
return self._bundle.get("snapshot")
return self._trusted_set.get("snapshot")

@property
def targets(self) -> Optional[Metadata]:
"""Current targets Metadata or None"""
return self._bundle.get("targets")
return self._trusted_set.get("targets")

# Methods for updating metadata
def update_root(self, data: bytes):
Expand Down Expand Up @@ -225,7 +225,7 @@ def update_root(self, data: bytes):
"New root is not signed by itself", new_root.signed
)

self._bundle["root"] = new_root
self._trusted_set["root"] = new_root
logger.debug("Updated root")

def root_update_finished(self):
Expand Down Expand Up @@ -302,7 +302,7 @@ def update_timestamp(self, data: bytes):
if new_timestamp.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("New timestamp is expired")

self._bundle["timestamp"] = new_timestamp
self._trusted_set["timestamp"] = new_timestamp
logger.debug("Updated timestamp")

# TODO: remove pylint disable once the hash verification is in metadata.py
Expand Down Expand Up @@ -381,7 +381,7 @@ def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches
if new_snapshot.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("New snapshot is expired")

self._bundle["snapshot"] = new_snapshot
self._trusted_set["snapshot"] = new_snapshot
logger.debug("Updated snapshot")

def update_targets(self, data: bytes):
Expand Down Expand Up @@ -460,5 +460,5 @@ def update_delegated_targets(
if new_delegate.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

self._bundle[role_name] = new_delegate
self._trusted_set[role_name] = new_delegate
logger.debug("Updated %s delegated by %s", role_name, delegator_name)
Loading

0 comments on commit ce14e93

Please sign in to comment.