Skip to content

Commit

Permalink
Create constants for top-level rolenames
Browse files Browse the repository at this point in the history
This is a change in the metadata API to remove hardcoded rolenames
and use constants instead.
Fixes theupdateframework#1648

Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
  • Loading branch information
Ivana Atanasova committed Nov 25, 2021
1 parent 600eb86 commit b992bb9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 59 deletions.
42 changes: 21 additions & 21 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,19 @@
SignedSerializer,
)

_ROOT: str = "root"
_SNAPSHOT: str = "snapshot"
_TARGETS: str = "targets"
_TIMESTAMP: str = "timestamp"

# pylint: disable=too-many-lines

logger = logging.getLogger(__name__)

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = ["1", "0", "19"]
TOP_LEVEL_ROLE_NAMES = {"root", "timestamp", "snapshot", "targets"}
TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}

# T is a Generic type constraint for Metadata.signed
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
Expand Down Expand Up @@ -130,13 +135,13 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
# Dispatch to contained metadata class on metadata _type field.
_type = metadata["signed"]["_type"]

if _type == "targets":
if _type == _TARGETS:
inner_cls: Type[Signed] = Targets
elif _type == "snapshot":
elif _type == _SNAPSHOT:
inner_cls = Snapshot
elif _type == "timestamp":
elif _type == _TIMESTAMP:
inner_cls = Timestamp
elif _type == "root":
elif _type == _ROOT:
inner_cls = Root
else:
raise ValueError(f'unrecognized metadata type "{_type}"')
Expand Down Expand Up @@ -394,18 +399,13 @@ class Signed(metaclass=abc.ABCMeta):
unrecognized_fields: Dictionary of all unrecognized fields.
"""

# Signed implementations are expected to override this
_signed_type: ClassVar[str] = "signed"
# type is required for static reference without changing the API
type: ClassVar[str] = "signed"

# _type and type are identical: 1st replicates file format, 2nd passes lint
@property
def _type(self) -> str:
return self._signed_type

@property
def type(self) -> str:
"""Metadata type as string."""
return self._signed_type
return self.type

# NOTE: Signed is a stupid name, because this might not be signed yet, but
# we keep it to match spec terminology (I often refer to this as "payload",
Expand Down Expand Up @@ -458,8 +458,8 @@ def _common_fields_from_dict(
"""
_type = signed_dict.pop("_type")
if _type != cls._signed_type:
raise ValueError(f"Expected type {cls._signed_type}, got {_type}")
if _type != cls.type:
raise ValueError(f"Expected type {cls.type}, got {_type}")

version = signed_dict.pop("version")
spec_version = signed_dict.pop("spec_version")
Expand Down Expand Up @@ -712,7 +712,7 @@ class Root(Signed):
unrecognized_fields: Dictionary of all unrecognized fields.
"""

_signed_type = "root"
type = _ROOT

# TODO: determine an appropriate value for max-args
# pylint: disable=too-many-arguments
Expand Down Expand Up @@ -965,7 +965,7 @@ class Timestamp(Signed):
snapshot_meta: Meta information for snapshot metadata.
"""

_signed_type = "timestamp"
type = _TIMESTAMP

def __init__(
self,
Expand Down Expand Up @@ -1015,7 +1015,7 @@ class Snapshot(Signed):
meta: A dictionary of target metadata filenames to MetaFile objects.
"""

_signed_type = "snapshot"
type = _SNAPSHOT

def __init__(
self,
Expand Down Expand Up @@ -1409,7 +1409,7 @@ class Targets(Signed):
unrecognized_fields: Dictionary of all unrecognized fields.
"""

_signed_type = "targets"
type = _TARGETS

# TODO: determine an appropriate value for max-args
# pylint: disable=too-many-arguments
Expand All @@ -1430,7 +1430,7 @@ def __init__(
def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
"""Creates Targets object from its dict representation."""
common_args = cls._common_fields_from_dict(signed_dict)
targets = signed_dict.pop("targets")
targets = signed_dict.pop(_TARGETS)
try:
delegations_dict = signed_dict.pop("delegations")
except KeyError:
Expand All @@ -1451,7 +1451,7 @@ def to_dict(self) -> Dict[str, Any]:
targets = {}
for target_path, target_file_obj in self.targets.items():
targets[target_path] = target_file_obj.to_dict()
targets_dict["targets"] = targets
targets_dict[_TARGETS] = targets
if self.delegations is not None:
targets_dict["delegations"] = self.delegations.to_dict()
return targets_dict
Expand Down
54 changes: 28 additions & 26 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
network IO, which are not handled here.
Loaded metadata can be accessed via index access with rolename as key
(trusted_set["root"]) or, in the case of top-level metadata, using the helper
(trusted_set[Root.type]) or, in the case of top-level metadata, using the helper
properties (trusted_set.root).
The rules that TrustedMetadataSet follows for top-level metadata are
Expand All @@ -35,7 +35,7 @@
>>> trusted_set = TrustedMetadataSet(f.read())
>>>
>>> # update root from remote until no more are available
>>> with download("root", trusted_set.root.signed.version + 1) as f:
>>> with download(Root.type, trusted_set.root.signed.version + 1) as f:
>>> trusted_set.update_root(f.read())
>>>
>>> # load local timestamp, then update from remote
Expand All @@ -45,7 +45,7 @@
>>> except (RepositoryError, OSError):
>>> pass # failure to load a local file is ok
>>>
>>> with download("timestamp") as f:
>>> with download(Timestamp.type) as f:
>>> trusted_set.update_timestamp(f.read())
>>>
>>> # load local snapshot, then update from remote if needed
Expand All @@ -55,7 +55,7 @@
>>> except (RepositoryError, OSError):
>>> # local snapshot is not valid, load from remote
>>> # (RepositoryErrors here stop the update)
>>> with download("snapshot", version) as f:
>>> with download(Snapshot.type, version) as f:
>>> trusted_set.update_snapshot(f.read())
TODO:
Expand Down Expand Up @@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]:
@property
def root(self) -> Metadata[Root]:
"""Current root Metadata"""
return self._trusted_set["root"]
return self._trusted_set[Root.type]

@property
def timestamp(self) -> Optional[Metadata[Timestamp]]:
"""Current timestamp Metadata or None"""
return self._trusted_set.get("timestamp")
return self._trusted_set.get(Timestamp.type)

@property
def snapshot(self) -> Optional[Metadata[Snapshot]]:
"""Current snapshot Metadata or None"""
return self._trusted_set.get("snapshot")
return self._trusted_set.get(Snapshot.type)

@property
def targets(self) -> Optional[Metadata[Targets]]:
"""Current targets Metadata or None"""
return self._trusted_set.get("targets")
return self._trusted_set.get(Targets.type)

# Methods for updating metadata
def update_root(self, data: bytes) -> Metadata[Root]:
Expand Down Expand Up @@ -166,23 +166,25 @@ def update_root(self, data: bytes) -> Metadata[Root]:
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load root") from e

if new_root.signed.type != "root":
if new_root.signed.type != Root.type:
raise exceptions.RepositoryError(
f"Expected 'root', got '{new_root.signed.type}'"
)

# Verify that new root is signed by trusted root
self.root.verify_delegate("root", new_root)
self.root.verify_delegate(Root.type, new_root)

if new_root.signed.version != self.root.signed.version + 1:
raise exceptions.ReplayedMetadataError(
"root", new_root.signed.version, self.root.signed.version
Root.type,
new_root.signed.version,
self.root.signed.version,
)

# Verify that new root is signed by itself
new_root.verify_delegate("root", new_root)
new_root.verify_delegate(Root.type, new_root)

self._trusted_set["root"] = new_root
self._trusted_set[Root.type] = new_root
logger.info("Updated root v%d", new_root.signed.version)

return new_root
Expand Down Expand Up @@ -222,20 +224,20 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load timestamp") from e

if new_timestamp.signed.type != "timestamp":
if new_timestamp.signed.type != Timestamp.type:
raise exceptions.RepositoryError(
f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
)

self.root.verify_delegate("timestamp", new_timestamp)
self.root.verify_delegate(Timestamp.type, new_timestamp)

# If an existing trusted timestamp is updated,
# check for a rollback attack
if self.timestamp is not None:
# Prevent rolling back timestamp version
if new_timestamp.signed.version < self.timestamp.signed.version:
raise exceptions.ReplayedMetadataError(
"timestamp",
Timestamp.type,
new_timestamp.signed.version,
self.timestamp.signed.version,
)
Expand All @@ -245,15 +247,15 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
< self.timestamp.signed.snapshot_meta.version
):
raise exceptions.ReplayedMetadataError(
"snapshot",
Snapshot.type,
new_timestamp.signed.snapshot_meta.version,
self.timestamp.signed.snapshot_meta.version,
)

# expiry not checked to allow old timestamp to be used for rollback
# protection of new timestamp: expiry is checked in update_snapshot()

self._trusted_set["timestamp"] = new_timestamp
self._trusted_set[Timestamp.type] = new_timestamp
logger.info("Updated timestamp v%d", new_timestamp.signed.version)

# timestamp is loaded: raise if it is not valid _final_ timestamp
Expand Down Expand Up @@ -323,12 +325,12 @@ def update_snapshot(
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load snapshot") from e

if new_snapshot.signed.type != "snapshot":
if new_snapshot.signed.type != Snapshot.type:
raise exceptions.RepositoryError(
f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
)

self.root.verify_delegate("snapshot", new_snapshot)
self.root.verify_delegate(Snapshot.type, new_snapshot)

# version not checked against meta version to allow old snapshot to be
# used in rollback protection: it is checked when targets is updated
Expand All @@ -354,7 +356,7 @@ def update_snapshot(
# expiry not checked to allow old snapshot to be used for rollback
# protection of new snapshot: it is checked when targets is updated

self._trusted_set["snapshot"] = new_snapshot
self._trusted_set[Snapshot.type] = new_snapshot
logger.info("Updated snapshot v%d", new_snapshot.signed.version)

# snapshot is loaded, but we raise if it's not valid _final_ snapshot
Expand Down Expand Up @@ -389,7 +391,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]:
Returns:
Deserialized and verified targets Metadata object
"""
return self.update_delegated_targets(data, "targets", "root")
return self.update_delegated_targets(data, Targets.type, Root.type)

def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str
Expand Down Expand Up @@ -440,7 +442,7 @@ def update_delegated_targets(
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load snapshot") from e

if new_delegate.signed.type != "targets":
if new_delegate.signed.type != Targets.type:
raise exceptions.RepositoryError(
f"Expected 'targets', got '{new_delegate.signed.type}'"
)
Expand Down Expand Up @@ -472,12 +474,12 @@ def _load_trusted_root(self, data: bytes) -> None:
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load root") from e

if new_root.signed.type != "root":
if new_root.signed.type != Root.type:
raise exceptions.RepositoryError(
f"Expected 'root', got '{new_root.signed.type}'"
)

new_root.verify_delegate("root", new_root)
new_root.verify_delegate(Root.type, new_root)

self._trusted_set["root"] = new_root
self._trusted_set[Root.type] = new_root
logger.info("Loaded trusted root v%d", new_root.signed.version)
Loading

0 comments on commit b992bb9

Please sign in to comment.