From c96cd64af0131094c01e7e5c28221002ae319b7d Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Feb 2021 18:50:48 +0200 Subject: [PATCH 01/11] Add a refactored Updater.refresh() A proposal of a new Updater.refresh() implementation: - based on metadata API - no longer dependent on keydb/roledb - follows the TUF specification's client workflow Introduces a MetadataWrapper class with the goal of providing functionality which is at this point missing in metadata API. Signed-off-by: Teodora Sechkova --- tuf/client_rework/README.md | 9 + tuf/client_rework/__init__.py | 0 tuf/client_rework/metadata_wrapper.py | 186 ++++++++++ tuf/client_rework/updater_rework.py | 515 ++++++++++++++++++++++++++ 4 files changed, 710 insertions(+) create mode 100644 tuf/client_rework/README.md create mode 100644 tuf/client_rework/__init__.py create mode 100644 tuf/client_rework/metadata_wrapper.py create mode 100644 tuf/client_rework/updater_rework.py diff --git a/tuf/client_rework/README.md b/tuf/client_rework/README.md new file mode 100644 index 0000000000..aa05e534c8 --- /dev/null +++ b/tuf/client_rework/README.md @@ -0,0 +1,9 @@ +# updater.py +**updater.py** is intended as the only TUF module that software update +systems need to utilize for a low-level integration. It provides a single +class representing an updater that includes methods to download, install, and +verify metadata or target files in a secure manner. Importing +**tuf.client.updater** and instantiating its main class is all that is +required by the client prior to a TUF update request. The importation and +instantiation steps allow TUF to load all of the required metadata files +and set the repository mirror information. diff --git a/tuf/client_rework/__init__.py b/tuf/client_rework/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tuf/client_rework/metadata_wrapper.py b/tuf/client_rework/metadata_wrapper.py new file mode 100644 index 0000000000..a25a278234 --- /dev/null +++ b/tuf/client_rework/metadata_wrapper.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Metadata wrapper +""" +import time + +from securesystemslib.keys import format_metadata_to_key +from tuf.api import metadata +import tuf.exceptions + + + + +class MetadataWrapper: + """Helper classes extending or adding missing + functionality to metadata API + """ + + def __init__(self, meta): + self._meta = meta + + @classmethod + def from_json_object(cls, tmp_file): + """Loads JSON-formatted TUF metadata from a file object. + """ + raw_data = tmp_file.read() + + from tuf.api.serialization.json import JSONDeserializer + deserializer = JSONDeserializer() + _meta = deserializer.deserialize(raw_data) + return cls(meta=_meta) + + + @classmethod + def from_json_file(cls, filename): + """Loads JSON-formatted TUF metadata from a file. + """ + _meta = metadata.Metadata.from_file(filename) + return cls(meta=_meta) + + @property + def signed(self): + """ + TODO + """ + return self._meta.signed + + @property + def version(self): + """ + TODO + """ + return self._meta.signed.version + + + def verify(self, keys, threshold): + """ + TODO + """ + verified = 0 + # 1.3. Check signatures + for key in keys: + self._meta.verify(key) + verified+=1 + + if verified < threshold: + raise tuf.exceptions.InsufficientKeysError + + + def persist(self, filename): + """ + TODO + """ + self._meta.to_file(filename) + + + def expires(self, reference_time=None): + """ + TODO + """ + if reference_time is None: + expires_timestamp = tuf.formats.datetime_to_unix_timestamp( + self._meta.signed.expires) + reference_time = int(time.time()) + + if expires_timestamp < reference_time: + raise tuf.exceptions.ExpiredMetadataError + + + + +class RootWrapper(MetadataWrapper): + """ + TODO + """ + def keys(self, role): + """ + TODO + """ + keys = [] + for keyid in self._meta.signed.roles[role]['keyids']: + key_metadata = self._meta.signed.keys[keyid] + key, _ = format_metadata_to_key(key_metadata) + keys.append(key) + + return keys + + + def threshold(self, role): + """ + TODO + """ + return self._meta.signed.roles[role]['threshold'] + + + +class TimestampWrapper(MetadataWrapper): + """ + TODO + """ + @property + def snapshot(self): + """ + TODO + """ + return self._meta.signed.meta['snapshot.json'] + + +class SnapshotWrapper(MetadataWrapper): + """ + TODO + """ + def role(self, name): + """ + TODO + """ + return self._meta.signed.meta[name + '.json'] + + + +class TargetsWrapper(MetadataWrapper): + """ + TODO + """ + @property + def targets(self): + """ + TODO + """ + return self._meta.signed.targets + + + @property + def delegations(self): + """ + TODO + """ + return self._meta.signed.delegations + + + def keys(self, role): + """ + TODO + """ + keys = [] + for delegation in self._meta.signed.delegations['roles']: + if delegation['name'] == role: + for keyid in delegation['keyids']: + key_metadata = self._meta.signed.delegations['keys'][keyid] + key, _ = format_metadata_to_key(key_metadata) + keys.append(key) + return keys + + + def threshold(self, role): + """ + TODO + """ + for delegation in self._meta.signed.delegations['roles']: + if delegation['name'] == role: + return delegation['threshold'] + + return None diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py new file mode 100644 index 0000000000..427cc5595e --- /dev/null +++ b/tuf/client_rework/updater_rework.py @@ -0,0 +1,515 @@ +# Copyright 2020, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""TUF client 1.0.0 draft + +TODO + +""" + +#Imports +import os +import logging +import fnmatch + +from typing import TextIO, BinaryIO, Dict, Optional + +import securesystemslib.exceptions +import securesystemslib.util + +import tuf.settings +import tuf.mirrors +import tuf.download +import tuf.exceptions +import tuf.formats + +from tuf.client.fetcher import FetcherInterface +from tuf.requests_fetcher import RequestsFetcher +from .metadata_wrapper import ( + RootWrapper, + SnapshotWrapper, + TimestampWrapper, + TargetsWrapper +) + +# Globals +logger = logging.getLogger(__name__) + +# Classes +class Updater: + """ + Provides a class that can download target files securely. + + Attributes: + metadata: + + repository_name: + + mirrors: + + fetcher: + + consistent_snapshot: + """ + + def __init__( + self, repository_name: str, + repository_mirrors: Dict, + fetcher: Optional[FetcherInterface]=None): + + self._repository_name = repository_name + self._mirrors = repository_mirrors + self._consistent_snapshot = False + self._metadata = {'root': {}, + 'timestamp': {}, + 'snapshot': {}, + 'targets': {}} + + if fetcher is None: + self._fetcher = RequestsFetcher() + else: + self._fetcher = fetcher + + + def refresh(self) -> None: + """ + This method downloads, verifies, and loads metadata for the top-level + roles in a specific order (root -> timestamp -> snapshot -> targets) + The expiration time for downloaded metadata is also verified. + + The metadata for delegated roles are not refreshed by this method, but + by the method that returns targetinfo (i.e., + get_one_valid_targetinfo()). + + The refresh() method should be called by the client before any target + requests. + """ + + self._load_root() + self._load_timestamp() + self._load_snapshot() + self._load_targets('targets', 'root') + + + def get_one_valid_targetinfo(self, filename: str) -> Dict: + """ + Returns the target information for a specific file identified by its + file path. This target method also downloads the metadata of updated + targets. + """ + return self._preorder_depth_first_walk(filename) + + + def updated_targets(self, targets: Dict, + destination_directory: str) -> Dict: + """ + After the client has retrieved the target information for those targets + they are interested in updating, they would call this method to + determine which targets have changed from those saved locally on disk. + All the targets that have changed are returns in a list. From this + list, they can request a download by calling 'download_target()'. + """ + # Keep track of the target objects and filepaths of updated targets. + # Return 'updated_targets' and use 'updated_targetpaths' to avoid + # duplicates. + updated_targets = [] + updated_targetpaths = [] + + for target in targets: + # Prepend 'destination_directory' to the target's relative filepath + # (as stored in metadata.) Verify the hash of 'target_filepath' + # against each hash listed for its fileinfo. Note: join() discards + # 'destination_directory' if 'filepath' contains a leading path + # separator (i.e., is treated as an absolute path). + filepath = target['filepath'] + target_filepath = os.path.join(destination_directory, filepath) + + if target_filepath in updated_targetpaths: + continue + + # Try one of the algorithm/digest combos for a mismatch. We break + # as soon as we find a mismatch. + for algorithm, digest in target['fileinfo']['hashes'].items(): + digest_object = None + try: + digest_object = securesystemslib.hash.digest_filename( + target_filepath, algorithm=algorithm) + + # This exception will occur if the target does not exist + # locally. + except securesystemslib.exceptions.StorageError: + updated_targets.append(target) + updated_targetpaths.append(target_filepath) + break + + # The file does exist locally, check if its hash differs. + if digest_object.hexdigest() != digest: + updated_targets.append(target) + updated_targetpaths.append(target_filepath) + break + + return updated_targets + + + def download_target(self, target: Dict, destination_directory: str): + """ + This method performs the actual download of the specified target. + The file is saved to the 'destination_directory' argument. + """ + + for temp_obj in self._mirror_target_download(target): + try: + self._verify_target_file(temp_obj, target) + # break? should we break after first successful download? + except Exception as exception: + # TODO: do something with exceptions + raise + + filepath = os.path.join(destination_directory, target['filepath']) + securesystemslib.util.persist_temp_file(temp_obj, filepath) + + + + def _mirror_meta_download( + self, filename: str, upper_length: int) -> TextIO: + """ + Download metadata file from the list of metadata mirrors + """ + file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', filename, + self._mirrors) + + file_mirror_errors = {} + for file_mirror in file_mirrors: + try: + temp_obj = tuf.download.unsafe_download( + file_mirror, + upper_length, + self._fetcher) + + temp_obj.seek(0) + yield temp_obj + + except Exception as exception: + file_mirror_errors[file_mirror] = exception + + finally: + if file_mirror_errors: + raise tuf.exceptions.NoWorkingMirrorError( + file_mirror_errors) + + + def _mirror_target_download(self, fileinfo: str) -> BinaryIO: + """ + Download target file from the list of target mirrors + """ + # full_filename = _get_full_name(filename) + file_mirrors = tuf.mirrors.get_list_of_mirrors( + 'target', fileinfo['filepath'], self._mirrors) + + file_mirror_errors = {} + for file_mirror in file_mirrors: + try: + temp_obj = tuf.download.safe_download( + file_mirror, + fileinfo['fileinfo']['length'], + self._fetcher) + + temp_obj.seek(0) + yield temp_obj + + except Exception as exception: + file_mirror_errors[file_mirror] = exception + + finally: + if file_mirror_errors: + raise tuf.exceptions.NoWorkingMirrorError( + file_mirror_errors) + + + def _get_full_meta_name(self, + role: str, + extension: str ='.json', + version: int = None) -> str: + """ + Helper method returning full metadata file path given the role name + and file extension. + """ + if version is None: + filename = role + extension + else: + filename = str(version) + '.' + role + extension + return os.path.join(tuf.settings.repositories_directory, + self._repository_name, 'metadata', 'current', filename) + + + def _get_relative_meta_name( + self, role: str, + extension: str ='.json', + version: int = None) -> str: + """ + Helper method returning full metadata file path given the role name + and file extension. + """ + if version is None: + filename = role + extension + else: + filename = str(version) + '.' + role + extension + return filename + + + def _load_root(self) -> None: + """ + If metadata file for 'root' role does not exist locally, download it + over a network, verify it and store it permanently. + """ + + # Load trusted root metadata + self._metadata['root'] = RootWrapper.from_json_file( + self._get_full_meta_name('root')) + + # Update the root role + # 1.1. Let N denote the version number of the trusted + # root metadata file. + lower_bound = self._metadata['root']._meta.signed.version + upper_bound = lower_bound + tuf.settings.MAX_NUMBER_ROOT_ROTATIONS + + verified_root = None + for next_version in range(lower_bound, upper_bound): + try: + mirror_download = self._mirror_meta_download( + self._get_relative_meta_name('root', version=next_version), + tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH) + + for temp_obj in mirror_download: + try: + verified_root = self._verify_root(temp_obj) + + except Exception as exception: + raise + + except tuf.exceptions.NoWorkingMirrorError as exception: + for mirror_error in exception.mirror_errors.values(): + if neither_403_nor_404(mirror_error): + temp_obj.close() + raise + + break + + # Check for a freeze attack. The latest known time MUST be lower + # than the expiration timestamp in the trusted root metadata file + try: + verified_root.expires() + except Exception: + temp_obj.close() + + # 1.9. If the timestamp and / or snapshot keys have been rotated, + # then delete the trusted timestamp and snapshot metadata files. + if (self._metadata['root'].keys('timestamp') != + verified_root.keys('timestamp')): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name('timestamp')) + self._metadata['timestamp'] = {} + + if (self._metadata['root'].keys('snapshot') != + verified_root.keys('snapshot')): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name('snapshot')) + self._metadata['snapshot'] = {} + + self._metadata['root'] = verified_root + # Persist root metadata. The client MUST write the file to non-volatile + # storage as FILENAME.EXT (e.g. root.json). + self._metadata['root'].persist(self._get_full_meta_name('root')) + + # 1.10. Set whether consistent snapshots are used as per + # the trusted root metadata file + self._consistent_snapshot = \ + self._metadata['root'].signed.consistent_snapshot + temp_obj.close() + + + + + + def _load_timestamp(self) -> None: + # TODO Check if timestamp exists locally + for temp_obj in self._mirror_meta_download('timestamp.json', + tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH): + try: + verified_tampstamp = self._verify_timestamp(temp_obj) + # break? should we break after first successful download? + except Exception as exception: + # TODO: do something with exceptions + temp_obj.close() + raise + + self._metadata['timestamp'] = verified_tampstamp + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). + self._metadata['timestamp'].persist( + self._get_full_meta_name('timestamp.json')) + + temp_obj.close() + + + + def _load_snapshot(self) -> None: + + try: + length = self._metadata['timestamp'].snapshot['length'] + except KeyError: + length = tuf.settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH + + if self._consistent_snapshot: + version = self._metadata['timestamp'].snapshot['version'] + else: + version = None + + #Check if exists locally + # self.loadLocal('snapshot', snapshotVerifier) + for temp_obj in self._mirror_meta_download('snapshot.json', length): + try: + verified_snapshot = self._verify_snapshot(temp_obj) + # break? should we break after first successful download? + except Exception as exception: + # TODO: do something with exceptions + temp_obj.close() + raise + + self._metadata['snapshot'] = verified_snapshot + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). + self._metadata['snapshot'].persist( + self._get_full_meta_name('snapshot.json')) + + temp_obj.close() + + + def _load_targets(self, targets_role: str, parent_role: str) -> None: + try: + length = self._metadata['snapshot'].role(targets_role)['length'] + except KeyError: + length = tuf.settings.DEFAULT_TARGETS_REQUIRED_LENGTH + + if self._consistent_snapshot: + version = self._metadata['snapshot'].role(targets_role)['version'] + else: + version = None + + + #Check if exists locally + # self.loadLocal('snapshot', targetsVerifier) + + for temp_obj in self._mirror_meta_download( + targets_role + '.json', length): + try: + verified_targets = self._verify_targets(temp_obj, + targets_role, parent_role) + # break? should we break after first successful download? + except Exception as exception: + # TODO: do something with exceptions + temp_obj.close() + raise + self._metadata[targets_role] = verified_targets + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). + self._metadata[targets_role].persist( + self._get_full_meta_name(targets_role, extension='.json')) + + temp_obj.close() + + + + def _verify_root(self, temp_obj: TextIO) -> RootWrapper: + + intermediate_root = RootWrapper.from_json_object(temp_obj) + + # Check for an arbitrary software attack + trusted_root = self._metadata['root'] + intermediate_root.verify(trusted_root.keys('root'), + trusted_root.threshold('root')) + intermediate_root.verify(intermediate_root.keys('root'), + intermediate_root.threshold('root')) + + # Check for a rollback attack. + if intermediate_root.version < trusted_root.version: + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + 'root', intermediate_root.version(), trusted_root.version()) + # Note that the expiration of the new (intermediate) root metadata + # file does not matter yet, because we will check for it in step 1.8. + + return intermediate_root + + + def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: + intermediate_timestamp = TimestampWrapper.from_json_object(temp_obj) + + # Check for an arbitrary software attack + trusted_root = self._metadata['root'] + intermediate_timestamp.verify( + trusted_root.keys('timestamp'), + trusted_root.threshold('timestamp')) + + # Check for a rollback attack. + if self._metadata['timestamp']: + if (intermediate_timestamp.signed.version <= + self._metadata['timestamp'].version): + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + 'root', intermediate_timestamp.version(), + self._metadata['timestamp'].version()) + + if self._metadata['snapshot']: + if (intermediate_timestamp.snapshot.version <= + self._metadata['timestamp'].snapshot['version']): + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + 'root', intermediate_timestamp.snapshot.version(), + self._metadata['snapshot'].version()) + + intermediate_timestamp.expires() + + return intermediate_timestamp + + + + def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: + + # Check against timestamp metadata + if self._metadata['timestamp'].snapshot.get('hash'): + _check_hashes(temp_obj, + self._metadata['timestamp'].snapshot.get('hash')) + + intermediate_snapshot = SnapshotWrapper.from_json_object(temp_obj) + + if (intermediate_snapshot.version != + self._metadata['timestamp'].snapshot['version']): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + # Check for an arbitrary software attack + trusted_root = self._metadata['root'] + intermediate_snapshot.verify(trusted_root.keys('snapshot'), + trusted_root.threshold('snapshot')) + + # Check for a rollback attack + if self._metadata['snapshot']: + for target_role in intermediate_snapshot.signed.meta: + if (target_role['version'] != + self._metadata['snapshot'].meta[target_role]['version']): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + intermediate_snapshot.expires() + + return intermediate_snapshot + + + + +def neither_403_nor_404(mirror_error): + if isinstance(mirror_error, tuf.exceptions.FetcherHTTPError): + if mirror_error.status_code in {403, 404}: + return False + return True From 1e52ccd4a968744fe08c536abaeccd3d96f6cf53 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Feb 2021 19:06:33 +0200 Subject: [PATCH 02/11] Add targets download functionality to the new Updater Mostly a transfer of the current client code related to the actual target files download. Needs to be further reworked. Signed-off-by: Teodora Sechkova --- tuf/client_rework/updater_rework.py | 271 ++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py index 427cc5595e..8c69398ee6 100644 --- a/tuf/client_rework/updater_rework.py +++ b/tuf/client_rework/updater_rework.py @@ -506,7 +506,278 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: return intermediate_snapshot + def _verify_targets(self, + temp_obj: TextIO, filename: str, parent_role: str) -> TargetsWrapper: + # Check against timestamp metadata + if self._metadata['snapshot'].role(filename).get('hash'): + _check_hashes(temp_obj, + self._metadata['snapshot'].targets.get('hash')) + + intermediate_targets = TargetsWrapper.from_json_object(temp_obj) + if (intermediate_targets.version != + self._metadata['snapshot'].role(filename)['version']): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + # Check for an arbitrary software attack + parent_role = self._metadata[parent_role] + + intermediate_targets.verify(parent_role.keys(filename), + parent_role.threshold(filename)) + + intermediate_targets.expires() + + return intermediate_targets + + + + def _verify_target_file(self, + temp_obj: BinaryIO, targetinfo: Dict) -> None: + + _check_file_length(temp_obj, targetinfo['fileinfo']['length']) + _check_hashes(temp_obj, targetinfo['fileinfo']['hashes']) + + + + def _preorder_depth_first_walk(self, target_filepath) -> Dict: + + target = None + role_names = [('targets', 'root')] + visited_role_names = set() + number_of_delegations = tuf.settings.MAX_NUMBER_OF_DELEGATIONS + + # Ensure the client has the most up-to-date version of 'targets.json'. + # Raise 'tuf.exceptions.NoWorkingMirrorError' if the changed metadata + # cannot be successfully downloaded and + # 'tuf.exceptions.RepositoryError' if the referenced metadata is + # missing. Target methods such as this one are called after the + # top-level metadata have been refreshed (i.e., updater.refresh()). + # self._update_metadata_if_changed('targets') + + # Preorder depth-first traversal of the graph of target delegations. + while (target is None and + number_of_delegations > 0 and + len(role_names) > 0): + + # Pop the role name from the top of the stack. + role_name, parent_role = role_names.pop(-1) + self._load_targets(role_name, parent_role) + # Skip any visited current role to prevent cycles. + if (role_name, parent_role) in visited_role_names: + logger.debug(f"Skipping visited current role {role_name}") + continue + + # The metadata for 'role_name' must be downloaded/updated before + # its targets, delegations, and child roles can be inspected. + # self._metadata['current'][role_name] is currently missing. + # _refresh_targets_metadata() does not refresh 'targets.json', it + # expects _update_metadata_if_changed() to have already refreshed + # it, which this function has checked above. + # self._refresh_targets_metadata(role_name, + # refresh_all_delegated_roles=False) + + role_metadata = self._metadata[role_name] + targets = role_metadata.targets + target = targets.get(target_filepath) + + # After preorder check, add current role to set of visited roles. + visited_role_names.add((role_name, parent_role)) + + # And also decrement number of visited roles. + number_of_delegations -= 1 + delegations = role_metadata.delegations + child_roles = delegations.get('roles', []) + + if target is None: + + child_roles_to_visit = [] + # NOTE: This may be a slow operation if there are many + # delegated roles. + for child_role in child_roles: + child_role_name = _visit_child_role( + child_role, target_filepath) + + if (child_role['terminating'] and + child_role_name is not None): + logger.debug('Adding child role ' + + repr(child_role_name)) + logger.debug('Not backtracking to other roles.') + role_names = [] + child_roles_to_visit.append( + (child_role_name, role_name)) + break + + if child_role_name is None: + logger.debug('Skipping child role ' + + repr(child_role_name)) + + else: + logger.debug('Adding child role ' + + repr(child_role_name)) + child_roles_to_visit.append( + (child_role_name, role_name)) + + # Push 'child_roles_to_visit' in reverse order of appearance + # onto 'role_names'. Roles are popped from the end of + # the 'role_names' list. + child_roles_to_visit.reverse() + role_names.extend(child_roles_to_visit) + + else: + logger.debug('Found target in current role ' + + repr(role_name)) + + if (target is None and + number_of_delegations == 0 and + len(role_names) > 0): + logger.debug(repr(len(role_names)) + ' roles left to visit, ' + + 'but allowed to visit at most ' + + repr(tuf.settings.MAX_NUMBER_OF_DELEGATIONS) + ' delegations.') + + return {'filepath': target_filepath, 'fileinfo': target} + + + + + + +def _visit_child_role(child_role: Dict, target_filepath: str) -> str: + """ + + Non-public method that determines whether the given 'target_filepath' + is an allowed path of 'child_role'. + + Ensure that we explore only delegated roles trusted with the target. The + metadata for 'child_role' should have been refreshed prior to this point, + however, the paths/targets that 'child_role' signs for have not been + verified (as intended). The paths/targets that 'child_role' is allowed + to specify in its metadata depends on the delegating role, and thus is + left to the caller to verify. We verify here that 'target_filepath' + is an allowed path according to the delegated 'child_role'. + + TODO: Should the TUF spec restrict the repository to one particular + algorithm? Should we allow the repository to specify in the role + dictionary the algorithm used for these generated hashed paths? + + + child_role: + The delegation targets role object of 'child_role', containing its + paths, path_hash_prefixes, keys, and so on. + + target_filepath: + The path to the target file on the repository. This will be relative to + the 'targets' (or equivalent) directory on a given mirror. + + + None. + + + None. + + + If 'child_role' has been delegated the target with the name + 'target_filepath', then we return the role name of 'child_role'. + + Otherwise, we return None. + """ + + child_role_name = child_role['name'] + child_role_paths = child_role.get('paths') + child_role_path_hash_prefixes = child_role.get('path_hash_prefixes') + + if child_role_path_hash_prefixes is not None: + target_filepath_hash = _get_target_hash(target_filepath) + for child_role_path_hash_prefix in child_role_path_hash_prefixes: + if not target_filepath_hash.startswith(child_role_path_hash_prefix): + continue + + return child_role_name + + elif child_role_paths is not None: + # Is 'child_role_name' allowed to sign for 'target_filepath'? + for child_role_path in child_role_paths: + # A child role path may be an explicit path or glob pattern (Unix + # shell-style wildcards). The child role 'child_role_name' is + # returned if 'target_filepath' is equal to or matches + # 'child_role_path'. Explicit filepaths are also considered + # matches. A repo maintainer might delegate a glob pattern with a + # leading path separator, while the client requests a matching + # target without a leading path separator - make sure to strip any + # leading path separators so that a match is made. + # Example: "foo.tgz" should match with "/*.tgz". + if fnmatch.fnmatch(target_filepath.lstrip(os.sep), + child_role_path.lstrip(os.sep)): + logger.debug('Child role ' + repr(child_role_name) + + ' is allowed to sign for ' + repr(target_filepath)) + + return child_role_name + + logger.debug( + 'The given target path ' + repr(target_filepath) + + ' does not match the trusted path or glob pattern: ' + + repr(child_role_path)) + continue + + else: + # 'role_name' should have been validated when it was downloaded. + # The 'paths' or 'path_hash_prefixes' fields should not be missing, + # so we raise a format error here in case they are both missing. + raise tuf.exceptions.FormatError(repr(child_role_name) + ' ' + 'has neither a "paths" nor "path_hash_prefixes". At least' + ' one of these attributes must be present.') + + return None + + +def _check_file_length(file_object, trusted_file_length): + + file_object.seek(0, 2) + observed_length = file_object.tell() + + # Return and log a message if the length 'file_object' is equal to + # 'trusted_file_length', otherwise raise an exception. A hard check + # ensures that a downloaded file strictly matches a known, or trusted, + # file length. + if observed_length != trusted_file_length: + raise tuf.exceptions.DownloadLengthMismatchError(trusted_file_length, + observed_length) + + +def _check_hashes(file_object, trusted_hashes): + + # Verify each trusted hash of 'trusted_hashes'. If all are valid, simply + # return. + for algorithm, trusted_hash in trusted_hashes.items(): + digest_object = securesystemslib.hash.digest(algorithm) + # Ensure we read from the beginning of the file object + # TODO: should we store file position (before the loop) and reset + # after we seek about? + file_object.seek(0) + digest_object.update(file_object.read()) + computed_hash = digest_object.hexdigest() + + # Raise an exception if any of the hashes are incorrect. + if trusted_hash != computed_hash: + raise securesystemslib.exceptions.BadHashError(trusted_hash, + computed_hash) + + logger.info('The file\'s ' + algorithm + ' hash is' + ' correct: ' + trusted_hash) + + + +def _get_target_hash(target_filepath, hash_function='sha256'): + + # Calculate the hash of the filepath to determine which bin to find the + # target. The client currently assumes the repository (i.e., repository + # tool) uses 'hash_function' to generate hashes and UTF-8. + digest_object = securesystemslib.hash.digest(hash_function) + encoded_target_filepath = target_filepath.encode('utf-8') + digest_object.update(encoded_target_filepath) + target_filepath_hash = digest_object.hexdigest() + + return target_filepath_hash def neither_403_nor_404(mirror_error): if isinstance(mirror_error, tuf.exceptions.FetcherHTTPError): From f666a59bd9b58c6bc6287391c99ec9c4ad247851 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Feb 2021 19:10:28 +0200 Subject: [PATCH 03/11] Add tests and linter config to the new Updater Adds a basic test case for Updater. Applies the linter config used in api/metadata.py to all files under client_rework. Signed-off-by: Teodora Sechkova --- tests/test_updater_rework.py | 248 +++++++++++++++++++++++++++++++++++ tox.ini | 5 + tuf/client_rework/pylintrc | 6 + 3 files changed, 259 insertions(+) create mode 100644 tests/test_updater_rework.py create mode 100644 tuf/client_rework/pylintrc diff --git a/tests/test_updater_rework.py b/tests/test_updater_rework.py new file mode 100644 index 0000000000..bc6ce3a3f1 --- /dev/null +++ b/tests/test_updater_rework.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test Updater class +""" + +import os +import time +import shutil +import copy +import tempfile +import logging +import errno +import sys +import unittest +import json +import tracemalloc + +if sys.version_info >= (3, 3): + import unittest.mock as mock +else: + import mock + +import tuf +import tuf.exceptions +import tuf.log +import tuf.repository_tool as repo_tool +import tuf.unittest_toolbox as unittest_toolbox +import tuf.client_rework.updater_rework as updater + +from tests import utils +from tuf.api import metadata + +import securesystemslib + +logger = logging.getLogger(__name__) + + +class TestUpdater(unittest_toolbox.Modified_TestCase): + + @classmethod + def setUpClass(cls): + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Needed because in some tests simple_server.py cannot be found. + # The reason is that the current working directory + # has been changed when executing a subprocess. + cls.SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py') + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served + # by the SimpleHTTPServer launched here. The test cases of 'test_updater.py' + # assume the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.server_process_handler = utils.TestServerProcess(log=logger, + server=cls.SIMPLE_SERVER_PATH) + + + + @classmethod + def tearDownClass(cls): + # Cleans the resources and flush the logged lines (if any). + cls.server_process_handler.clean() + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + self.repository_name = 'test_repository1' + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf.tests/'. + original_repository_files = os.path.join(os.getcwd(), 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_keystore = os.path.join(original_repository_files, 'keystore') + original_client = os.path.join(original_repository_files, 'client') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.keystore_directory = \ + os.path.join(temporary_repository_root, 'keystore') + + self.client_directory = os.path.join(temporary_repository_root, + 'client') + self.client_metadata = os.path.join(self.client_directory, + self.repository_name, 'metadata') + self.client_metadata_current = os.path.join(self.client_metadata, + 'current') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ + + str(self.server_process_handler.port) + repository_basepath + + # Setting 'tuf.settings.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.settings.repositories_directory = self.client_directory + + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets'}} + + # Creating a repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater(self.repository_name, + self.repository_mirrors) + + # Metadata role keys are needed by the test cases to make changes to the + # repository (e.g., adding a new target file to 'targets.json' and then + # requesting a refresh()). + self.role_keys = _load_role_keys(self.keystore_directory) + + + + def tearDown(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Logs stdout and stderr from the sever subprocess. + self.server_process_handler.flush_log() + + + + # UNIT TESTS. + def test_refresh(self): + + self.repository_updater.refresh() + + for role in ['root', 'timestamp', 'snapshot', 'targets']: + metadata_obj = metadata.Metadata.from_file(os.path.join( + self.client_metadata_current, role + '.json')) + + metadata_obj_2 = metadata.Metadata.from_file(os.path.join( + self.repository_directory, 'metadata', role + '.json')) + + + self.assertDictEqual(metadata_obj.to_dict(), + metadata_obj_2.to_dict()) + + # Get targetinfo for 'file1.txt' listed in targets + targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt') + # Get targetinfo for 'file3.txt' listed in the delegated role1 + targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') + + destination_directory = self.make_temp_directory() + updated_targets = self.repository_updater.updated_targets([targetinfo1, targetinfo3], + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo1, targetinfo3]) + + self.repository_updater.download_target(targetinfo1, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo3]) + + + self.repository_updater.download_target(targetinfo3, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, []) + + +def _load_role_keys(keystore_directory): + + # Populating 'self.role_keys' by importing the required public and private + # keys of 'tuf/tests/repository_data/'. The role keys are needed when + # modifying the remote repository used by the test cases in this unit test. + + # The pre-generated key files in 'repository_data/keystore' are all encrypted with + # a 'password' passphrase. + EXPECTED_KEYFILE_PASSWORD = 'password' + + # Store and return the cryptography keys of the top-level roles, including 1 + # delegated role. + role_keys = {} + + root_key_file = os.path.join(keystore_directory, 'root_key') + targets_key_file = os.path.join(keystore_directory, 'targets_key') + snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key') + timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key') + delegation_key_file = os.path.join(keystore_directory, 'delegation_key') + + role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {}, + 'role1': {}} + + # Import the top-level and delegated role public keys. + role_keys['root']['public'] = \ + repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub') + role_keys['targets']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub') + role_keys['snapshot']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub') + role_keys['timestamp']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub') + role_keys['role1']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub') + + # Import the private keys of the top-level and delegated roles. + role_keys['root']['private'] = \ + repo_tool.import_rsa_privatekey_from_file(root_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['targets']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(targets_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['snapshot']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['timestamp']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['role1']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(delegation_key_file, + EXPECTED_KEYFILE_PASSWORD) + + return role_keys + +if __name__ == '__main__': + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tox.ini b/tox.ini index 9cdafa6e58..27d4b46534 100644 --- a/tox.ini +++ b/tox.ini @@ -51,3 +51,8 @@ commands = pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization bandit -r {toxinidir}/tuf + +[testenv:lint-client] +commands = + pylint {toxinidir}/tuf/client_rework --rcfile={toxinidir}/tuf/client_rework/pylintrc + bandit -r {toxinidir}/tuf diff --git a/tuf/client_rework/pylintrc b/tuf/client_rework/pylintrc new file mode 100644 index 0000000000..a75347f446 --- /dev/null +++ b/tuf/client_rework/pylintrc @@ -0,0 +1,6 @@ +[MESSAGE_CONTROL] +disable=fixme + +[FORMAT] +indent-string=" " +max-line-length=79 From 1fa7412e87d0a73ca1446601169a8477a6792a9a Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Thu, 18 Mar 2021 17:52:05 +0200 Subject: [PATCH 04/11] Temporary reduce code coverage to 90 percent Coverage failures may hide other failing tests in the CI. Configure coverage to fail under 90 percent during the ongoing experimental-client development. Signed-off-by: Teodora Sechkova --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 27d4b46534..d87964de14 100644 --- a/tox.ini +++ b/tox.ini @@ -16,7 +16,7 @@ changedir = tests commands = python --version coverage run aggregate_tests.py - coverage report -m --fail-under 97 + coverage report -m --fail-under 90 deps = -r{toxinidir}/requirements-test.txt From 8abe49abb4646d3b32f34dabbf98ede13eeee0bf Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Thu, 18 Mar 2021 18:11:20 +0200 Subject: [PATCH 05/11] Add .gitattributes file For compatibility with Windows systems, declare repository_data files to always have LF line endings on checkout. A trailing "/**" matches everything inside, with infinite depth. Signed-off-by: Teodora Sechkova --- .gitattributes | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..66709ac428 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,3 @@ +# Files that will always have LF line endings on checkout. +tests/repository_data/** text eol=lf + From c4b5eb5e6b48f74bd85b90375b60fc0cfe382b40 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 20:15:38 +0200 Subject: [PATCH 06/11] Update the tox lint environment Apply the updated api/pylintrc config to the client_rework directory. Signed-off-by: Teodora Sechkova --- tox.ini | 8 ++------ tuf/client_rework/pylintrc | 6 ------ 2 files changed, 2 insertions(+), 12 deletions(-) delete mode 100644 tuf/client_rework/pylintrc diff --git a/tox.ini b/tox.ini index d87964de14..8036eada7e 100644 --- a/tox.ini +++ b/tox.ini @@ -45,14 +45,10 @@ commands = black --check --diff --line-length 80 {toxinidir}/tuf/api isort --check --diff --line-length 80 --profile black -p tuf {toxinidir}/tuf/api pylint {toxinidir}/tuf/api --rcfile={toxinidir}/tuf/api/pylintrc + pylint {toxinidir}/tuf/client_rework --rcfile={toxinidir}/tuf/api/pylintrc # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does # work, unfortunately each subdirectory has to be ignored explicitly. - pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization + pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization,{toxinidir}/tuf/client_rework bandit -r {toxinidir}/tuf - -[testenv:lint-client] -commands = - pylint {toxinidir}/tuf/client_rework --rcfile={toxinidir}/tuf/client_rework/pylintrc - bandit -r {toxinidir}/tuf diff --git a/tuf/client_rework/pylintrc b/tuf/client_rework/pylintrc deleted file mode 100644 index a75347f446..0000000000 --- a/tuf/client_rework/pylintrc +++ /dev/null @@ -1,6 +0,0 @@ -[MESSAGE_CONTROL] -disable=fixme - -[FORMAT] -indent-string=" " -max-line-length=79 From 5edb58c537d04839c1ae122a3145287b396f6327 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 20:21:12 +0200 Subject: [PATCH 07/11] Apply black and isort over the refactored client Run manually the black and isort code formatters over the client_rework code. Signed-off-by: Teodora Sechkova --- tuf/client_rework/metadata_wrapper.py | 55 ++- tuf/client_rework/updater_rework.py | 461 ++++++++++++++------------ 2 files changed, 270 insertions(+), 246 deletions(-) diff --git a/tuf/client_rework/metadata_wrapper.py b/tuf/client_rework/metadata_wrapper.py index a25a278234..89b4a1da1e 100644 --- a/tuf/client_rework/metadata_wrapper.py +++ b/tuf/client_rework/metadata_wrapper.py @@ -8,10 +8,9 @@ import time from securesystemslib.keys import format_metadata_to_key -from tuf.api import metadata -import tuf.exceptions - +import tuf.exceptions +from tuf.api import metadata class MetadataWrapper: @@ -24,20 +23,18 @@ def __init__(self, meta): @classmethod def from_json_object(cls, tmp_file): - """Loads JSON-formatted TUF metadata from a file object. - """ + """Loads JSON-formatted TUF metadata from a file object.""" raw_data = tmp_file.read() from tuf.api.serialization.json import JSONDeserializer + deserializer = JSONDeserializer() _meta = deserializer.deserialize(raw_data) return cls(meta=_meta) - @classmethod def from_json_file(cls, filename): - """Loads JSON-formatted TUF metadata from a file. - """ + """Loads JSON-formatted TUF metadata from a file.""" _meta = metadata.Metadata.from_file(filename) return cls(meta=_meta) @@ -55,7 +52,6 @@ def version(self): """ return self._meta.signed.version - def verify(self, keys, threshold): """ TODO @@ -64,87 +60,85 @@ def verify(self, keys, threshold): # 1.3. Check signatures for key in keys: self._meta.verify(key) - verified+=1 + verified += 1 if verified < threshold: raise tuf.exceptions.InsufficientKeysError - def persist(self, filename): """ TODO """ self._meta.to_file(filename) - def expires(self, reference_time=None): """ TODO """ if reference_time is None: expires_timestamp = tuf.formats.datetime_to_unix_timestamp( - self._meta.signed.expires) + self._meta.signed.expires + ) reference_time = int(time.time()) if expires_timestamp < reference_time: raise tuf.exceptions.ExpiredMetadataError - - class RootWrapper(MetadataWrapper): """ TODO """ + def keys(self, role): """ TODO """ keys = [] - for keyid in self._meta.signed.roles[role]['keyids']: + for keyid in self._meta.signed.roles[role]["keyids"]: key_metadata = self._meta.signed.keys[keyid] key, _ = format_metadata_to_key(key_metadata) keys.append(key) return keys - def threshold(self, role): """ TODO """ - return self._meta.signed.roles[role]['threshold'] - + return self._meta.signed.roles[role]["threshold"] class TimestampWrapper(MetadataWrapper): """ TODO """ + @property def snapshot(self): """ TODO """ - return self._meta.signed.meta['snapshot.json'] + return self._meta.signed.meta["snapshot.json"] class SnapshotWrapper(MetadataWrapper): """ TODO """ + def role(self, name): """ TODO """ - return self._meta.signed.meta[name + '.json'] - + return self._meta.signed.meta[name + ".json"] class TargetsWrapper(MetadataWrapper): """ TODO """ + @property def targets(self): """ @@ -152,7 +146,6 @@ def targets(self): """ return self._meta.signed.targets - @property def delegations(self): """ @@ -160,27 +153,25 @@ def delegations(self): """ return self._meta.signed.delegations - def keys(self, role): """ TODO """ keys = [] - for delegation in self._meta.signed.delegations['roles']: - if delegation['name'] == role: - for keyid in delegation['keyids']: - key_metadata = self._meta.signed.delegations['keys'][keyid] + for delegation in self._meta.signed.delegations["roles"]: + if delegation["name"] == role: + for keyid in delegation["keyids"]: + key_metadata = self._meta.signed.delegations["keys"][keyid] key, _ = format_metadata_to_key(key_metadata) keys.append(key) return keys - def threshold(self, role): """ TODO """ - for delegation in self._meta.signed.delegations['roles']: - if delegation['name'] == role: - return delegation['threshold'] + for delegation in self._meta.signed.delegations["roles"]: + if delegation["name"] == role: + return delegation["threshold"] return None diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py index 8c69398ee6..9e04c20475 100644 --- a/tuf/client_rework/updater_rework.py +++ b/tuf/client_rework/updater_rework.py @@ -7,29 +7,27 @@ """ -#Imports -import os -import logging import fnmatch - -from typing import TextIO, BinaryIO, Dict, Optional +import logging +import os +from typing import BinaryIO, Dict, Optional, TextIO import securesystemslib.exceptions import securesystemslib.util -import tuf.settings -import tuf.mirrors import tuf.download import tuf.exceptions import tuf.formats - +import tuf.mirrors +import tuf.settings from tuf.client.fetcher import FetcherInterface from tuf.requests_fetcher import RequestsFetcher + from .metadata_wrapper import ( RootWrapper, SnapshotWrapper, + TargetsWrapper, TimestampWrapper, - TargetsWrapper ) # Globals @@ -53,24 +51,27 @@ class Updater: """ def __init__( - self, repository_name: str, - repository_mirrors: Dict, - fetcher: Optional[FetcherInterface]=None): + self, + repository_name: str, + repository_mirrors: Dict, + fetcher: Optional[FetcherInterface] = None, + ): self._repository_name = repository_name self._mirrors = repository_mirrors self._consistent_snapshot = False - self._metadata = {'root': {}, - 'timestamp': {}, - 'snapshot': {}, - 'targets': {}} + self._metadata = { + "root": {}, + "timestamp": {}, + "snapshot": {}, + "targets": {}, + } if fetcher is None: self._fetcher = RequestsFetcher() else: self._fetcher = fetcher - def refresh(self) -> None: """ This method downloads, verifies, and loads metadata for the top-level @@ -88,8 +89,7 @@ def refresh(self) -> None: self._load_root() self._load_timestamp() self._load_snapshot() - self._load_targets('targets', 'root') - + self._load_targets("targets", "root") def get_one_valid_targetinfo(self, filename: str) -> Dict: """ @@ -99,9 +99,9 @@ def get_one_valid_targetinfo(self, filename: str) -> Dict: """ return self._preorder_depth_first_walk(filename) - - def updated_targets(self, targets: Dict, - destination_directory: str) -> Dict: + def updated_targets( + self, targets: Dict, destination_directory: str + ) -> Dict: """ After the client has retrieved the target information for those targets they are interested in updating, they would call this method to @@ -121,7 +121,7 @@ def updated_targets(self, targets: Dict, # against each hash listed for its fileinfo. Note: join() discards # 'destination_directory' if 'filepath' contains a leading path # separator (i.e., is treated as an absolute path). - filepath = target['filepath'] + filepath = target["filepath"] target_filepath = os.path.join(destination_directory, filepath) if target_filepath in updated_targetpaths: @@ -129,11 +129,12 @@ def updated_targets(self, targets: Dict, # Try one of the algorithm/digest combos for a mismatch. We break # as soon as we find a mismatch. - for algorithm, digest in target['fileinfo']['hashes'].items(): + for algorithm, digest in target["fileinfo"]["hashes"].items(): digest_object = None try: digest_object = securesystemslib.hash.digest_filename( - target_filepath, algorithm=algorithm) + target_filepath, algorithm=algorithm + ) # This exception will occur if the target does not exist # locally. @@ -150,7 +151,6 @@ def updated_targets(self, targets: Dict, return updated_targets - def download_target(self, target: Dict, destination_directory: str): """ This method performs the actual download of the specified target. @@ -165,26 +165,23 @@ def download_target(self, target: Dict, destination_directory: str): # TODO: do something with exceptions raise - filepath = os.path.join(destination_directory, target['filepath']) + filepath = os.path.join(destination_directory, target["filepath"]) securesystemslib.util.persist_temp_file(temp_obj, filepath) - - - def _mirror_meta_download( - self, filename: str, upper_length: int) -> TextIO: + def _mirror_meta_download(self, filename: str, upper_length: int) -> TextIO: """ Download metadata file from the list of metadata mirrors """ - file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', filename, - self._mirrors) + file_mirrors = tuf.mirrors.get_list_of_mirrors( + "meta", filename, self._mirrors + ) file_mirror_errors = {} for file_mirror in file_mirrors: try: temp_obj = tuf.download.unsafe_download( - file_mirror, - upper_length, - self._fetcher) + file_mirror, upper_length, self._fetcher + ) temp_obj.seek(0) yield temp_obj @@ -195,8 +192,8 @@ def _mirror_meta_download( finally: if file_mirror_errors: raise tuf.exceptions.NoWorkingMirrorError( - file_mirror_errors) - + file_mirror_errors + ) def _mirror_target_download(self, fileinfo: str) -> BinaryIO: """ @@ -204,15 +201,15 @@ def _mirror_target_download(self, fileinfo: str) -> BinaryIO: """ # full_filename = _get_full_name(filename) file_mirrors = tuf.mirrors.get_list_of_mirrors( - 'target', fileinfo['filepath'], self._mirrors) + "target", fileinfo["filepath"], self._mirrors + ) file_mirror_errors = {} for file_mirror in file_mirrors: try: temp_obj = tuf.download.safe_download( - file_mirror, - fileinfo['fileinfo']['length'], - self._fetcher) + file_mirror, fileinfo["fileinfo"]["length"], self._fetcher + ) temp_obj.seek(0) yield temp_obj @@ -223,13 +220,12 @@ def _mirror_target_download(self, fileinfo: str) -> BinaryIO: finally: if file_mirror_errors: raise tuf.exceptions.NoWorkingMirrorError( - file_mirror_errors) + file_mirror_errors + ) - - def _get_full_meta_name(self, - role: str, - extension: str ='.json', - version: int = None) -> str: + def _get_full_meta_name( + self, role: str, extension: str = ".json", version: int = None + ) -> str: """ Helper method returning full metadata file path given the role name and file extension. @@ -237,15 +233,18 @@ def _get_full_meta_name(self, if version is None: filename = role + extension else: - filename = str(version) + '.' + role + extension - return os.path.join(tuf.settings.repositories_directory, - self._repository_name, 'metadata', 'current', filename) - + filename = str(version) + "." + role + extension + return os.path.join( + tuf.settings.repositories_directory, + self._repository_name, + "metadata", + "current", + filename, + ) def _get_relative_meta_name( - self, role: str, - extension: str ='.json', - version: int = None) -> str: + self, role: str, extension: str = ".json", version: int = None + ) -> str: """ Helper method returning full metadata file path given the role name and file extension. @@ -253,32 +252,33 @@ def _get_relative_meta_name( if version is None: filename = role + extension else: - filename = str(version) + '.' + role + extension + filename = str(version) + "." + role + extension return filename - - def _load_root(self) -> None: + def _load_root(self) -> None: """ If metadata file for 'root' role does not exist locally, download it over a network, verify it and store it permanently. """ # Load trusted root metadata - self._metadata['root'] = RootWrapper.from_json_file( - self._get_full_meta_name('root')) + self._metadata["root"] = RootWrapper.from_json_file( + self._get_full_meta_name("root") + ) # Update the root role # 1.1. Let N denote the version number of the trusted # root metadata file. - lower_bound = self._metadata['root']._meta.signed.version + lower_bound = self._metadata["root"]._meta.signed.version upper_bound = lower_bound + tuf.settings.MAX_NUMBER_ROOT_ROTATIONS verified_root = None for next_version in range(lower_bound, upper_bound): try: mirror_download = self._mirror_meta_download( - self._get_relative_meta_name('root', version=next_version), - tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH) + self._get_relative_meta_name("root", version=next_version), + tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, + ) for temp_obj in mirror_download: try: @@ -304,37 +304,37 @@ def _load_root(self) -> None: # 1.9. If the timestamp and / or snapshot keys have been rotated, # then delete the trusted timestamp and snapshot metadata files. - if (self._metadata['root'].keys('timestamp') != - verified_root.keys('timestamp')): - # FIXME: use abstract storage - os.remove(self._get_full_meta_name('timestamp')) - self._metadata['timestamp'] = {} - - if (self._metadata['root'].keys('snapshot') != - verified_root.keys('snapshot')): - # FIXME: use abstract storage - os.remove(self._get_full_meta_name('snapshot')) - self._metadata['snapshot'] = {} - - self._metadata['root'] = verified_root + if self._metadata["root"].keys("timestamp") != verified_root.keys( + "timestamp" + ): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name("timestamp")) + self._metadata["timestamp"] = {} + + if self._metadata["root"].keys("snapshot") != verified_root.keys( + "snapshot" + ): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name("snapshot")) + self._metadata["snapshot"] = {} + + self._metadata["root"] = verified_root # Persist root metadata. The client MUST write the file to non-volatile # storage as FILENAME.EXT (e.g. root.json). - self._metadata['root'].persist(self._get_full_meta_name('root')) + self._metadata["root"].persist(self._get_full_meta_name("root")) # 1.10. Set whether consistent snapshots are used as per # the trusted root metadata file - self._consistent_snapshot = \ - self._metadata['root'].signed.consistent_snapshot + self._consistent_snapshot = self._metadata[ + "root" + ].signed.consistent_snapshot temp_obj.close() - - - - def _load_timestamp(self) -> None: # TODO Check if timestamp exists locally - for temp_obj in self._mirror_meta_download('timestamp.json', - tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH): + for temp_obj in self._mirror_meta_download( + "timestamp.json", tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH + ): try: verified_tampstamp = self._verify_timestamp(temp_obj) # break? should we break after first successful download? @@ -343,31 +343,30 @@ def _load_timestamp(self) -> None: temp_obj.close() raise - self._metadata['timestamp'] = verified_tampstamp + self._metadata["timestamp"] = verified_tampstamp # Persist root metadata. The client MUST write the file to # non-volatile storage as FILENAME.EXT (e.g. root.json). - self._metadata['timestamp'].persist( - self._get_full_meta_name('timestamp.json')) + self._metadata["timestamp"].persist( + self._get_full_meta_name("timestamp.json") + ) temp_obj.close() - - def _load_snapshot(self) -> None: try: - length = self._metadata['timestamp'].snapshot['length'] + length = self._metadata["timestamp"].snapshot["length"] except KeyError: length = tuf.settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH if self._consistent_snapshot: - version = self._metadata['timestamp'].snapshot['version'] + version = self._metadata["timestamp"].snapshot["version"] else: version = None - #Check if exists locally + # Check if exists locally # self.loadLocal('snapshot', snapshotVerifier) - for temp_obj in self._mirror_meta_download('snapshot.json', length): + for temp_obj in self._mirror_meta_download("snapshot.json", length): try: verified_snapshot = self._verify_snapshot(temp_obj) # break? should we break after first successful download? @@ -376,35 +375,36 @@ def _load_snapshot(self) -> None: temp_obj.close() raise - self._metadata['snapshot'] = verified_snapshot + self._metadata["snapshot"] = verified_snapshot # Persist root metadata. The client MUST write the file to # non-volatile storage as FILENAME.EXT (e.g. root.json). - self._metadata['snapshot'].persist( - self._get_full_meta_name('snapshot.json')) + self._metadata["snapshot"].persist( + self._get_full_meta_name("snapshot.json") + ) temp_obj.close() - def _load_targets(self, targets_role: str, parent_role: str) -> None: try: - length = self._metadata['snapshot'].role(targets_role)['length'] + length = self._metadata["snapshot"].role(targets_role)["length"] except KeyError: length = tuf.settings.DEFAULT_TARGETS_REQUIRED_LENGTH if self._consistent_snapshot: - version = self._metadata['snapshot'].role(targets_role)['version'] + version = self._metadata["snapshot"].role(targets_role)["version"] else: version = None - - #Check if exists locally + # Check if exists locally # self.loadLocal('snapshot', targetsVerifier) for temp_obj in self._mirror_meta_download( - targets_role + '.json', length): + targets_role + ".json", length + ): try: - verified_targets = self._verify_targets(temp_obj, - targets_role, parent_role) + verified_targets = self._verify_targets( + temp_obj, targets_role, parent_role + ) # break? should we break after first successful download? except Exception as exception: # TODO: do something with exceptions @@ -414,90 +414,103 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None: # Persist root metadata. The client MUST write the file to # non-volatile storage as FILENAME.EXT (e.g. root.json). self._metadata[targets_role].persist( - self._get_full_meta_name(targets_role, extension='.json')) + self._get_full_meta_name(targets_role, extension=".json") + ) temp_obj.close() - - def _verify_root(self, temp_obj: TextIO) -> RootWrapper: intermediate_root = RootWrapper.from_json_object(temp_obj) # Check for an arbitrary software attack - trusted_root = self._metadata['root'] - intermediate_root.verify(trusted_root.keys('root'), - trusted_root.threshold('root')) - intermediate_root.verify(intermediate_root.keys('root'), - intermediate_root.threshold('root')) + trusted_root = self._metadata["root"] + intermediate_root.verify( + trusted_root.keys("root"), trusted_root.threshold("root") + ) + intermediate_root.verify( + intermediate_root.keys("root"), intermediate_root.threshold("root") + ) # Check for a rollback attack. if intermediate_root.version < trusted_root.version: temp_obj.close() raise tuf.exceptions.ReplayedMetadataError( - 'root', intermediate_root.version(), trusted_root.version()) + "root", intermediate_root.version(), trusted_root.version() + ) # Note that the expiration of the new (intermediate) root metadata # file does not matter yet, because we will check for it in step 1.8. return intermediate_root - def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: intermediate_timestamp = TimestampWrapper.from_json_object(temp_obj) # Check for an arbitrary software attack - trusted_root = self._metadata['root'] + trusted_root = self._metadata["root"] intermediate_timestamp.verify( - trusted_root.keys('timestamp'), - trusted_root.threshold('timestamp')) + trusted_root.keys("timestamp"), trusted_root.threshold("timestamp") + ) # Check for a rollback attack. - if self._metadata['timestamp']: - if (intermediate_timestamp.signed.version <= - self._metadata['timestamp'].version): + if self._metadata["timestamp"]: + if ( + intermediate_timestamp.signed.version + <= self._metadata["timestamp"].version + ): temp_obj.close() raise tuf.exceptions.ReplayedMetadataError( - 'root', intermediate_timestamp.version(), - self._metadata['timestamp'].version()) - - if self._metadata['snapshot']: - if (intermediate_timestamp.snapshot.version <= - self._metadata['timestamp'].snapshot['version']): + "root", + intermediate_timestamp.version(), + self._metadata["timestamp"].version(), + ) + + if self._metadata["snapshot"]: + if ( + intermediate_timestamp.snapshot.version + <= self._metadata["timestamp"].snapshot["version"] + ): temp_obj.close() raise tuf.exceptions.ReplayedMetadataError( - 'root', intermediate_timestamp.snapshot.version(), - self._metadata['snapshot'].version()) + "root", + intermediate_timestamp.snapshot.version(), + self._metadata["snapshot"].version(), + ) intermediate_timestamp.expires() return intermediate_timestamp - - def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: - # Check against timestamp metadata - if self._metadata['timestamp'].snapshot.get('hash'): - _check_hashes(temp_obj, - self._metadata['timestamp'].snapshot.get('hash')) + # Check against timestamp metadata + if self._metadata["timestamp"].snapshot.get("hash"): + _check_hashes( + temp_obj, self._metadata["timestamp"].snapshot.get("hash") + ) intermediate_snapshot = SnapshotWrapper.from_json_object(temp_obj) - if (intermediate_snapshot.version != - self._metadata['timestamp'].snapshot['version']): + if ( + intermediate_snapshot.version + != self._metadata["timestamp"].snapshot["version"] + ): temp_obj.close() raise tuf.exceptions.BadVersionNumberError # Check for an arbitrary software attack - trusted_root = self._metadata['root'] - intermediate_snapshot.verify(trusted_root.keys('snapshot'), - trusted_root.threshold('snapshot')) + trusted_root = self._metadata["root"] + intermediate_snapshot.verify( + trusted_root.keys("snapshot"), trusted_root.threshold("snapshot") + ) # Check for a rollback attack - if self._metadata['snapshot']: + if self._metadata["snapshot"]: for target_role in intermediate_snapshot.signed.meta: - if (target_role['version'] != - self._metadata['snapshot'].meta[target_role]['version']): + if ( + target_role["version"] + != self._metadata["snapshot"].meta[target_role]["version"] + ): temp_obj.close() raise tuf.exceptions.BadVersionNumberError @@ -505,45 +518,44 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: return intermediate_snapshot + def _verify_targets( + self, temp_obj: TextIO, filename: str, parent_role: str + ) -> TargetsWrapper: - def _verify_targets(self, - temp_obj: TextIO, filename: str, parent_role: str) -> TargetsWrapper: - - # Check against timestamp metadata - if self._metadata['snapshot'].role(filename).get('hash'): - _check_hashes(temp_obj, - self._metadata['snapshot'].targets.get('hash')) + # Check against timestamp metadata + if self._metadata["snapshot"].role(filename).get("hash"): + _check_hashes( + temp_obj, self._metadata["snapshot"].targets.get("hash") + ) intermediate_targets = TargetsWrapper.from_json_object(temp_obj) - if (intermediate_targets.version != - self._metadata['snapshot'].role(filename)['version']): + if ( + intermediate_targets.version + != self._metadata["snapshot"].role(filename)["version"] + ): temp_obj.close() raise tuf.exceptions.BadVersionNumberError # Check for an arbitrary software attack parent_role = self._metadata[parent_role] - intermediate_targets.verify(parent_role.keys(filename), - parent_role.threshold(filename)) + intermediate_targets.verify( + parent_role.keys(filename), parent_role.threshold(filename) + ) intermediate_targets.expires() return intermediate_targets + def _verify_target_file(self, temp_obj: BinaryIO, targetinfo: Dict) -> None: - - def _verify_target_file(self, - temp_obj: BinaryIO, targetinfo: Dict) -> None: - - _check_file_length(temp_obj, targetinfo['fileinfo']['length']) - _check_hashes(temp_obj, targetinfo['fileinfo']['hashes']) - - + _check_file_length(temp_obj, targetinfo["fileinfo"]["length"]) + _check_hashes(temp_obj, targetinfo["fileinfo"]["hashes"]) def _preorder_depth_first_walk(self, target_filepath) -> Dict: target = None - role_names = [('targets', 'root')] + role_names = [("targets", "root")] visited_role_names = set() number_of_delegations = tuf.settings.MAX_NUMBER_OF_DELEGATIONS @@ -556,9 +568,9 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # self._update_metadata_if_changed('targets') # Preorder depth-first traversal of the graph of target delegations. - while (target is None and - number_of_delegations > 0 and - len(role_names) > 0): + while ( + target is None and number_of_delegations > 0 and len(role_names) > 0 + ): # Pop the role name from the top of the stack. role_name, parent_role = role_names.pop(-1) @@ -587,7 +599,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # And also decrement number of visited roles. number_of_delegations -= 1 delegations = role_metadata.delegations - child_roles = delegations.get('roles', []) + child_roles = delegations.get("roles", []) if target is None: @@ -596,27 +608,35 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # delegated roles. for child_role in child_roles: child_role_name = _visit_child_role( - child_role, target_filepath) - - if (child_role['terminating'] and - child_role_name is not None): - logger.debug('Adding child role ' + - repr(child_role_name)) - logger.debug('Not backtracking to other roles.') + child_role, target_filepath + ) + + if ( + child_role["terminating"] + and child_role_name is not None + ): + logger.debug( + "Adding child role " + repr(child_role_name) + ) + logger.debug("Not backtracking to other roles.") role_names = [] child_roles_to_visit.append( - (child_role_name, role_name)) + (child_role_name, role_name) + ) break if child_role_name is None: - logger.debug('Skipping child role ' + - repr(child_role_name)) + logger.debug( + "Skipping child role " + repr(child_role_name) + ) else: - logger.debug('Adding child role ' + - repr(child_role_name)) + logger.debug( + "Adding child role " + repr(child_role_name) + ) child_roles_to_visit.append( - (child_role_name, role_name)) + (child_role_name, role_name) + ) # Push 'child_roles_to_visit' in reverse order of appearance # onto 'role_names'. Roles are popped from the end of @@ -625,21 +645,22 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: role_names.extend(child_roles_to_visit) else: - logger.debug('Found target in current role ' + - repr(role_name)) - - if (target is None and - number_of_delegations == 0 and - len(role_names) > 0): - logger.debug(repr(len(role_names)) + ' roles left to visit, ' + - 'but allowed to visit at most ' + - repr(tuf.settings.MAX_NUMBER_OF_DELEGATIONS) + ' delegations.') - - return {'filepath': target_filepath, 'fileinfo': target} - - + logger.debug("Found target in current role " + repr(role_name)) + if ( + target is None + and number_of_delegations == 0 + and len(role_names) > 0 + ): + logger.debug( + repr(len(role_names)) + + " roles left to visit, " + + "but allowed to visit at most " + + repr(tuf.settings.MAX_NUMBER_OF_DELEGATIONS) + + " delegations." + ) + return {"filepath": target_filepath, "fileinfo": target} def _visit_child_role(child_role: Dict, target_filepath: str) -> str: @@ -682,9 +703,9 @@ def _visit_child_role(child_role: Dict, target_filepath: str) -> str: Otherwise, we return None. """ - child_role_name = child_role['name'] - child_role_paths = child_role.get('paths') - child_role_path_hash_prefixes = child_role.get('path_hash_prefixes') + child_role_name = child_role["name"] + child_role_paths = child_role.get("paths") + child_role_path_hash_prefixes = child_role.get("path_hash_prefixes") if child_role_path_hash_prefixes is not None: target_filepath_hash = _get_target_hash(target_filepath) @@ -706,26 +727,35 @@ def _visit_child_role(child_role: Dict, target_filepath: str) -> str: # target without a leading path separator - make sure to strip any # leading path separators so that a match is made. # Example: "foo.tgz" should match with "/*.tgz". - if fnmatch.fnmatch(target_filepath.lstrip(os.sep), - child_role_path.lstrip(os.sep)): - logger.debug('Child role ' + repr(child_role_name) + - ' is allowed to sign for ' + repr(target_filepath)) + if fnmatch.fnmatch( + target_filepath.lstrip(os.sep), child_role_path.lstrip(os.sep) + ): + logger.debug( + "Child role " + + repr(child_role_name) + + " is allowed to sign for " + + repr(target_filepath) + ) return child_role_name logger.debug( - 'The given target path ' + repr(target_filepath) + - ' does not match the trusted path or glob pattern: ' + - repr(child_role_path)) + "The given target path " + + repr(target_filepath) + + " does not match the trusted path or glob pattern: " + + repr(child_role_path) + ) continue else: # 'role_name' should have been validated when it was downloaded. # The 'paths' or 'path_hash_prefixes' fields should not be missing, # so we raise a format error here in case they are both missing. - raise tuf.exceptions.FormatError(repr(child_role_name) + ' ' + raise tuf.exceptions.FormatError( + repr(child_role_name) + " " 'has neither a "paths" nor "path_hash_prefixes". At least' - ' one of these attributes must be present.') + " one of these attributes must be present." + ) return None @@ -740,8 +770,9 @@ def _check_file_length(file_object, trusted_file_length): # ensures that a downloaded file strictly matches a known, or trusted, # file length. if observed_length != trusted_file_length: - raise tuf.exceptions.DownloadLengthMismatchError(trusted_file_length, - observed_length) + raise tuf.exceptions.DownloadLengthMismatchError( + trusted_file_length, observed_length + ) def _check_hashes(file_object, trusted_hashes): @@ -759,26 +790,28 @@ def _check_hashes(file_object, trusted_hashes): # Raise an exception if any of the hashes are incorrect. if trusted_hash != computed_hash: - raise securesystemslib.exceptions.BadHashError(trusted_hash, - computed_hash) - - logger.info('The file\'s ' + algorithm + ' hash is' - ' correct: ' + trusted_hash) + raise securesystemslib.exceptions.BadHashError( + trusted_hash, computed_hash + ) + logger.info( + "The file's " + algorithm + " hash is" " correct: " + trusted_hash + ) -def _get_target_hash(target_filepath, hash_function='sha256'): +def _get_target_hash(target_filepath, hash_function="sha256"): # Calculate the hash of the filepath to determine which bin to find the # target. The client currently assumes the repository (i.e., repository # tool) uses 'hash_function' to generate hashes and UTF-8. digest_object = securesystemslib.hash.digest(hash_function) - encoded_target_filepath = target_filepath.encode('utf-8') + encoded_target_filepath = target_filepath.encode("utf-8") digest_object.update(encoded_target_filepath) target_filepath_hash = digest_object.hexdigest() return target_filepath_hash + def neither_403_nor_404(mirror_error): if isinstance(mirror_error, tuf.exceptions.FetcherHTTPError): if mirror_error.status_code in {403, 404}: From 2d48cf2d51c28be6ca35bb7c69f7180fe6b410e5 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 20:44:37 +0200 Subject: [PATCH 08/11] Configure tox to run black over the new client code Configure tox to run black and isort over the files under client_rework directory. Signed-off-by: Teodora Sechkova --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 8036eada7e..3d6c230dd1 100644 --- a/tox.ini +++ b/tox.ini @@ -43,7 +43,9 @@ commands = # Use different configs for new (tuf/api/*) and legacy code # TODO: configure black and isort args in pyproject.toml (see #1161) black --check --diff --line-length 80 {toxinidir}/tuf/api + black --check --diff --line-length 80 {toxinidir}/tuf/client_rework isort --check --diff --line-length 80 --profile black -p tuf {toxinidir}/tuf/api + isort --check --diff --line-length 80 --profile black -p tuf {toxinidir}/tuf/client_rework pylint {toxinidir}/tuf/api --rcfile={toxinidir}/tuf/api/pylintrc pylint {toxinidir}/tuf/client_rework --rcfile={toxinidir}/tuf/api/pylintrc From ca1ff90889d4ba38ef59fc4232f85b6f4984e41f Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 22:53:46 +0200 Subject: [PATCH 09/11] Fix various pylint issues Fix linter issues after applying the api/pylintrc config over the client_rework/* code. Signed-off-by: Teodora Sechkova --- tuf/client_rework/metadata_wrapper.py | 15 +-- tuf/client_rework/updater_rework.py | 152 ++++++++++++++++---------- 2 files changed, 102 insertions(+), 65 deletions(-) diff --git a/tuf/client_rework/metadata_wrapper.py b/tuf/client_rework/metadata_wrapper.py index 89b4a1da1e..6f182dc336 100644 --- a/tuf/client_rework/metadata_wrapper.py +++ b/tuf/client_rework/metadata_wrapper.py @@ -25,18 +25,19 @@ def __init__(self, meta): def from_json_object(cls, tmp_file): """Loads JSON-formatted TUF metadata from a file object.""" raw_data = tmp_file.read() - + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel from tuf.api.serialization.json import JSONDeserializer deserializer = JSONDeserializer() - _meta = deserializer.deserialize(raw_data) - return cls(meta=_meta) + meta = deserializer.deserialize(raw_data) + return cls(meta=meta) @classmethod def from_json_file(cls, filename): """Loads JSON-formatted TUF metadata from a file.""" - _meta = metadata.Metadata.from_file(filename) - return cls(meta=_meta) + meta = metadata.Metadata.from_file(filename) + return cls(meta=meta) @property def signed(self): @@ -97,7 +98,7 @@ def keys(self, role): keys = [] for keyid in self._meta.signed.roles[role]["keyids"]: key_metadata = self._meta.signed.keys[keyid] - key, _ = format_metadata_to_key(key_metadata) + key, dummy = format_metadata_to_key(key_metadata) keys.append(key) return keys @@ -162,7 +163,7 @@ def keys(self, role): if delegation["name"] == role: for keyid in delegation["keyids"]: key_metadata = self._meta.signed.delegations["keys"][keyid] - key, _ = format_metadata_to_key(key_metadata) + key, dummy = format_metadata_to_key(key_metadata) keys.append(key) return keys diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py index 9e04c20475..2d9dfcf5b1 100644 --- a/tuf/client_rework/updater_rework.py +++ b/tuf/client_rework/updater_rework.py @@ -60,12 +60,7 @@ def __init__( self._repository_name = repository_name self._mirrors = repository_mirrors self._consistent_snapshot = False - self._metadata = { - "root": {}, - "timestamp": {}, - "snapshot": {}, - "targets": {}, - } + self._metadata = {} if fetcher is None: self._fetcher = RequestsFetcher() @@ -99,9 +94,8 @@ def get_one_valid_targetinfo(self, filename: str) -> Dict: """ return self._preorder_depth_first_walk(filename) - def updated_targets( - self, targets: Dict, destination_directory: str - ) -> Dict: + @staticmethod + def updated_targets(targets: Dict, destination_directory: str) -> Dict: """ After the client has retrieved the target information for those targets they are interested in updating, they would call this method to @@ -157,16 +151,18 @@ def download_target(self, target: Dict, destination_directory: str): The file is saved to the 'destination_directory' argument. """ - for temp_obj in self._mirror_target_download(target): - try: + try: + for temp_obj in self._mirror_target_download(target): self._verify_target_file(temp_obj, target) # break? should we break after first successful download? - except Exception as exception: - # TODO: do something with exceptions - raise - filepath = os.path.join(destination_directory, target["filepath"]) - securesystemslib.util.persist_temp_file(temp_obj, filepath) + filepath = os.path.join( + destination_directory, target["filepath"] + ) + securesystemslib.util.persist_temp_file(temp_obj, filepath) + except Exception: + # TODO: do something with exceptions + raise def _mirror_meta_download(self, filename: str, upper_length: int) -> TextIO: """ @@ -242,8 +238,9 @@ def _get_full_meta_name( filename, ) + @staticmethod def _get_relative_meta_name( - self, role: str, extension: str = ".json", version: int = None + role: str, extension: str = ".json", version: int = None ) -> str: """ Helper method returning full metadata file path given the role name @@ -269,7 +266,7 @@ def _load_root(self) -> None: # Update the root role # 1.1. Let N denote the version number of the trusted # root metadata file. - lower_bound = self._metadata["root"]._meta.signed.version + lower_bound = self._metadata["root"].version upper_bound = lower_bound + tuf.settings.MAX_NUMBER_ROOT_ROTATIONS verified_root = None @@ -284,7 +281,7 @@ def _load_root(self) -> None: try: verified_root = self._verify_root(temp_obj) - except Exception as exception: + except Exception: raise except tuf.exceptions.NoWorkingMirrorError as exception: @@ -299,7 +296,7 @@ def _load_root(self) -> None: # than the expiration timestamp in the trusted root metadata file try: verified_root.expires() - except Exception: + except tuf.exceptions.ExpiredMetadataError: temp_obj.close() # 1.9. If the timestamp and / or snapshot keys have been rotated, @@ -328,9 +325,13 @@ def _load_root(self) -> None: self._consistent_snapshot = self._metadata[ "root" ].signed.consistent_snapshot + temp_obj.close() def _load_timestamp(self) -> None: + """ + TODO + """ # TODO Check if timestamp exists locally for temp_obj in self._mirror_meta_download( "timestamp.json", tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH @@ -338,7 +339,7 @@ def _load_timestamp(self) -> None: try: verified_tampstamp = self._verify_timestamp(temp_obj) # break? should we break after first successful download? - except Exception as exception: + except Exception: # TODO: do something with exceptions temp_obj.close() raise @@ -353,16 +354,19 @@ def _load_timestamp(self) -> None: temp_obj.close() def _load_snapshot(self) -> None: - + """ + TODO + """ try: length = self._metadata["timestamp"].snapshot["length"] except KeyError: length = tuf.settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH - if self._consistent_snapshot: - version = self._metadata["timestamp"].snapshot["version"] - else: - version = None + # Uncomment when implementing consistent_snapshot + # if self._consistent_snapshot: + # version = self._metadata["timestamp"].snapshot["version"] + # else: + # version = None # Check if exists locally # self.loadLocal('snapshot', snapshotVerifier) @@ -370,7 +374,7 @@ def _load_snapshot(self) -> None: try: verified_snapshot = self._verify_snapshot(temp_obj) # break? should we break after first successful download? - except Exception as exception: + except Exception: # TODO: do something with exceptions temp_obj.close() raise @@ -385,15 +389,19 @@ def _load_snapshot(self) -> None: temp_obj.close() def _load_targets(self, targets_role: str, parent_role: str) -> None: + """ + TODO + """ try: length = self._metadata["snapshot"].role(targets_role)["length"] except KeyError: length = tuf.settings.DEFAULT_TARGETS_REQUIRED_LENGTH - if self._consistent_snapshot: - version = self._metadata["snapshot"].role(targets_role)["version"] - else: - version = None + # Uncomment when implementing consistent_snapshot + # if self._consistent_snapshot: + # version = self._metadata["snapshot"].role(targets_role)["version"] + # else: + # version = None # Check if exists locally # self.loadLocal('snapshot', targetsVerifier) @@ -406,7 +414,7 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None: temp_obj, targets_role, parent_role ) # break? should we break after first successful download? - except Exception as exception: + except Exception: # TODO: do something with exceptions temp_obj.close() raise @@ -420,6 +428,9 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None: temp_obj.close() def _verify_root(self, temp_obj: TextIO) -> RootWrapper: + """ + TODO + """ intermediate_root = RootWrapper.from_json_object(temp_obj) @@ -444,6 +455,9 @@ def _verify_root(self, temp_obj: TextIO) -> RootWrapper: return intermediate_root def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: + """ + TODO + """ intermediate_timestamp = TimestampWrapper.from_json_object(temp_obj) # Check for an arbitrary software attack @@ -453,7 +467,7 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: ) # Check for a rollback attack. - if self._metadata["timestamp"]: + if self._metadata.get("timestamp"): if ( intermediate_timestamp.signed.version <= self._metadata["timestamp"].version @@ -465,7 +479,7 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: self._metadata["timestamp"].version(), ) - if self._metadata["snapshot"]: + if self._metadata.get("snapshot"): if ( intermediate_timestamp.snapshot.version <= self._metadata["timestamp"].snapshot["version"] @@ -482,6 +496,9 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: return intermediate_timestamp def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: + """ + TODO + """ # Check against timestamp metadata if self._metadata["timestamp"].snapshot.get("hash"): @@ -505,7 +522,7 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: ) # Check for a rollback attack - if self._metadata["snapshot"]: + if self._metadata.get("snapshot"): for target_role in intermediate_snapshot.signed.meta: if ( target_role["version"] @@ -521,6 +538,9 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: def _verify_targets( self, temp_obj: TextIO, filename: str, parent_role: str ) -> TargetsWrapper: + """ + TODO + """ # Check against timestamp metadata if self._metadata["snapshot"].role(filename).get("hash"): @@ -547,12 +567,19 @@ def _verify_targets( return intermediate_targets - def _verify_target_file(self, temp_obj: BinaryIO, targetinfo: Dict) -> None: + @staticmethod + def _verify_target_file(temp_obj: BinaryIO, targetinfo: Dict) -> None: + """ + TODO + """ _check_file_length(temp_obj, targetinfo["fileinfo"]["length"]) _check_hashes(temp_obj, targetinfo["fileinfo"]["hashes"]) def _preorder_depth_first_walk(self, target_filepath) -> Dict: + """ + TODO + """ target = None role_names = [("targets", "root")] @@ -577,7 +604,8 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: self._load_targets(role_name, parent_role) # Skip any visited current role to prevent cycles. if (role_name, parent_role) in visited_role_names: - logger.debug(f"Skipping visited current role {role_name}") + msg = f"Skipping visited current role {role_name}" + logger.debug(msg) continue # The metadata for 'role_name' must be downloaded/updated before @@ -590,8 +618,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # refresh_all_delegated_roles=False) role_metadata = self._metadata[role_name] - targets = role_metadata.targets - target = targets.get(target_filepath) + target = role_metadata.targets.get(target_filepath) # After preorder check, add current role to set of visited roles. visited_role_names.add((role_name, parent_role)) @@ -615,10 +642,11 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: child_role["terminating"] and child_role_name is not None ): - logger.debug( - "Adding child role " + repr(child_role_name) + msg = ( + f"Adding child role {child_role_name}.\n", + "Not backtracking to other roles.", ) - logger.debug("Not backtracking to other roles.") + logger.debug(msg) role_names = [] child_roles_to_visit.append( (child_role_name, role_name) @@ -626,14 +654,12 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: break if child_role_name is None: - logger.debug( - "Skipping child role " + repr(child_role_name) - ) + msg = f"Skipping child role {child_role_name}" + logger.debug(msg) else: - logger.debug( - "Adding child role " + repr(child_role_name) - ) + msg = f"Adding child role {child_role_name}" + logger.debug(msg) child_roles_to_visit.append( (child_role_name, role_name) ) @@ -645,20 +671,21 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: role_names.extend(child_roles_to_visit) else: - logger.debug("Found target in current role " + repr(role_name)) + msg = f"Found target in current role {role_name}" + logger.debug(msg) if ( target is None and number_of_delegations == 0 and len(role_names) > 0 ): - logger.debug( - repr(len(role_names)) - + " roles left to visit, " - + "but allowed to visit at most " - + repr(tuf.settings.MAX_NUMBER_OF_DELEGATIONS) - + " delegations." + msg = ( + f"{len(role_names)} roles left to visit, ", + "but allowed to visit at most ", + f"{tuf.settings.MAX_NUMBER_OF_DELEGATIONS}", + " delegations.", ) + logger.debug(msg) return {"filepath": target_filepath, "fileinfo": target} @@ -761,7 +788,9 @@ def _visit_child_role(child_role: Dict, target_filepath: str) -> str: def _check_file_length(file_object, trusted_file_length): - + """ + TODO + """ file_object.seek(0, 2) observed_length = file_object.tell() @@ -776,7 +805,9 @@ def _check_file_length(file_object, trusted_file_length): def _check_hashes(file_object, trusted_hashes): - + """ + TODO + """ # Verify each trusted hash of 'trusted_hashes'. If all are valid, simply # return. for algorithm, trusted_hash in trusted_hashes.items(): @@ -800,7 +831,9 @@ def _check_hashes(file_object, trusted_hashes): def _get_target_hash(target_filepath, hash_function="sha256"): - + """ + TODO + """ # Calculate the hash of the filepath to determine which bin to find the # target. The client currently assumes the repository (i.e., repository # tool) uses 'hash_function' to generate hashes and UTF-8. @@ -813,6 +846,9 @@ def _get_target_hash(target_filepath, hash_function="sha256"): def neither_403_nor_404(mirror_error): + """ + TODO + """ if isinstance(mirror_error, tuf.exceptions.FetcherHTTPError): if mirror_error.status_code in {403, 404}: return False From df6c319c817222c8223a08aa0461aa8ca2108224 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 22:59:25 +0200 Subject: [PATCH 10/11] Disable exceptions related pylint checks Temporary disable (inline) try-except-raise and broad-except warnings in the new Updater code until client exception handling is revised (#1312). Signed-off-by: Teodora Sechkova --- tuf/client_rework/updater_rework.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py index 2d9dfcf5b1..9900e6ffef 100644 --- a/tuf/client_rework/updater_rework.py +++ b/tuf/client_rework/updater_rework.py @@ -160,6 +160,7 @@ def download_target(self, target: Dict, destination_directory: str): destination_directory, target["filepath"] ) securesystemslib.util.persist_temp_file(temp_obj, filepath) + # pylint: disable=try-except-raise except Exception: # TODO: do something with exceptions raise @@ -182,6 +183,7 @@ def _mirror_meta_download(self, filename: str, upper_length: int) -> TextIO: temp_obj.seek(0) yield temp_obj + # pylint: disable=broad-except except Exception as exception: file_mirror_errors[file_mirror] = exception @@ -209,7 +211,7 @@ def _mirror_target_download(self, fileinfo: str) -> BinaryIO: temp_obj.seek(0) yield temp_obj - + # pylint: disable=broad-except except Exception as exception: file_mirror_errors[file_mirror] = exception @@ -280,7 +282,7 @@ def _load_root(self) -> None: for temp_obj in mirror_download: try: verified_root = self._verify_root(temp_obj) - + # pylint: disable=try-except-raise except Exception: raise From d472989011a131397fd00f55f983ec8ab1ef3175 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Mar 2021 23:13:53 +0200 Subject: [PATCH 11/11] Disable undefined-loop-variable checks Temporary disable (inline) undefined-loop-variable pylint checks in the new Updater code until the download functionality is revised (#1307). Signed-off-by: Teodora Sechkova --- tuf/client_rework/updater_rework.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py index 9900e6ffef..10fdcc415f 100644 --- a/tuf/client_rework/updater_rework.py +++ b/tuf/client_rework/updater_rework.py @@ -299,7 +299,7 @@ def _load_root(self) -> None: try: verified_root.expires() except tuf.exceptions.ExpiredMetadataError: - temp_obj.close() + temp_obj.close() # pylint: disable=undefined-loop-variable # 1.9. If the timestamp and / or snapshot keys have been rotated, # then delete the trusted timestamp and snapshot metadata files. @@ -328,7 +328,7 @@ def _load_root(self) -> None: "root" ].signed.consistent_snapshot - temp_obj.close() + temp_obj.close() # pylint: disable=undefined-loop-variable def _load_timestamp(self) -> None: """ @@ -353,7 +353,7 @@ def _load_timestamp(self) -> None: self._get_full_meta_name("timestamp.json") ) - temp_obj.close() + temp_obj.close() # pylint: disable=undefined-loop-variable def _load_snapshot(self) -> None: """ @@ -388,7 +388,7 @@ def _load_snapshot(self) -> None: self._get_full_meta_name("snapshot.json") ) - temp_obj.close() + temp_obj.close() # pylint: disable=undefined-loop-variable def _load_targets(self, targets_role: str, parent_role: str) -> None: """ @@ -427,7 +427,7 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None: self._get_full_meta_name(targets_role, extension=".json") ) - temp_obj.close() + temp_obj.close() # pylint: disable=undefined-loop-variable def _verify_root(self, temp_obj: TextIO) -> RootWrapper: """