Skip to content

Commit

Permalink
Merge pull request #1604 from jku/ngclient-api-polish
Browse files Browse the repository at this point in the history
Ngclient api polish
  • Loading branch information
Jussi Kukkonen authored Oct 27, 2021
2 parents 1d115b5 + 6aaa1ea commit 7b8ff22
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 198 deletions.
184 changes: 74 additions & 110 deletions tests/test_updater_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tuf.unittest_toolbox as unittest_toolbox

from tests import utils
from tuf.api.metadata import Metadata
from tuf.api.metadata import Metadata, TargetFile
from tuf import exceptions, ngclient
from securesystemslib.signer import SSlibSigner
from securesystemslib.interface import import_rsa_privatekey_from_file
Expand Down Expand Up @@ -111,11 +111,14 @@ def setUp(self):

self.metadata_url = f"{url_prefix}/metadata/"
self.targets_url = f"{url_prefix}/targets/"
self.destination_directory = self.make_temp_directory()
self.dl_dir = self.make_temp_directory()
# Creating a repository instance. The test cases will use this client
# updater to refresh metadata, fetch target files, etc.
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.targets_url
self.updater = ngclient.Updater(
repository_dir=self.client_directory,
metadata_base_url=self.metadata_url,
target_dir=self.dl_dir,
target_base_url=self.targets_url
)

def tearDown(self):
Expand Down Expand Up @@ -187,53 +190,40 @@ def consistent_snapshot_modifier(root):
self._modify_repository_root(
consistent_snapshot_modifier, bump_version=True
)
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.targets_url
updater = ngclient.Updater(
self.client_directory, self.metadata_url, self.dl_dir, self.targets_url
)

# All metadata is in local directory already
self.repository_updater.refresh()
updater.refresh()
# Make sure that consistent snapshot is enabled
self.assertTrue(
self.repository_updater._trusted_set.root.signed.consistent_snapshot
)
# Get targetinfo for "file1.txt" listed in targets
targetinfo1 = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
# Get targetinfo for "file3.txt" listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
updater._trusted_set.root.signed.consistent_snapshot
)

# Get targetinfos, assert cache does not contain the files
info1 = updater.get_targetinfo("file1.txt")
info3 = updater.get_targetinfo("file3.txt")
self.assertIsNone(updater.find_cached_target(info1))
self.assertIsNone(updater.find_cached_target(info3))

# Create consistent targets with file path HASH.FILENAME.EXT
target1_hash = list(targetinfo1.hashes.values())[0]
target3_hash = list(targetinfo3.hashes.values())[0]
target1_hash = list(info1.hashes.values())[0]
target3_hash = list(info3.hashes.values())[0]
self._create_consistent_target("file1.txt", target1_hash)
self._create_consistent_target("file3.txt", target3_hash)

updated_targets = self.repository_updater.updated_targets(
[targetinfo1, targetinfo3], self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo1, targetinfo3])
self.repository_updater.download_target(
targetinfo1, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo3])

self.repository_updater.download_target(
targetinfo3, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)
# Download files, assert that cache has correct files
updater.download_target(info1)
path = updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
self.assertIsNone(updater.find_cached_target(info3))

self.assertListEqual(updated_targets, [])
updater.download_target(info3)
path = updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
path = updater.find_cached_target(info3)
self.assertEqual(path, os.path.join(self.dl_dir, info3.path))

def test_refresh_and_download(self):
# Test refresh without consistent targets - targets without hash prefixes.
Expand All @@ -244,50 +234,33 @@ def test_refresh_and_download(self):
os.remove(os.path.join(self.client_directory, "1.root.json"))

# top-level metadata is in local directory already
self.repository_updater.refresh()
self.updater.refresh()
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file1.txt' listed in targets
targetinfo1 = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
# Get targetinfos, assert that cache does not contain files
info1 = self.updater.get_targetinfo("file1.txt")
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file3.txt' listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
)
info3 = self.updater.get_targetinfo("file3.txt")
expected_files = ["role1", "root", "snapshot", "targets", "timestamp"]
self._assert_files(expected_files)
self.assertIsNone(self.updater.find_cached_target(info1))
self.assertIsNone(self.updater.find_cached_target(info3))

