diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 5f6732aad0..028307e918 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -27,6 +27,12 @@ logger = logging.getLogger(__name__) +# test matrix for all tests that should behave similarly regardless of respect_expiry +respect_expiry_matrix: utils.DataSet = { + "respect_expiry=True": True, + "respect_expiry=False": False, +} + # pylint: disable=too-many-public-methods class TestTrustedMetadataSet(unittest.TestCase): @@ -93,7 +99,12 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) + self.trusted_set: TrustedMetadataSet + + def _init(self, respect_expiry: bool) -> None: + self.trusted_set = TrustedMetadataSet( + self.metadata[Root.type], respect_expiry + ) def _update_all_besides_targets( self, @@ -117,7 +128,9 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata[Snapshot.type] self.trusted_set.update_snapshot(snapshot_bytes) - def test_update(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update(self, respect_expiry: bool) -> None: + self._init(respect_expiry) self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) self.trusted_set.update_targets(self.metadata[Targets.type]) @@ -137,7 +150,9 @@ def test_update(self) -> None: self.assertTrue(count, 6) - def test_update_metadata_output(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_metadata_output(self, respect_expiry: bool) -> None: + self._init(respect_expiry) timestamp = self.trusted_set.update_timestamp( self.metadata["timestamp"] ) @@ -155,7 +170,9 @@ def test_update_metadata_output(self) -> None: self.assertIsInstance(delegeted_targets_1.signed, Targets) 
self.assertIsInstance(delegeted_targets_2.signed, Targets) - def test_out_of_order_ops(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_out_of_order_ops(self, respect_expiry: bool) -> None: + self._init(respect_expiry) # Update snapshot before timestamp with self.assertRaises(RuntimeError): self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) @@ -192,24 +209,42 @@ def test_out_of_order_ops(self) -> None: self.metadata["role1"], "role1", Targets.type ) - def test_root_with_invalid_json(self) -> None: - # Test loading initial root and root update - for test_func in [TrustedMetadataSet, self.trusted_set.update_root]: - # root is not json - with self.assertRaises(exceptions.RepositoryError): - test_func(b"") + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_initial_root_with_invalid_json(self, respect_expiry: bool) -> None: + # root is not json + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet(b"", respect_expiry) - # root is invalid - root = Metadata.from_bytes(self.metadata[Root.type]) - root.signed.version += 1 - with self.assertRaises(exceptions.UnsignedMetadataError): - test_func(root.to_bytes()) + # root is invalid + root = Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + TrustedMetadataSet(root.to_bytes(), respect_expiry) - # metadata is of wrong type - with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata[Snapshot.type]) + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet(self.metadata[Snapshot.type], respect_expiry) + + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_root_with_invalid_json(self, respect_expiry: bool) -> None: + self._init(respect_expiry) + # root is not json + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(b"") + + # root is invalid + root 
= Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_root(root.to_bytes()) + + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(self.metadata[Snapshot.type]) - def test_top_level_md_with_invalid_json(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_top_level_md_with_invalid_json(self, respect_expiry: bool) -> None: + self._init(respect_expiry) top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [ (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), @@ -232,7 +267,10 @@ def test_top_level_md_with_invalid_json(self) -> None: update_func(metadata) - def test_update_root_new_root(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_root_new_root(self, respect_expiry: bool) -> None: + self._init(respect_expiry) + # test that root can be updated with a new valid version def root_new_version_modifier(root: Root) -> None: root.version += 1 @@ -240,7 +278,12 @@ def root_new_version_modifier(root: Root) -> None: root = self.modify_metadata(Root.type, root_new_version_modifier) self.trusted_set.update_root(root) - def test_update_root_new_root_fail_threshold_verification(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_root_new_root_fail_threshold_verification( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + # Increase threshold in new root, do not add enough keys def root_threshold_bump(root: Root) -> None: root.version += 1 @@ -250,7 +293,11 @@ def root_threshold_bump(root: Root) -> None: with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_root(root) - def test_update_root_new_root_ver_same_as_trusted_root_ver(self) -> None: + 
@utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_root_new_root_ver_same_as_trusted_root_ver( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_root(self.metadata[Root.type]) @@ -260,12 +307,22 @@ def root_expired_modifier(root: Root) -> None: # intermediate root can be expired root = self.modify_metadata(Root.type, root_expired_modifier) - tmp_trusted_set = TrustedMetadataSet(root) + tmp_trusted_set = TrustedMetadataSet(root, True) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) - def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self) -> None: + # If we decide to not respect expiry, it all works: + root = self.modify_metadata(Root.type, root_expired_modifier) + tmp_trusted_set = TrustedMetadataSet(root, False) + tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) + + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_timestamp_new_timestamp_ver_below_trusted_ver( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 @@ -275,7 +332,11 @@ def version_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) - def test_update_timestamp_with_same_timestamp(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_timestamp_with_same_timestamp( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) # Test that timestamp is NOT updated if: # new_timestamp.version == trusted_timestamp.version self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) @@ -289,7 +350,12 @@ 
def test_update_timestamp_with_same_timestamp(self) -> None: # was not updated. self.assertEqual(id(initial_timestamp), id(self.trusted_set.timestamp)) - def test_update_timestamp_snapshot_ver_below_current(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_timestamp_snapshot_ver_below_current( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def bump_snapshot_version(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 # The timestamp version must be increased to initiate a update. @@ -304,9 +370,12 @@ def bump_snapshot_version(timestamp: Timestamp) -> None: self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def test_update_timestamp_expired(self) -> None: + self._init(respect_expiry=True) + # new_timestamp has expired def timestamp_expired_modifier(timestamp: Timestamp) -> None: timestamp.expires = datetime(1970, 1, 1) + timestamp.version = 2 # expired intermediate timestamp is loaded but raises timestamp = self.modify_metadata( @@ -319,7 +388,17 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - def test_update_snapshot_length_or_hash_mismatch(self) -> None: + # If we decide to not respect expiry, it all works: + self._init(respect_expiry=False) + self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_snapshot_length_or_hash_mismatch( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def modify_snapshot_length(timestamp: Timestamp) -> None: timestamp.snapshot_meta.length = 1 @@ -330,16 +409,23 @@ def modify_snapshot_length(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - def 
test_update_snapshot_fail_threshold_verification(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_snapshot_fail_threshold_verification( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) def test_update_snapshot_version_diverge_timestamp_snapshot_version( - self, + self, respect_expiry: bool ) -> None: + self._init(respect_expiry) + def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 @@ -356,7 +442,11 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata[Targets.type]) - def test_update_snapshot_file_removed_from_meta(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_snapshot_file_removed_from_meta( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) self._update_all_besides_targets(self.metadata[Timestamp.type]) def remove_file_from_meta(snapshot: Snapshot) -> None: @@ -367,7 +457,11 @@ def remove_file_from_meta(snapshot: Snapshot) -> None: with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) - def test_update_snapshot_meta_version_decreases(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_snapshot_meta_version_decreases( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def version_meta_modifier(snapshot: Snapshot) -> None: @@ -380,6 +474,7 @@ def version_meta_modifier(snapshot: Snapshot) -> None: 
self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) def test_update_snapshot_expired_new_snapshot(self) -> None: + self._init(respect_expiry=True) self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: @@ -396,7 +491,18 @@ def snapshot_expired_modifier(snapshot: Snapshot) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(self.metadata[Targets.type]) - def test_update_snapshot_successful_rollback_checks(self) -> None: + # If we decide to not respect expiry, it all works: + self._init(respect_expiry=False) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_targets(self.metadata[Targets.type]) + + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_snapshot_successful_rollback_checks( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def meta_version_bump(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version += 1 # The timestamp version must be increased to initiate a update. 
@@ -420,7 +526,12 @@ def version_bump(snapshot: Snapshot) -> None: # update targets to trigger final snapshot meta version check self.trusted_set.update_targets(self.metadata[Targets.type]) - def test_update_targets_no_meta_in_snapshot(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_targets_no_meta_in_snapshot( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} @@ -432,7 +543,12 @@ def no_meta_modifier(snapshot: Snapshot) -> None: with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata[Targets.type]) - def test_update_targets_hash_diverge_from_snapshot_meta_hash(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_targets_hash_diverge_from_snapshot_meta_hash( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) @@ -445,7 +561,12 @@ def meta_length_modifier(snapshot: Snapshot) -> None: with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata[Targets.type]) - def test_update_targets_version_diverge_snapshot_meta_version(self) -> None: + @utils.run_sub_tests_with_dataset(respect_expiry_matrix) + def test_update_targets_version_diverge_snapshot_meta_version( + self, respect_expiry: bool + ) -> None: + self._init(respect_expiry) + def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) @@ -459,6 +580,7 @@ def meta_modifier(snapshot: Snapshot) -> None: self.trusted_set.update_targets(self.metadata[Targets.type]) def test_update_targets_expired_new_target(self) -> None: + self._init(respect_expiry=True) self._update_all_besides_targets() # new_delegated_target has expired @@ 
-469,6 +591,11 @@ def target_expired_modifier(target: Targets) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_targets(targets) + # If we decide to not respect expiry, it all works: + self._init(respect_expiry=False) + self._update_all_besides_targets() + self.trusted_set.update_targets(targets) + # TODO test updating over initial metadata (new keys, newer timestamp, etc) diff --git a/tests/test_updater_offline.py b/tests/test_updater_offline.py new file mode 100644 index 0000000000..1505fde883 --- /dev/null +++ b/tests/test_updater_offline.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + + +"""Test ngclient Updater offline mode""" + +import datetime +import os +import sys +import tempfile +import unittest +from typing import Optional +from unittest.mock import Mock, patch + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api.exceptions import DownloadError, ExpiredMetadataError +from tuf.api.metadata import SPECIFICATION_VERSION, DelegatedRole, Targets +from tuf.ngclient import Updater, UpdaterConfig + + +class TestOffline(unittest.TestCase): + """Test Updater in offline mode""" + + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + + def setUp(self) -> None: + # pylint: disable=consider-using-with + self.temp_dir = tempfile.TemporaryDirectory() + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + self.sim = RepositorySimulator() + + # Add a delegated role and two targets to repository + self.sim.targets.version += 1 + spec_version = ".".join(SPECIFICATION_VERSION) + targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) + role = DelegatedRole("delegated", [], 1, False, ["delegated/*"], 
None) + self.sim.add_delegation("targets", role, targets) + self.sim.add_target("targets", b"hello world", "file") + self.sim.add_target("delegated", b"content", "delegated/file2") + self.sim.update_snapshot() + + # bootstrap client with initial root metadata + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(self.sim.signed_roots[0]) + + if self.dump_dir is not None: + # create test specific dump directory + name = self.id().split(".")[-1] + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def _run_refresh(self) -> Updater: + """Create a new Updater instance and refresh""" + if self.dump_dir is not None: + self.sim.write() + + updater = Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) + updater.refresh() + return updater + + def _run_offline_refresh(self) -> Updater: + """Create a new Updater instance in offline mode and refresh""" + if self.dump_dir is not None: + self.sim.write() + + updater = Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + UpdaterConfig(offline=True), + ) + updater.refresh() + return updater + + @patch.object(datetime, "datetime", wraps=datetime.datetime) + def test_refresh(self, mock_time: Mock) -> None: + """Test metadata refresh() in offline mode""" + # Run a "online" updater refresh to get toplevel metadata in local cache + self._run_refresh() + + self.sim.fetch_tracker.metadata.clear() + + # Refresh works in Offline mode (at this point metadata is not expired) + self._run_offline_refresh() + # Expect no download attempts + self.assertListEqual(self.sim.fetch_tracker.metadata, []) + + # Move current time a year into the future: all metadata is now expired + mock_time.utcnow.return_value = ( + datetime.datetime.utcnow() + datetime.timedelta(weeks=52) + ) + + # 
Refresh in default online mode fails when metadata has expired + with self.assertRaises(ExpiredMetadataError): + self._run_refresh() + + self.sim.fetch_tracker.metadata.clear() + + # Refresh in offline mode succeeds when local metadata has expired + self._run_offline_refresh() + # Expect no download attempts + self.assertListEqual(self.sim.fetch_tracker.metadata, []) + + def test_refresh_with_missing_top_level_metadata(self) -> None: + """Test metadata refresh in offline mode when cache does not contain all top level metadata""" + # Run a "online" updater refresh to get toplevel metadata in local cache + self._run_refresh() + + self.sim.fetch_tracker.metadata.clear() + + for role in ["targets", "snapshot", "timestamp"]: + fname = os.path.join(self.metadata_dir, f"{role}.json") + os.remove(fname) + + # Refresh in offline mode fails since top level metadata is not in cache + with self.assertRaises(DownloadError): + self._run_offline_refresh() + # Expect no download attempts + self.assertListEqual(self.sim.fetch_tracker.metadata, []) + + def test_download(self) -> None: + """Test download in offline mode""" + + # Run a "online" updater refresh to get toplevel metadata in local cache + self._run_refresh() + + self.sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.targets.clear() + + # Downloading a target file while in offline mode fails + updater = self._run_offline_refresh() + info = updater.get_targetinfo("file") + assert info + with self.assertRaises(DownloadError): + updater.download_target(info) + + # Expect no download attempts + self.assertListEqual(self.sim.fetch_tracker.metadata, []) + self.assertListEqual(self.sim.fetch_tracker.targets, []) + + def test_find_cached_target(self) -> None: + """Test find_cached_target() in offline mode""" + + # Run a "online" refresh to get metadata in local cache + updater = self._run_refresh() + + # offline find_cached_target() returns None because target is not cached + updater = self._run_offline_refresh() + info 
= updater.get_targetinfo("file") + assert info + self.assertIsNone(updater.find_cached_target(info)) + + # Run a "online" download to get target in local cache + updater = self._run_refresh() + info = updater.get_targetinfo("file") + assert info + updater.download_target(info) + + self.sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.targets.clear() + + # offline find_cached_target() succeeds now + updater = self._run_offline_refresh() + info = updater.get_targetinfo("file") + assert info + self.assertIsNotNone(updater.find_cached_target(info)) + # Expect no download attempts + self.assertListEqual(self.sim.fetch_tracker.metadata, []) + self.assertListEqual(self.sim.fetch_tracker.targets, []) + + def test_get_targetinfo_with_delegated_metadata(self) -> None: + # Run a "online" refresh to get toplevel metadata in local cache + updater = self._run_refresh() + + # offline get_targetinfo() fails because delegated metadata is not cached + updater = self._run_offline_refresh() + with self.assertRaises(DownloadError): + updater.get_targetinfo("delegated/file2") + + +if __name__ == "__main__": + if "--dump" in sys.argv: + TestOffline.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestOffline.dump_dir}") + sys.argv.remove("--dump") + + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index c99a365d7f..599cfe8434 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -78,13 +78,16 @@ class TrustedMetadataSet(abc.Mapping): to update the metadata with the caller making decisions on what is updated. """ - def __init__(self, root_data: bytes): + def __init__(self, root_data: bytes, respect_expiry: bool = True): """Initialize ``TrustedMetadataSet`` by loading trusted root metadata. Args: root_data: Trusted root metadata as bytes. 
Note that this metadata will only be verified by itself: it is the source of trust for all metadata in the ``TrustedMetadataSet`` + respect_expiry: If set to False, expired Metadata is considered valid + (all other security checks are still done). This mode should NOT be + used when loading new metadata from remote repository. Raises: RepositoryError: Metadata failed to load or verify. The actual @@ -92,6 +95,7 @@ def __init__(self, root_data: bytes): """ self._trusted_set: Dict[str, Metadata] = {} self.reference_time = datetime.datetime.utcnow() + self._respect_expiry = respect_expiry # Load and validate the local root metadata. Valid initial trusted root # metadata is required @@ -207,8 +211,8 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: raise RuntimeError("Cannot update timestamp after snapshot") # client workflow 5.3.10: Make sure final root is not expired. - if self.root.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("Final root.json is expired") + self._handle_expiry("root", self.root) + # No need to check for 5.3.11 (fast forward attack recovery): # timestamp/snapshot can not yet be loaded at this point @@ -245,25 +249,19 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: f", got version {new_snapshot_meta.version}" ) - # expiry not checked to allow old timestamp to be used for rollback - # protection of new timestamp: expiry is checked in update_snapshot() - + # Timestamp is set as trusted and then the expiry is checked. 
The order is this + # so local expired timestamp can be loaded and used to verify new non-expired one self._trusted_set[Timestamp.type] = new_timestamp logger.debug("Updated timestamp v%d", new_timestamp.signed.version) - # timestamp is loaded: raise if it is not valid _final_ timestamp - self._check_final_timestamp() + self._handle_expiry("timestamp", self.timestamp) return new_timestamp - def _check_final_timestamp(self) -> None: - """Raise if timestamp is expired.""" - - if self.timestamp.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("timestamp.json is expired") - def update_snapshot( - self, data: bytes, trusted: Optional[bool] = False + self, + data: bytes, + trusted: Optional[bool] = False, ) -> Metadata[Snapshot]: """Verify and load ``data`` as new snapshot metadata. @@ -300,7 +298,7 @@ def update_snapshot( logger.debug("Updating snapshot") # Snapshot cannot be loaded if final timestamp is expired - self._check_final_timestamp() + self._handle_expiry("timestamp", self.timestamp) snapshot_meta = self.timestamp.signed.snapshot_meta @@ -341,9 +339,8 @@ def update_snapshot( f"{new_fileinfo.version}, got {fileinfo.version}." ) - # expiry not checked to allow old snapshot to be used for rollback - # protection of new snapshot: it is checked when targets is updated - + # Snapshot is set as trusted and then the expiry is checked. 
The order is this + # so local expired snapshot can be loaded and used to verify new non-expired one self._trusted_set[Snapshot.type] = new_snapshot logger.debug("Updated snapshot v%d", new_snapshot.signed.version) @@ -352,11 +349,19 @@ def update_snapshot( return new_snapshot + def _handle_expiry(self, rolename: str, md: Metadata) -> None: + if md.signed.is_expired(self.reference_time): + message = f"Metadata for {rolename} is expired" + if self._respect_expiry: + raise exceptions.ExpiredMetadataError(message) + + logger.warning(message) + def _check_final_snapshot(self) -> None: """Raise if snapshot is expired or meta version does not match.""" - if self.snapshot.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("snapshot.json is expired") + self._handle_expiry("snapshot", self.snapshot) + snapshot_meta = self.timestamp.signed.snapshot_meta if self.snapshot.signed.version != snapshot_meta.version: raise exceptions.BadVersionNumberError( @@ -436,8 +441,7 @@ def update_delegated_targets( f"Expected {role_name} v{meta.version}, got v{version}." ) - if new_delegate.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError(f"New {role_name} is expired") + self._handle_expiry(role_name, new_delegate) self._trusted_set[role_name] = new_delegate logger.debug("Updated %s v%d", role_name, version) diff --git a/tuf/ngclient/config.py b/tuf/ngclient/config.py index 5027994278..56f74714e4 100644 --- a/tuf/ngclient/config.py +++ b/tuf/ngclient/config.py @@ -8,6 +8,7 @@ @dataclass +# pylint: disable=too-many-instance-attributes class UpdaterConfig: """Used to store ``Updater`` configuration. @@ -23,7 +24,13 @@ class UpdaterConfig: are used, target download URLs are formed by prefixing the filename with a hash digest of file content by default. This can be overridden by setting ``prefix_targets_with_hash`` to ``False``. 
- + offline: When offline, Updater works normally except for two differences: First, + expired metadata is considered valid. Second, Updater will not + download metadata or targets (if a download would be needed, a DownloadError + is raised instead). This is useful for a client that has all required + metadata and targets cached locally and is willing to sacrifice timeliness of + metadata for offline functionality. Offline mode is not strictly compatible + with the TUF client workflow. """ max_root_rotations: int = 32 @@ -33,3 +40,4 @@ class UpdaterConfig: snapshot_max_length: int = 2000000 # bytes targets_max_length: int = 5000000 # bytes prefix_targets_with_hash: bool = True + offline: bool = False diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index ca41b2b566..ea5af17295 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -101,9 +101,11 @@ def __init__( # Read trusted local root metadata data = self._load_local_metadata(Root.type) - self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() + self._trusted_set = trusted_metadata_set.TrustedMetadataSet( + data, not self.config.offline + ) def refresh(self) -> None: """Refresh top-level metadata. @@ -124,7 +126,7 @@ def refresh(self) -> None: repository state. Raises: - OSError: New metadata could not be written to disk + OSError: New metadata could not be written to disk. RepositoryError: Metadata failed to verify in some way DownloadError: Download of a metadata file failed in some way """ @@ -160,7 +162,7 @@ def get_targetinfo(self, target_path: str) -> Optional[TargetFile]: that uniquely identifies the target within the repository. Raises: - OSError: New metadata could not be written to disk + OSError: New metadata could not be written to disk. 
RepositoryError: Metadata failed to verify in some way DownloadError: Download of a metadata file failed in some way @@ -229,6 +231,9 @@ def download_target( Local path to downloaded file """ + if self.config.offline: + raise exceptions.DownloadError("Cannot download when offline") + if filepath is None: filepath = self._generate_target_file_path(targetinfo) @@ -308,6 +313,8 @@ def _load_root(self) -> None: Sequentially load and persist on local disk every newer root metadata version available on the remote. """ + if self.config.offline: + return # Update the root role lower_bound = self._trusted_set.root.signed.version + 1 @@ -337,6 +344,13 @@ def _load_timestamp(self) -> None: except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid logger.debug("Local timestamp not valid as final: %s", e) + if self.config.offline: + raise exceptions.DownloadError( + "Cannot download metadata in offline mode" + ) from e + + if self.config.offline: + return # Load from remote (whether local load succeeded or not) data = self._download_metadata( @@ -361,6 +375,11 @@ def _load_snapshot(self) -> None: # Local snapshot does not exist or is invalid: update from remote logger.debug("Local snapshot not valid as final: %s", e) + if self.config.offline: + raise exceptions.DownloadError( + "Cannot download metadata in offline mode" + ) from e + snapshot_meta = self._trusted_set.timestamp.signed.snapshot_meta length = snapshot_meta.length or self.config.snapshot_max_length version = None @@ -389,6 +408,11 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: # Local 'role' does not exist or is invalid: update from remote logger.debug("Failed to load local %s: %s", role, e) + if self.config.offline: + raise exceptions.DownloadError( + f"Cannot download metadata for '{role}' in offline mode" + ) from e + snapshot = self._trusted_set.snapshot.signed metainfo = snapshot.meta.get(f"{role}.json") if metainfo is None: