Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Add target support to RepositorySimulator #1587

Merged
merged 2 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,51 @@
as a way to "download" new metadata from remote: in practice no downloading,
network connections or even file access happens as RepositorySimulator serves
everything from memory.

Metadata and targets "hosted" by the simulator are made available in URL paths
"/metadata/..." and "/targets/..." respectively.

Example::

# constructor creates repository with top-level metadata
sim = RepositorySimulator()

# metadata can be modified directly: it is immediately available to clients
sim.snapshot.version += 1

# As an exception, new root versions require explicit publishing
sim.root.version += 1
sim.publish_root()

# there are helper functions
sim.add_target("targets", b"content", "targetpath")
sim.targets.version += 1
sim.update_snapshot()

# Use the simulated repository from an Updater:
updater = Updater(
dir,
"https://example.com/metadata/",
"https://example.com/targets/",
sim
)
updater.refresh()
"""

from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import os
import tempfile
from securesystemslib.hash import digest
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibSigner
from typing import Dict, Iterator, List, Optional, Tuple
from urllib import parse

from tuf.api.serialization.json import JSONSerializer
from tuf.exceptions import FetcherHTTPError
from tuf.exceptions import FetcherHTTPError, RepositoryError
from tuf.api.metadata import (
Key,
Metadata,
Expand All @@ -35,6 +66,7 @@
Root,
SPECIFICATION_VERSION,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
Expand All @@ -44,6 +76,11 @@

SPEC_VER = ".".join(SPECIFICATION_VERSION)

@dataclass
class RepositoryTarget:
"""Contains actual target data and the related target metadata"""
data: bytes
target_file: TargetFile

class RepositorySimulator(FetcherInterface):
def __init__(self):
Expand All @@ -60,6 +97,9 @@ def __init__(self):
# signers are used on-demand at fetch time to sign metadata
self.signers: Dict[str, List[SSlibSigner]] = {}

# target downloads are served from this dict
self.target_files: Dict[str, RepositoryTarget] = {}

self.dump_dir = None
self.dump_version = 0

Expand Down Expand Up @@ -126,6 +166,9 @@ def publish_root(self):
logger.debug("Published root v%d", self.root.version)

def fetch(self, url: str) -> Iterator[bytes]:
if not self.root.consistent_snapshot:
raise NotImplementedError("non-consistent snapshot not supported")

spliturl = parse.urlparse(url)
if spliturl.path.startswith("/metadata/"):
parts = spliturl.path[len("/metadata/") :].split(".")
Expand All @@ -136,10 +179,36 @@ def fetch(self, url: str) -> Iterator[bytes]:
version = None
role = parts[0]
yield self._fetch_metadata(role, version)
elif spliturl.path.startswith("/targets/"):
# figure out target path and hash prefix
path = spliturl.path[len("/targets/") :]
dir_parts, sep , prefixed_filename = path.rpartition("/")
prefix, _, filename = prefixed_filename.partition(".")
target_path = f"{dir_parts}{sep}{filename}"

yield self._fetch_target(target_path, prefix)
else:
raise FetcherHTTPError(f"Unknown path '{spliturl.path}'", 404)

def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes:
"""Return data for 'target_path', checking 'hash' if it is given.

If hash is None, then consistent_snapshot is not used
"""
repo_target = self.target_files.get(target_path)
if repo_target is None:
raise FetcherHTTPError(f"No target {target_path}", 404)
if hash and hash not in repo_target.target_file.hashes.values():
raise FetcherHTTPError(f"hash mismatch for {target_path}", 404)

logger.debug("fetched target %s", target_path)
return repo_target.data

def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
"""Return signed metadata for 'role', using 'version' if it is given.

If version is None, non-versioned metadata is being requested
"""
if role == "root":
# return a version previously serialized in publish_root()
if version is None or version > len(self.signed_roots):
Expand Down Expand Up @@ -187,6 +256,16 @@ def update_snapshot(self):
self.snapshot.version += 1
self.update_timestamp()

def add_target(self, role: str, data: bytes, path: str):
if role == "targets":
targets = self.targets
else:
targets = self.md_delegates[role].signed

target = TargetFile.from_data(path, data, ["sha256"])
targets.targets[path] = target
self.target_files[path] = RepositoryTarget(data, target)

def write(self):
"""Dump current repository metadata to self.dump_dir

Expand Down
48 changes: 41 additions & 7 deletions tests/test_updater_with_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ class TestUpdater(unittest.TestCase):
dump_dir:Optional[str] = None

def setUp(self):
self.client_dir = tempfile.TemporaryDirectory()
self.temp_dir = tempfile.TemporaryDirectory()
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
os.mkdir(self.metadata_dir)
os.mkdir(self.targets_dir)

# Setup the repository, bootstrap client root.json
self.sim = RepositorySimulator()
with open(os.path.join(self.client_dir.name, "root.json"), "bw") as f:
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000)
f.write(root)

Expand All @@ -38,17 +42,21 @@ def setUp(self):
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)

def _run_refresh(self):
def tearDown(self):
self.temp_dir.cleanup()

def _run_refresh(self) -> Updater:
if self.sim.dump_dir is not None:
self.sim.write()

updater = Updater(
self.client_dir.name,
self.metadata_dir,
"https://example.com/metadata/",
"https://example.com/targets/",
self.sim
)
updater.refresh()
return updater

def test_refresh(self):
# Update top level metadata
Expand All @@ -71,6 +79,35 @@ def test_refresh(self):

self._run_refresh()

def test_targets(self):
# target does not exist yet
updater = self._run_refresh()
self.assertIsNone(updater.get_one_valid_targetinfo("file"))

self.sim.targets.version += 1
self.sim.add_target("targets", b"content", "file")
self.sim.update_snapshot()

# target now exists, is not in cache yet
updater = self._run_refresh()
file_info = updater.get_one_valid_targetinfo("file")
self.assertIsNotNone(file_info)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir), [file_info]
)

# download target, assert it is in cache and content is correct
updater.download_target(file_info, self.targets_dir)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir), []
)
with open(os.path.join(self.targets_dir, "file"), "rb") as f:
self.assertEqual(f.read(), b"content")

# TODO: run the same download tests for
# self.sim.add_target("targets", b"more content", "dir/file2")
# This currently fails because issue #1576

def test_keys_and_signatures(self):
"""Example of the two trickiest test areas: keys and root updates"""

Expand Down Expand Up @@ -110,9 +147,6 @@ def test_keys_and_signatures(self):

self._run_refresh()

def tearDown(self):
self.client_dir.cleanup()

if __name__ == "__main__":
if "--dump" in sys.argv:
TestUpdater.dump_dir = tempfile.mkdtemp()
Expand Down