updated_targets = self.repository_updater.updated_targets(
[targetinfo1, targetinfo3], self.destination_directory
)

self.assertListEqual(updated_targets, [targetinfo1, targetinfo3])

self.repository_updater.download_target(
targetinfo1, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
# Download files, assert that cache has correct files
self.updater.download_target(info1)
path = self.updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
self.assertIsNone(
self.updater.find_cached_target(info3)
)

self.assertListEqual(updated_targets, [targetinfo3])

# Check that duplicates are excluded
updated_targets = self.repository_updater.updated_targets(
[targetinfo3, targetinfo3], self.destination_directory
)
self.assertListEqual(updated_targets, [targetinfo3])

self.repository_updater.download_target(
targetinfo3, self.destination_directory
)
updated_targets = self.repository_updater.updated_targets(
updated_targets, self.destination_directory
)

self.assertListEqual(updated_targets, [])
self.updater.download_target(info3)
path = self.updater.find_cached_target(info1)
self.assertEqual(path, os.path.join(self.dl_dir, info1.path))
path = self.updater.find_cached_target(info3)
self.assertEqual(path, os.path.join(self.dl_dir, info3.path))

def test_refresh_with_only_local_root(self):
os.remove(os.path.join(self.client_directory, "timestamp.json"))
Expand All @@ -298,70 +271,61 @@ def test_refresh_with_only_local_root(self):
os.remove(os.path.join(self.client_directory, "1.root.json"))
self._assert_files(["root"])

self.repository_updater.refresh()
self.updater.refresh()
self._assert_files(["root", "snapshot", "targets", "timestamp"])

# Get targetinfo for 'file3.txt' listed in the delegated role1
targetinfo3 = self.repository_updater.get_one_valid_targetinfo(
"file3.txt"
)
targetinfo3 = self.updater.get_targetinfo("file3.txt")
expected_files = ["role1", "root", "snapshot", "targets", "timestamp"]
self._assert_files(expected_files)

def test_both_target_urls_not_set(self):
# target_base_url = None and Updater._target_base_url = None
self.repository_updater = ngclient.Updater(
self.client_directory, self.metadata_url
)
updater = ngclient.Updater(self.client_directory, self.metadata_url, self.dl_dir)
info = TargetFile(1, {"sha256": ""}, "targetpath")
with self.assertRaises(ValueError):
self.repository_updater.download_target(
[], self.destination_directory
)
updater.download_target(info)

def test_no_target_dir_no_filepath(self):
# filepath = None and Updater.target_dir = None
updater = ngclient.Updater(self.client_directory, self.metadata_url)
info = TargetFile(1, {"sha256": ""}, "targetpath")
with self.assertRaises(ValueError):
updater.find_cached_target(info)
with self.assertRaises(ValueError):
updater.download_target(info)

def test_external_targets_url(self):
self.repository_updater.refresh()
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
self.repository_updater.download_target(
targetinfo, self.destination_directory, self.targets_url
)
self.updater.refresh()
info = self.updater.get_targetinfo("file1.txt")

self.updater.download_target(info, target_base_url=self.targets_url)

def test_length_hash_mismatch(self):
self.repository_updater.refresh()
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file1.txt"
)
self.updater.refresh()
targetinfo = self.updater.get_targetinfo("file1.txt")

length = targetinfo.length
with self.assertRaises(exceptions.RepositoryError):
targetinfo.length = 44
self.repository_updater.download_target(
targetinfo, self.destination_directory
)
self.updater.download_target(targetinfo)

with self.assertRaises(exceptions.RepositoryError):
targetinfo.length = length
targetinfo.hashes = {"sha256": "abcd"}
self.repository_updater.download_target(
targetinfo, self.destination_directory
)
self.updater.download_target(targetinfo)

