Skip to content

ngclient: use MetaFile/TargetFile verification #1467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
from datetime import datetime
from typing import Dict, Iterator, Optional

from securesystemslib import hash as sslib_hash

from tuf import exceptions
from tuf.api.metadata import Metadata, Root, Targets
from tuf.api.serialization import DeserializationError
Expand Down Expand Up @@ -304,8 +302,7 @@ def update_timestamp(self, data: bytes):
self._trusted_set["timestamp"] = new_timestamp
logger.debug("Updated timestamp")

# TODO: remove pylint disable once the hash verification is in metadata.py
def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches
def update_snapshot(self, data: bytes):
"""Verifies and loads 'data' as new snapshot metadata.

Args:
Expand All @@ -325,13 +322,12 @@ def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches
meta = self.timestamp.signed.meta["snapshot.json"]

# Verify against the hashes in timestamp, if any
hashes = meta.hashes or {}
for algo, stored_hash in hashes.items():
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
observed_hash = digest_object.hexdigest()
if observed_hash != stored_hash:
raise exceptions.BadHashError(stored_hash, observed_hash)
try:
meta.verify_length_and_hashes(data)
except exceptions.LengthOrHashMismatchError as e:
raise exceptions.RepositoryError(
"Snapshot length or hashes do not match"
) from e

try:
new_snapshot = Metadata.from_bytes(data)
Expand Down Expand Up @@ -425,14 +421,12 @@ def update_delegated_targets(
f"Snapshot does not contain information for '{role_name}'"
)

hashes = meta.hashes or {}
for algo, stored_hash in hashes.items():
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
observed_hash = digest_object.hexdigest()
if observed_hash != stored_hash:
# TODO: Error should derive from RepositoryError
raise exceptions.BadHashError(stored_hash, observed_hash)
try:
meta.verify_length_and_hashes(data)
except exceptions.LengthOrHashMismatchError as e:
raise exceptions.RepositoryError(
f"{role_name} length or hashes do not match"
) from e

try:
new_delegate = Metadata.from_bytes(data)
Expand Down
91 changes: 21 additions & 70 deletions tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import Any, Dict, List, Optional
from urllib import parse

from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib import util as sslib_util

Expand Down Expand Up @@ -144,32 +143,21 @@ def updated_targets(
# 'destination_directory' if 'filepath' contains a leading path
# separator (i.e., is treated as an absolute path).
filepath = target["filepath"]
target_fileinfo: "TargetFile" = target["fileinfo"]

target_filepath = os.path.join(destination_directory, filepath)

if target_filepath in updated_targetpaths:
continue

# Try one of the algorithm/digest combos for a mismatch. We break
# as soon as we find a mismatch.
for algorithm, digest in target["fileinfo"].hashes.items():
digest_object = None
try:
digest_object = sslib_hash.digest_filename(
target_filepath, algorithm=algorithm
)

# This exception will occur if the target does not exist
# locally.
except sslib_exceptions.StorageError:
updated_targets.append(target)
updated_targetpaths.append(target_filepath)
break

# The file does exist locally, check if its hash differs.
if digest_object.hexdigest() != digest:
updated_targets.append(target)
updated_targetpaths.append(target_filepath)
break
try:
with open(target_filepath, "rb") as target_file:
target_fileinfo.verify_length_and_hashes(target_file)
# If the file does not exist locally or length and hashes
# do not match, append to updated targets.
except (OSError, exceptions.LengthOrHashMismatchError):
updated_targets.append(target)
updated_targetpaths.append(target_filepath)

return updated_targets

Expand Down Expand Up @@ -205,17 +193,21 @@ def download_target(
else:
target_base_url = _ensure_trailing_slash(target_base_url)

full_url = parse.urljoin(target_base_url, targetinfo["filepath"])
target_filepath = targetinfo["filepath"]
target_fileinfo: "TargetFile" = targetinfo["fileinfo"]
full_url = parse.urljoin(target_base_url, target_filepath)

with download.download_file(
full_url, targetinfo["fileinfo"].length, self._fetcher
full_url, target_fileinfo.length, self._fetcher
) as target_file:
_check_file_length(target_file, targetinfo["fileinfo"].length)
_check_hashes_obj(target_file, targetinfo["fileinfo"].hashes)
try:
target_fileinfo.verify_length_and_hashes(target_file)
except exceptions.LengthOrHashMismatchError as e:
raise exceptions.RepositoryError(
f"{target_filepath} length or hashes do not match"
) from e

filepath = os.path.join(
destination_directory, targetinfo["filepath"]
)
filepath = os.path.join(destination_directory, target_filepath)
sslib_util.persist_temp_file(target_file, filepath)

def _download_metadata(
Expand Down Expand Up @@ -521,47 +513,6 @@ def _visit_child_role(child_role: Dict, target_filepath: str) -> str:
return None


def _check_file_length(file_object, trusted_file_length):
"""
Given a file_object, checks whether its length matches
trusted_file_length.

Raises:
DownloadLengthMismatchError: File length does not match
expected length.
"""
file_object.seek(0, 2)
observed_length = file_object.tell()
file_object.seek(0)

if observed_length != trusted_file_length:
raise exceptions.DownloadLengthMismatchError(
trusted_file_length, observed_length
)


def _check_hashes_obj(file_object, trusted_hashes):
    """Check ``file_object``'s content against each hash in ``trusted_hashes``.

    Args:
        file_object: A seekable, readable file-like object.
        trusted_hashes: Dict mapping a hash algorithm name (e.g. "sha256")
            to the expected hex digest of the file's content.

    Raises:
        BadHashError: A computed digest does not match the trusted digest.
    """
    for algorithm, trusted_hash in trusted_hashes.items():
        digest_object = sslib_hash.digest_fileobject(file_object, algorithm)

        computed_hash = digest_object.hexdigest()

        # Raise an exception if any of the hashes are incorrect.
        if trusted_hash != computed_hash:
            raise exceptions.BadHashError(trusted_hash, computed_hash)

        # Lazy %-args: the message is only built if this level is emitted.
        logger.info(
            "The file's %s hash is correct: %s", algorithm, trusted_hash
        )


def _get_filepath_hash(target_filepath, hash_function="sha256"):
"""
Calculate the hash of the filepath to determine which bin to find the
Expand Down