def test_updating_root(self):
# Bump root version, resign and refresh
self._modify_repository_root(lambda root: None, bump_version=True)
self.repository_updater.refresh()
self.assertEqual(
self.repository_updater._trusted_set.root.signed.version, 2
)
self.updater.refresh()
self.assertEqual(self.updater._trusted_set.root.signed.version, 2)

def test_missing_targetinfo(self):
self.repository_updater.refresh()
self.updater.refresh()

# Get targetinfo for non-existing file
targetinfo = self.repository_updater.get_one_valid_targetinfo(
"file33.txt"
)
self.assertIsNone(targetinfo)
self.assertIsNone(self.updater.get_targetinfo("file33.txt"))


if __name__ == "__main__":
Expand Down
46 changes: 23 additions & 23 deletions tests/test_updater_with_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def _run_refresh(self) -> Updater:
updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim
)
Expand Down Expand Up @@ -90,9 +91,13 @@ def test_refresh(self):
@utils.run_sub_tests_with_dataset(targets)
def test_targets(self, test_case_data: Tuple[str, bytes, str]):
targetpath, content, encoded_path = test_case_data
# target does not exist yet
path = os.path.join(self.targets_dir, encoded_path)

updater = self._run_refresh()
self.assertIsNone(updater.get_one_valid_targetinfo(targetpath))
# target does not exist yet, configuration is what we expect
self.assertIsNone(updater.get_targetinfo(targetpath))
self.assertTrue(self.sim.root.consistent_snapshot)
self.assertTrue(updater.config.prefix_targets_with_hash)

# Add targets to repository
self.sim.targets.version += 1
Expand All @@ -101,30 +106,25 @@ def test_targets(self, test_case_data: Tuple[str, bytes, str]):

updater = self._run_refresh()
# target now exists, is not in cache yet
file_info = updater.get_one_valid_targetinfo(targetpath)
self.assertIsNotNone(file_info)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir),
[file_info]
)
info = updater.get_targetinfo(targetpath)
self.assertIsNotNone(info)
# Test without and with explicit local filepath
self.assertIsNone(updater.find_cached_target(info))
self.assertIsNone(updater.find_cached_target(info, path))

# Assert consistent_snapshot is True and downloaded targets have prefix.
self.assertTrue(self.sim.root.consistent_snapshot)
self.assertTrue(updater.config.prefix_targets_with_hash)
# download target, assert it is in cache and content is correct
local_path = updater.download_target(file_info, self.targets_dir)
self.assertEqual(
updater.updated_targets([file_info], self.targets_dir), []
)
self.assertTrue(local_path.startswith(self.targets_dir))
with open(local_path, "rb") as f:
self.assertEqual(f.read(), content)

# Assert that the targetpath was URL encoded as expected.
encoded_absolute_path = os.path.join(self.targets_dir, encoded_path)
self.assertEqual(local_path, encoded_absolute_path)
self.assertEqual(path, updater.download_target(info))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))

with open(path, "rb") as f:
self.assertEqual(f.read(), content)

# download using explicit filepath as well
os.remove(path)
self.assertEqual(path, updater.download_target(info, path))
self.assertEqual(path, updater.find_cached_target(info))
self.assertEqual(path, updater.find_cached_target(info, path))

def test_fishy_rolenames(self):
roles_to_filenames = {
Expand All @@ -145,7 +145,7 @@ def test_fishy_rolenames(self):
updater = self._run_refresh()

# trigger updater to fetch the delegated metadata, check filenames
updater.get_one_valid_targetinfo("anything")
updater.get_targetinfo("anything")
local_metadata = os.listdir(self.metadata_dir)
for fname in roles_to_filenames.values():
self.assertTrue(fname in local_metadata)
Expand Down
Loading

0 comments on commit 7b8ff22

Please sign in to comment.