From 17f08ad200cb7445e5d17c15c0982ee9236df02b Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 18 Aug 2020 11:15:49 +0200 Subject: [PATCH 01/16] Add simple TUF role metadata model (WIP) Add metadata module with container classes for TUF role metadata, including methods to read/serialize/write from and to JSON, perform TUF-compliant metadata updates, and create and verify signatures. The 'Metadata' class provides a container for inner TUF metadata objects (Root, Timestamp, Snapshot, Targets) (i.e. OOP composition) The 'Signed' class provides a base class to aggregate common attributes (i.e. version, expires, spec_version) of the inner metadata classes. (i.e. OOP inheritance). The name of the class also aligns with the 'signed' field of the outer metadata container. Based on prior observations in TUF's sister project in-toto, this architecture seems to well represent the metadata model as it is defined in the specification (see in-toto/in-toto#98 and in-toto/in-toto#142 for related discussions). This commits also adds tests. **TODO: See doc header TODO list** **Additional design considerations** (also in regards to prior sketches of this module) - Aims at simplicity, brevity and recognizability of the wireline metadata format. - All attributes that correspond to fields in TUF JSON metadata are public. There doesn't seem to be a good reason to protect them with leading underscores and use setters/getters instead, it just adds more code, and impedes recognizability of the wireline metadata format. - Although, it might be convenient to have short-cuts on the Metadata class that point to methods and attributes that are common to all subclasses of the contained Signed class (e.g. Metadata.version instead of Metadata.signed.version, etc.), this also conflicts with goal of recognizability of the wireline metadata. Thus we won't add such short-cuts for now. See: https://github.com/theupdateframework/tuf/pull/1060#discussion_r452906629 - Signing keys and a 'consistent_snapshot' boolean are not on the targets metadata class. They are a better fit for management code. See: https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376, and #660. - Does not use sslib schema checks (see TODO notes about validation in doc header) - Does not use existing tuf utils, such as make_metadata_fileinfo, build_dict_conforming_to_schema, if it is easy and more explicit to just re-implement the desired behavior on the metadata classes. - All datetime's are treated as UTC. Since timezone info is not captured in the wireline metadata format it should not be captured in the internal representation either. - Does not use 3rd-party dateutil package, in order to minimize dependency footprint, which is especially important for update clients which often have to vendor their dependencies. However, compatibility between the more advanced dateutil.relativedelta (e.g handles leap years automatically) and timedelta is tested. - Uses PEP8 indentation (4 space) and Google-style doc string instead of sslab-style. See https://github.com/secure-systems-lab/code-style-guidelines/issues/20 - Does not support Python =< 3.5 Co-authored-by: Trishank Karthik Kuppusamy Co-authored-by: Joshua Lock Co-authored-by: Teodora Sechkova Signed-off-by: Lukas Puehringer --- tests/test_api.py | 165 ++++++++++++++++++++++++++ tuf/api/metadata.py | 277 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 442 insertions(+) create mode 100644 tests/test_api.py create mode 100644 tuf/api/metadata.py diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000000..42db352404 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python + +# Copyright 2020, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 +""" Unit tests for api/metdata.py +""" +import logging +import os +import shutil +import tempfile +import unittest + +from datetime import timedelta +from dateutil.relativedelta import relativedelta + +from tuf.api.metadata import ( + Snapshot, + Timestamp, +) + +logger = logging.getLogger(__name__) + + +class TestMetadata(unittest.TestCase): + # TODO: Start Vault in a dev mode, and export VAULT_ADDR as well as VAULT_TOKEN. + # TODO: Enable the Vault Transit secrets engine. + @classmethod + def setUpClass(cls): + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownClass() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + test_repo_data = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'repository_data') + + cls.repo_dir = os.path.join(cls.temporary_directory, 'repository') + shutil.copytree( + os.path.join(test_repo_data, 'repository'), cls.repo_dir) + + cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore') + shutil.copytree( + os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + + + # TODO: Shut down Vault. + @classmethod + def tearDownClass(cls): + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + + # def _load_key_ring(self): + # key_list = [] + # root_key = RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'), + # 'rsassa-pss-sha256', 'password') + # key_list.append(root_key) + + # for key_file in os.listdir(self.keystore_dir): + # if key_file.endswith('.pub'): + # # ignore public keys + # continue + + # if key_file.startswith('root_key'): + # # root key is loaded + # continue + + # key = RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file), + # 'ed25519', 'password') + # key_list.append(key) + + # threshold = Threshold(1, 5) + # return KeyRing(threshold=threshold, keys=key_list) + + def test_metadata_base(self): + # Use of Snapshot is arbitrary, we're just testing the base class features + # with real data + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + md = Snapshot.read_from_json(snapshot_path) + + self.assertEqual(md.signed.version, 1) + md.signed.bump_version() + self.assertEqual(md.signed.version, 2) + self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z') + md.signed.bump_expiration() + self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z') + md.signed.bump_expiration(timedelta(days=365)) + self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z') + + + def test_metadata_snapshot(self): + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + snapshot = Snapshot.read_from_json(snapshot_path) + + # key_ring = self._load_key_ring() + # snapshot.verify(key_ring) + + # Create a dict representing what we expect the updated data to be + fileinfo = snapshot.signed.meta + hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} + fileinfo['role1.json']['version'] = 2 + fileinfo['role1.json']['hashes'] = hashes + fileinfo['role1.json']['length'] = 123 + + snapshot.signed.update('role1', 2, 123, hashes) + self.assertEqual(snapshot.signed.meta, fileinfo) + + # snapshot.signable() + + # snapshot.sign() + + # snapshot.verify() + + # snapshot.write_to_json(os.path.join(cls.temporary_directory, 'api_snapshot.json')) + + + def test_metadata_timestamp(self): + timestamp_path = os.path.join( + self.repo_dir, 'metadata', 'timestamp.json') + timestamp = Timestamp.read_from_json(timestamp_path) + + # key_ring = self._load_key_ring() + # timestamp.verify(key_ring) + + self.assertEqual(timestamp.signed.version, 1) + timestamp.signed.bump_version() + self.assertEqual(timestamp.signed.version, 2) + + self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z') + timestamp.signed.bump_expiration() + self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z') + timestamp.signed.bump_expiration(timedelta(days=365)) + self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z') + + # Test whether dateutil.relativedelta works, this provides a much + # easier to use interface for callers + delta = relativedelta(days=1) + timestamp.signed.bump_expiration(delta) + self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z') + delta = relativedelta(years=5) + timestamp.signed.bump_expiration(delta) + self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z') + + hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} + fileinfo = timestamp.signed.meta['snapshot.json'] + fileinfo['hashes'] = hashes + fileinfo['version'] = 2 + fileinfo['length'] = 520 + timestamp.signed.update(2, 520, hashes) + self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo) + + # timestamp.sign() + + # timestamp.write_to_json() + + +# Run unit test. +if __name__ == '__main__': + unittest.main() diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py new file mode 100644 index 0000000000..dddb47af06 --- /dev/null +++ b/tuf/api/metadata.py @@ -0,0 +1,277 @@ +"""TUF role metadata model. + +This module provides container classes for TUF role metadata, including methods +to read/serialize/write from and to JSON, perform TUF-compliant metadata +updates, and create and verify signatures. + +TODO: + + * Add docstrings + + * Finalize/Document Verify/Sign functions (I am not fully sure about expected + behavior). See + https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376 + + * Validation (some thoughts ...) + - Avoid schema, see secure-systems-lab/securesystemslib#183 + - Provide methods to validate JSON representation (at user boundary) + - Fail on bad json metadata in read_from_json method + - Be lenient on bad/invalid metadata objects in memory, they might be + work in progress. E.g. it might be convenient to create empty metadata + and assign attributes later on. + - Fail on bad json metadata in write_to_json method, but with option to + disable check as there might be a justified reason to write WIP + metadata to json. + + * It might be nice to have a generic Metadata.read_from_json that + can load any TUF role metadata and instantiate the appropriate object based + on the json '_type' field. + + * Add Root metadata class + +""" +# Imports + +from datetime import datetime, timedelta +from typing import Any, Dict, Optional + +import json +import logging +import tempfile + +from securesystemslib.formats import encode_canonical +from securesystemslib.util import load_json_file, persist_temp_file +from securesystemslib.storage import StorageBackendInterface +from tuf.repository_lib import ( + _get_written_metadata, + _strip_version_number +) + +import iso8601 +import tuf.formats + + +# Types + +JsonDict = Dict[str, Any] + + +# Classes. + +class Metadata(): + def __init__( + self, signed: 'Signed' = None, signatures: list = None) -> None: + # TODO: How much init magic do we want? + self.signed = signed + self.signatures = signatures + + def as_dict(self) -> JsonDict: + return { + 'signatures': self.signatures, + 'signed': self.signed.as_dict() + } + + # def __update_signature(self, signatures, keyid, signature): + # updated = False + # keyid_signature = {'keyid':keyid, 'sig':signature} + # for idx, keyid_sig in enumerate(signatures): + # if keyid_sig['keyid'] == keyid: + # signatures[idx] = keyid_signature + # updated = True + # if not updated: + # signatures.append(keyid_signature) + + # def sign(self, key_ring: ???) -> JsonDict: + # # FIXME: Needs documentation of expected behavior + # signed_bytes = self.signed_bytes + # signatures = self.__signatures + + # for key in key_ring.keys: + # signature = key.sign(self.signed_bytes) + # self.__update_signature(signatures, key.keyid, signature) + + # self.__signatures = signatures + # return self.signable + + # def verify(self, key_ring: ???) -> bool: + # # FIXME: Needs documentation of expected behavior + # signed_bytes = self.signed.signed_bytes + # signatures = self.signatures + # verified_keyids = set() + + # for signature in signatures: + # # TODO: handle an empty keyring + # for key in key_ring.keys: + # keyid = key.keyid + # if keyid == signature['keyid']: + # try: + # verified = key.verify(signed_bytes, signature) + # except: + # logging.exception(f'Could not verify signature for key {keyid}') + # continue + # else: + # # Avoid https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-6174 + # verified_keyids.add(keyid) + + # break + + # return len(verified_keyids) >= key_ring.threshold.least + + def write_to_json( + self, filename: str, + storage_backend: StorageBackendInterface = None) -> None: + with tempfile.TemporaryFile() as f: + f.write(_get_written_metadata(self.sign()).encode_canonical()) + persist_temp_file(f, filename, storage_backend) + + +class Signed: + # NOTE: Signed is a stupid name, because this might not be signed yet, but + # we keep it to match spec terminology (I often refer to this as "payload", + # or "inner metadata") + + # TODO: Re-think default values. It might be better to pass some things + # as args and not es kwargs. Then we'd need to pop those from + # signable["signed"] in read_from_json and pass them explicitly, which + # some say is better than implicit. :) + def __init__( + self, _type: str = None, version: int = 0, + spec_version: str = None, expires: datetime = None + ) -> None: + # TODO: How much init magic do we want? + + self._type = _type + self.spec_version = spec_version + + # We always intend times to be UTC + # NOTE: we could do this with datetime.fromisoformat() but that is not + # available in Python 2.7's datetime + # NOTE: Store as datetime object for convenient handling, use 'expires' + # property to get the TUF metadata format representation + self.__expiration = iso8601.parse_date(expires).replace(tzinfo=None) + + if version < 0: + raise ValueError(f'version must be < 0, got {version}') + self.version = version + + @property + def signed_bytes(self) -> bytes: + return encode_canonical(self.as_dict()).encode('UTF-8') + + @property + def expires(self) -> str: + """The expiration property in TUF metadata format.""" + return self.__expiration.isoformat() + 'Z' + + def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: + self.__expiration = self.__expiration + delta + + def bump_version(self) -> None: + self.version += 1 + + def as_dict(self) -> JsonDict: + # NOTE: The classes should be the single source of truth about metadata + # let's define the dict representation here and not in some dubious + # build_dict_conforming_to_schema + return { + '_type': self._type, + 'version': self.version, + 'spec_version': self.spec_version, + 'expires': self.expires + } + + @classmethod + def read_from_json( + cls, filename: str, + storage_backend: Optional[StorageBackendInterface] = None + ) -> Metadata: + signable = load_json_file(filename, storage_backend) + + # FIXME: It feels dirty to access signable["signed"]["version"] here in + # order to do this check, and also a bit random (there are likely other + # things to check), but later we don't have the filename anymore. If we + # want to stick to the check, which seems reasonable, we should maybe + # think of a better place. + _, fn_prefix = _strip_version_number(filename, True) + if fn_prefix and fn_prefix != signable['signed']['version']: + raise ValueError( + f'version filename prefix ({fn_prefix}) must align with ' + f'version in metadata ({signable["signed"]["version"]}).') + + return Metadata( + signed=cls(**signable['signed']), + signatures=signable['signatures']) + + +class Timestamp(Signed): + def __init__(self, meta: JsonDict = None, **kwargs) -> None: + super().__init__(**kwargs) + # TODO: How much init magic do we want? + # TODO: Is there merit in creating classes for dict fields? + self.meta = meta + + def as_dict(self) -> JsonDict: + json_dict = super().as_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + + # Update metadata about the snapshot metadata. + def update(self, version: int, length: int, hashes: JsonDict) -> None: + fileinfo = self.meta.get('snapshot.json', {}) + fileinfo['version'] = version + fileinfo['length'] = length + fileinfo['hashes'] = hashes + self.meta['snapshot.json'] = fileinfo + + +class Snapshot(Signed): + def __init__(self, meta: JsonDict = None, **kwargs) -> None: + # TODO: How much init magic do we want? + # TODO: Is there merit in creating classes for dict fields? + super().__init__(**kwargs) + self.meta = meta + + def as_dict(self) -> JsonDict: + json_dict = super().as_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + + # Add or update metadata about the targets metadata. + def update( + self, rolename: str, version: int, length: Optional[int] = None, + hashes: Optional[JsonDict] = None) -> None: + metadata_fn = f'{rolename}.json' + + self.meta[metadata_fn] = {'version': version} + if length is not None: + self.meta[metadata_fn]['length'] = length + + if hashes is not None: + self.meta[metadata_fn]['hashes'] = hashes + + +class Targets(Signed): + def __init__( + self, targets: JsonDict = None, delegations: JsonDict = None, + **kwargs) -> None: + # TODO: How much init magic do we want? + # TODO: Is there merit in creating classes for dict fields? + super().__init__(**kwargs) + self.targets = targets + self.delegations = delegations + + def as_dict(self) -> JsonDict: + json_dict = super().as_dict() + json_dict.update({ + 'targets': self.targets, + 'delegations': self.delegations, + }) + return json_dict + + # Add or update metadata about the target. + def update(self, filename: str, fileinfo: JsonDict) -> None: + self.targets[filename] = fileinfo From b1dd3d6787a136ad2dfd5876c94bb9badaa3d912 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 18 Aug 2020 15:55:43 +0200 Subject: [PATCH 02/16] Skip api tests on Python < 3.6 The new metadata module uses constructs that are only available on Python >= 3.6 (typing, f-format strings, etc.). Signed-off-by: Lukas Puehringer --- tests/test_api.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 42db352404..3fdac39b51 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,7 +3,10 @@ # Copyright 2020, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 """ Unit tests for api/metdata.py + """ + +import sys import logging import os import shutil @@ -13,10 +16,21 @@ from datetime import timedelta from dateutil.relativedelta import relativedelta -from tuf.api.metadata import ( - Snapshot, - Timestamp, -) +# TODO: Remove case handling when fully dropping support for versions >= 3.6 +IS_PY_VERSION_SUPPORTED = sys.version_info >= (3, 6) + +# Use setUpModule to tell unittest runner to skip this test module gracefully. +def setUpModule(): + if not IS_PY_VERSION_SUPPORTED: + raise unittest.SkipTest("requires Python 3.6 or higher") + +# Since setUpModule is called after imports we need to import conditionally. +if IS_PY_VERSION_SUPPORTED: + from tuf.api.metadata import ( + Snapshot, + Timestamp, + ) + logger = logging.getLogger(__name__) From e997097d1cd51b3f596a1adee5b3bdb84bbbbb8a Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 18 Aug 2020 17:04:55 +0200 Subject: [PATCH 03/16] Add generic Metadata.read_from_json class method Add generic read from json class method that returns a Metadata object with a signed field that contains the appropriate Signed subclass, based on the signed._type field of the read metadata. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 34 ++++++++++++++++++++++++++++++++ tuf/api/metadata.py | 48 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 3fdac39b51..97580c600e 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -6,6 +6,7 @@ """ +import json import sys import logging import os @@ -27,8 +28,10 @@ def setUpModule(): # Since setUpModule is called after imports we need to import conditionally. if IS_PY_VERSION_SUPPORTED: from tuf.api.metadata import ( + Metadata, Snapshot, Timestamp, + Targets ) @@ -90,6 +93,37 @@ def tearDownClass(cls): # threshold = Threshold(1, 5) # return KeyRing(threshold=threshold, keys=key_list) + def test_generic_read(self): + for metadata, inner_metadata_cls in [ + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets)]: + + path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + metadata_obj = Metadata.read_from_json(path) + + # Assert that generic method ... + # ... instantiates the right inner class for each metadata type + self.assertTrue( + isinstance(metadata_obj.signed, inner_metadata_cls)) + # ... and reads the same metadata file as the corresponding method + # on the inner class would do (compare their dict representation) + self.assertDictEqual( + metadata_obj.as_dict(), + inner_metadata_cls.read_from_json(path).as_dict()) + + # Assert that it chokes correctly on an unknown metadata type + bad_metadata_path = 'bad-metadata.json' + bad_metadata = {'signed': {'_type': 'bad-metadata'}} + with open(bad_metadata_path, 'wb') as f: + f.write(json.dumps(bad_metadata).encode('utf-8')) + + with self.assertRaises(ValueError): + Metadata.read_from_json(bad_metadata_path) + + os.remove(bad_metadata_path) + + def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features # with real data diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index dddb47af06..c75e024609 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -23,10 +23,6 @@ disable check as there might be a justified reason to write WIP metadata to json. - * It might be nice to have a generic Metadata.read_from_json that - can load any TUF role metadata and instantiate the appropriate object based - on the json '_type' field. - * Add Root metadata class """ @@ -116,6 +112,50 @@ def as_dict(self) -> JsonDict: # break # return len(verified_keyids) >= key_ring.threshold.least + @classmethod + def read_from_json( + cls, filename: str, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads JSON-formatted TUF metadata from a file storage. + + Arguments: + filename: The path to read the file from. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + + Raises: + securesystemslib.exceptions.StorageError: The file cannot be read. + securesystemslib.exceptions.Error, ValueError: The metadata cannot + be parsed. + + Returns: + A TUF Metadata object. + + """ + signable = load_json_file(filename, storage_backend) + + # TODO: Should we use constants? + # And/or maybe a dispatch table? (<-- maybe too much magic) + _type = signable['signed']['_type'] + + if _type == 'targets': + inner_cls = Targets + elif _type == 'snapshot': + inner_cls = Snapshot + elif _type == 'timestamp': + inner_cls = Timestamp + elif _type == 'root': + # TODO: implement Root class + raise NotImplementedError('Root not yet implemented') + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + return Metadata( + signed=inner_cls(**signable['signed']), + signatures=signable['signatures']) + def write_to_json( self, filename: str, From 088e94055f8c019ee89eb0023b98762eab12b3aa Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 19 Aug 2020 10:40:58 +0200 Subject: [PATCH 04/16] Replace _get_written_metadata with as_json method. Add simple as_json Metadata method and use it instead of repository lib's internal _get_written_metadata function in write_to_json. This commit further adds code documentation and the possibility to write compact json by excluding whitespace to write_to_json, and also removes a call to the sign method from write_to_json. The commit also adds tests. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 23 +++++++++++++++++++++++ tuf/api/metadata.py | 30 +++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 97580c600e..327f79f73c 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -123,6 +123,29 @@ def test_generic_read(self): os.remove(bad_metadata_path) + def test_compact_json(self): + path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + metadata_obj = Metadata.read_from_json(path) + self.assertTrue( + len(metadata_obj.as_json(compact=True)) < + len(metadata_obj.as_json())) + + + def test_read_write_read_compare(self): + for metadata in ["snapshot", "timestamp", "targets"]: + path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + metadata_obj = Metadata.read_from_json(path) + + path_2 = path + '.tmp' + metadata_obj.write_to_json(path_2) + metadata_obj_2 = Metadata.read_from_json(path_2) + + self.assertDictEqual( + metadata_obj.as_dict(), + metadata_obj_2.as_dict()) + + os.remove(path_2) + def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index c75e024609..b6d2b6e939 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -39,7 +39,6 @@ from securesystemslib.util import load_json_file, persist_temp_file from securesystemslib.storage import StorageBackendInterface from tuf.repository_lib import ( - _get_written_metadata, _strip_version_number ) @@ -67,6 +66,14 @@ def as_dict(self) -> JsonDict: 'signed': self.signed.as_dict() } + def as_json(self, compact: bool = False) -> None: + """Returns the optionally compacted JSON representation. """ + return json.dumps( + self.as_dict(), + indent=(None if compact else 1), + separators=((',', ':') if compact else (',', ': ')), + sort_keys=True) + # def __update_signature(self, signatures, keyid, signature): # updated = False # keyid_signature = {'keyid':keyid, 'sig':signature} @@ -156,12 +163,25 @@ def read_from_json( signed=inner_cls(**signable['signed']), signatures=signable['signatures']) - def write_to_json( - self, filename: str, + self, filename: str, compact: bool = False, storage_backend: StorageBackendInterface = None) -> None: - with tempfile.TemporaryFile() as f: - f.write(_get_written_metadata(self.sign()).encode_canonical()) + """Writes the JSON representation of the instance to file storage. + + Arguments: + filename: The path to write the file to. + compact: A boolean indicating if the JSON string should be compact + by excluding whitespace. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + Raises: + securesystemslib.exceptions.StorageError: + The file cannot be written. + + """ + with tempfile.TemporaryFile() as f: + f.write(self.as_json(compact).encode('utf-8')) persist_temp_file(f, filename, storage_backend) From 0d7e2680f2743799256f7fc03d54678daf73e118 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 20 Aug 2020 12:19:36 +0200 Subject: [PATCH 05/16] Simplifies Timestamp.update method Signed-off-by: Lukas Puehringer --- tuf/api/metadata.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index b6d2b6e939..371488b2ec 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -279,11 +279,11 @@ def as_dict(self) -> JsonDict: # Update metadata about the snapshot metadata. def update(self, version: int, length: int, hashes: JsonDict) -> None: - fileinfo = self.meta.get('snapshot.json', {}) - fileinfo['version'] = version - fileinfo['length'] = length - fileinfo['hashes'] = hashes - self.meta['snapshot.json'] = fileinfo + self.meta['snapshot.json'] = { + 'version': version, + 'length': length, + 'hashes': hashes + } class Snapshot(Signed): From 5cc73353fa1550fec9c372ffed8e8c72440dc166 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 20 Aug 2020 12:19:55 +0200 Subject: [PATCH 06/16] Add metadata model class and method docstrings Signed-off-by: Lukas Puehringer --- tests/test_api.py | 2 +- tuf/api/metadata.py | 172 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 163 insertions(+), 11 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 327f79f73c..65d4b4c09b 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,7 +2,7 @@ # Copyright 2020, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 -""" Unit tests for api/metdata.py +""" Unit tests for api/metadata.py """ diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 371488b2ec..064ba5395b 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -6,8 +6,6 @@ TODO: - * Add docstrings - * Finalize/Document Verify/Sign functions (I am not fully sure about expected behavior). See https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376 @@ -54,6 +52,27 @@ # Classes. class Metadata(): + """A container for signed TUF metadata. + + Provides methods to (de-)serialize JSON metadata from and to file + storage, and to create and verify signatures. + + Attributes: + signed: A subclass of Signed, which has the actual metadata payload, + i.e. one of Targets, Snapshot, Timestamp or Root. + + signatures: A list of signatures over the canonical JSON representation + of the value of the signed attribute:: + + [ + { + 'keyid': '', + 'sig':' '' + }, + ... + ] + + """ def __init__( self, signed: 'Signed' = None, signatures: list = None) -> None: # TODO: How much init magic do we want? @@ -61,13 +80,14 @@ def __init__( self.signatures = signatures def as_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ return { 'signatures': self.signatures, 'signed': self.signed.as_dict() } def as_json(self, compact: bool = False) -> None: - """Returns the optionally compacted JSON representation. """ + """Returns the optionally compacted JSON representation of self. """ return json.dumps( self.as_dict(), indent=(None if compact else 1), @@ -124,7 +144,7 @@ def read_from_json( cls, filename: str, storage_backend: Optional[StorageBackendInterface] = None ) -> 'Metadata': - """Loads JSON-formatted TUF metadata from a file storage. + """Loads JSON-formatted TUF metadata from file storage. Arguments: filename: The path to read the file from. @@ -166,7 +186,7 @@ def read_from_json( def write_to_json( self, filename: str, compact: bool = False, storage_backend: StorageBackendInterface = None) -> None: - """Writes the JSON representation of the instance to file storage. + """Writes the JSON representation of self to file storage. Arguments: filename: The path to write the file to. @@ -186,6 +206,21 @@ def write_to_json( class Signed: + """A base class for the signed part of TUF metadata. + + Objects with base class Signed are usually included in a Metadata object + on the signed attribute. This class provides attributes and methods that + are common for all TUF metadata types (roles). + + Attributes: + _type: The metadata type string. + version: The metadata version number. + spec_version: The TUF specification version number (semver) the + metadata format adheres to. + expires: The metadata expiration datetime object. + signed_bytes: The UTF-8 encoded canonical JSON representation of self. + + """ # NOTE: Signed is a stupid name, because this might not be signed yet, but # we keep it to match spec terminology (I often refer to this as "payload", # or "inner metadata") @@ -220,19 +255,18 @@ def signed_bytes(self) -> bytes: @property def expires(self) -> str: - """The expiration property in TUF metadata format.""" return self.__expiration.isoformat() + 'Z' def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: + """Increments the expires attribute by the passed timedelta. """ self.__expiration = self.__expiration + delta def bump_version(self) -> None: + """Increments the metadata version number by 1.""" self.version += 1 def as_dict(self) -> JsonDict: - # NOTE: The classes should be the single source of truth about metadata - # let's define the dict representation here and not in some dubious - # build_dict_conforming_to_schema + """Returns the JSON-serializable dictionary representation of self. """ return { '_type': self._type, 'version': self.version, @@ -246,7 +280,24 @@ def read_from_json( storage_backend: Optional[StorageBackendInterface] = None ) -> Metadata: signable = load_json_file(filename, storage_backend) + """Loads corresponding JSON-formatted metadata from file storage. + Arguments: + filename: The path to read the file from. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + + Raises: + securesystemslib.exceptions.StorageError: The file cannot be read. + securesystemslib.exceptions.Error, ValueError: The metadata cannot + be parsed. + + Returns: + A TUF Metadata object whose signed attribute contains an object + of this class. + + """ # FIXME: It feels dirty to access signable["signed"]["version"] here in # order to do this check, and also a bit random (there are likely other # things to check), but later we don't have the filename anymore. If we @@ -264,6 +315,24 @@ def read_from_json( class Timestamp(Signed): + """A container for the signed part of timestamp metadata. + + Attributes: + meta: A dictionary that contains information about snapshot metadata:: + + { + 'snapshot.json': { + 'version': , + 'length': , // optional + 'hashes': { + '': '', + '': '', + ... + } + } + } + + """ def __init__(self, meta: JsonDict = None, **kwargs) -> None: super().__init__(**kwargs) # TODO: How much init magic do we want? @@ -271,14 +340,15 @@ def __init__(self, meta: JsonDict = None, **kwargs) -> None: self.meta = meta def as_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().as_dict() json_dict.update({ 'meta': self.meta }) return json_dict - # Update metadata about the snapshot metadata. def update(self, version: int, length: int, hashes: JsonDict) -> None: + """Assigns passed info about snapshot metadata to meta dict. """ self.meta['snapshot.json'] = { 'version': version, 'length': length, @@ -287,6 +357,31 @@ def update(self, version: int, length: int, hashes: JsonDict) -> None: class Snapshot(Signed): + """A container for the signed part of snapshot metadata. + + Attributes: + meta: A dictionary that contains information about targets metadata:: + + { + 'targets.json': { + 'version': , + 'length': , // optional + 'hashes': { + '': '', + '': '', + ... + } // optional + }, + '.json': { + ... + }, + '.json': { + ... + }, + ... + } + + """ def __init__(self, meta: JsonDict = None, **kwargs) -> None: # TODO: How much init magic do we want? # TODO: Is there merit in creating classes for dict fields? @@ -294,6 +389,7 @@ def __init__(self, meta: JsonDict = None, **kwargs) -> None: self.meta = meta def as_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().as_dict() json_dict.update({ 'meta': self.meta @@ -304,6 +400,7 @@ def as_dict(self) -> JsonDict: def update( self, rolename: str, version: int, length: Optional[int] = None, hashes: Optional[JsonDict] = None) -> None: + """Assigns passed (delegated) targets role info to meta dict. """ metadata_fn = f'{rolename}.json' self.meta[metadata_fn] = {'version': version} @@ -315,6 +412,59 @@ def update( class Targets(Signed): + """A container for the signed part of targets metadata. + + Attributes: + targets: A dictionary that contains information about target files:: + + { + '': { + 'length': , + 'hashes': { + '': '', + '': '', + ... + }, + 'custom': // optional + }, + ... + } + + delegations: A dictionary that contains a list of delegated target + roles and public key store used to verify their metadata + signatures:: + + { + 'keys' : { + '': { + 'keytype': '', + 'scheme': '', + 'keyid_hash_algorithms': [ + '', + '' + ... + ], + 'keyval': { + 'public': '' + } + }, + ... + }, + 'roles': [ + { + 'name': '', + 'keyids': ['', ...], + 'threshold': , + 'terminating': , + 'path_hash_prefixes': ['', ... ], // or + 'paths' : ['PATHPATTERN', ... ], + }, + ... + ] + } + + + """ def __init__( self, targets: JsonDict = None, delegations: JsonDict = None, **kwargs) -> None: @@ -325,6 +475,7 @@ def __init__( self.delegations = delegations def as_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().as_dict() json_dict.update({ 'targets': self.targets, @@ -334,4 +485,5 @@ def as_dict(self) -> JsonDict: # Add or update metadata about the target. def update(self, filename: str, fileinfo: JsonDict) -> None: + """Assigns passed target file info to meta dict. """ self.targets[filename] = fileinfo From 08bdc171e4008c39286d168c739d2a329f49d07e Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Mon, 31 Aug 2020 16:10:19 +0200 Subject: [PATCH 07/16] Add simple sign + verify Metadata methods (+tests) Add simple methods to create or verify signatures of the canonical_signed property of a Metadata object. See corresponding docstrings for behavior and design considerations. The commit also adds tests and updates the test setup to load some test keys into memory. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 54 +++++++++++++++++++++ tuf/api/metadata.py | 115 ++++++++++++++++++++++++-------------------- 2 files changed, 118 insertions(+), 51 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 65d4b4c09b..8243cbfc52 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -60,6 +60,17 @@ def setUpClass(cls): shutil.copytree( os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + # Load keys into memory + cls.keystore = {} + for role in ['delegation', 'snapshot', 'targets', 'timestamp']: + cls.keystore[role] = { + 'private': import_ed25519_privatekey_from_file( + os.path.join(cls.keystore_dir, role + '_key'), + password="password"), + 'public': import_ed25519_publickey_from_file( + os.path.join(cls.keystore_dir, role + '_key.pub')) + } + # TODO: Shut down Vault. @classmethod @@ -147,6 +158,49 @@ def test_read_write_read_compare(self): os.remove(path_2) + def test_sign_verify(self): + # Load sample metadata (targets) and assert ... + path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + metadata_obj = Metadata.read_from_json(path) + + # ... it has a single existing signature, + self.assertTrue(len(metadata_obj.signatures) == 1) + # ... valid for the correct key, but + self.assertTrue(metadata_obj.verify( + self.keystore['targets']['public'])) + # ... invalid for an unrelated key. + self.assertFalse(metadata_obj.verify( + self.keystore['snapshot']['public'])) + + # Append a new signature with the unrelated key and assert that ... + metadata_obj.sign(self.keystore['snapshot']['private'], append=True) + # ... there are now two signatures, and + self.assertTrue(len(metadata_obj.signatures) == 2) + # ... both are valid for the corresponding keys. + self.assertTrue(metadata_obj.verify( + self.keystore['targets']['public'])) + self.assertTrue(metadata_obj.verify( + self.keystore['snapshot']['public'])) + + # Create and assign (don't append) a new signature and assert that ... + metadata_obj.sign(self.keystore['timestamp']['private'], append=False) + # ... there now is only one signature, + self.assertTrue(len(metadata_obj.signatures) == 1) + # ... valid for that key. + self.assertTrue(metadata_obj.verify( + self.keystore['timestamp']['public'])) + + + # Update the metadata, invalidating the existing signature, append + # a new signature with the same key, and assert that ... + metadata_obj.signed.bump_version() + metadata_obj.sign(self.keystore['timestamp']['private'], append=True) + # ... verify returns False, because all signatures identified by a + # keyid must be valid + self.assertFalse(metadata_obj.verify( + self.keystore['timestamp']['public'])) + + def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features # with real data diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 064ba5395b..d239c8b6cb 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -5,11 +5,6 @@ updates, and create and verify signatures. TODO: - - * Finalize/Document Verify/Sign functions (I am not fully sure about expected - behavior). See - https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376 - * Validation (some thoughts ...) - Avoid schema, see secure-systems-lab/securesystemslib#183 - Provide methods to validate JSON representation (at user boundary) @@ -36,6 +31,7 @@ from securesystemslib.formats import encode_canonical from securesystemslib.util import load_json_file, persist_temp_file from securesystemslib.storage import StorageBackendInterface +from securesystemslib.keys import create_signature, verify_signature from tuf.repository_lib import ( _strip_version_number ) @@ -94,51 +90,69 @@ def as_json(self, compact: bool = False) -> None: separators=((',', ':') if compact else (',', ': ')), sort_keys=True) - # def __update_signature(self, signatures, keyid, signature): - # updated = False - # keyid_signature = {'keyid':keyid, 'sig':signature} - # for idx, keyid_sig in enumerate(signatures): - # if keyid_sig['keyid'] == keyid: - # signatures[idx] = keyid_signature - # updated = True - # if not updated: - # signatures.append(keyid_signature) - - # def sign(self, key_ring: ???) -> JsonDict: - # # FIXME: Needs documentation of expected behavior - # signed_bytes = self.signed_bytes - # signatures = self.__signatures - - # for key in key_ring.keys: - # signature = key.sign(self.signed_bytes) - # self.__update_signature(signatures, key.keyid, signature) - - # self.__signatures = signatures - # return self.signable - - # def verify(self, key_ring: ???) -> bool: - # # FIXME: Needs documentation of expected behavior - # signed_bytes = self.signed.signed_bytes - # signatures = self.signatures - # verified_keyids = set() - - # for signature in signatures: - # # TODO: handle an empty keyring - # for key in key_ring.keys: - # keyid = key.keyid - # if keyid == signature['keyid']: - # try: - # verified = key.verify(signed_bytes, signature) - # except: - # logging.exception(f'Could not verify signature for key {keyid}') - # continue - # else: - # # Avoid https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-6174 - # verified_keyids.add(keyid) - - # break - - # return len(verified_keyids) >= key_ring.threshold.least + def sign(self, key: JsonDict, append: bool = False) -> JsonDict: + """Creates signature over 'signed' and assigns it to 'signatures'. + + Arguments: + key: A securesystemslib-style private key object used for signing. + append: A boolean indicating if the signature should be appended to + the list of signatures or replace any existing signatures. The + default behavior is to replace signatures. + + Raises: + securesystemslib.exceptions.FormatError: Key argument is malformed. + securesystemslib.exceptions.CryptoError, \ + securesystemslib.exceptions.UnsupportedAlgorithmError: + Signing errors. + + Returns: + A securesystemslib-style signature object. + + """ + signature = create_signature(key, self.signed.signed_bytes) + + if append: + self.signatures.append(signature) + else: + self.signatures = [signature] + + return signature + + def verify(self, key: JsonDict) -> bool: + """Verifies 'signatures' over 'signed' that match the passed key by id. + + Arguments: + key: A securesystemslib-style public key object. + + Raises: + securesystemslib.exceptions.FormatError: Key argument is malformed. + securesystemslib.exceptions.CryptoError, \ + securesystemslib.exceptions.UnsupportedAlgorithmError: + Signing errors. + + Returns: + A boolean indicating if all identified signatures are valid. False + if no signature was found for the keyid or any of the found + signatures is invalid. + + FIXME: Is this behavior expected? An alternative approach would be + to raise an exception if no signature is found for the keyid, + and/or if more than one sigantures are found for the keyid. + + """ + signatures_for_keyid = list(filter( + lambda sig: sig['keyid'] == key['keyid'], self.signatures)) + + if not signatures_for_keyid: + return False + + for signature in signatures_for_keyid: + if not verify_signature(key, signature, self.signed.signed_bytes): + return False + + return True + + @classmethod def read_from_json( cls, filename: str, @@ -463,7 +477,6 @@ class Targets(Signed): ] } - """ def __init__( self, targets: JsonDict = None, delegations: JsonDict = None, From 21de660b66fde2befee5be3bf67eb0f24fb1a609 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Mon, 31 Aug 2020 16:13:59 +0200 Subject: [PATCH 08/16] Remove comments and unify quotes in api tests Signed-off-by: Lukas Puehringer --- tests/test_api.py | 73 +++++++++++------------------------------------ 1 file changed, 17 insertions(+), 56 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 8243cbfc52..964803eaa3 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -23,7 +23,7 @@ # Use setUpModule to tell unittest runner to skip this test module gracefully. def setUpModule(): if not IS_PY_VERSION_SUPPORTED: - raise unittest.SkipTest("requires Python 3.6 or higher") + raise unittest.SkipTest('requires Python 3.6 or higher') # Since setUpModule is called after imports we need to import conditionally. if IS_PY_VERSION_SUPPORTED: @@ -34,19 +34,22 @@ def setUpModule(): Targets ) + from securesystemslib.interface import ( + import_ed25519_publickey_from_file, + import_ed25519_privatekey_from_file + ) logger = logging.getLogger(__name__) class TestMetadata(unittest.TestCase): - # TODO: Start Vault in a dev mode, and export VAULT_ADDR as well as VAULT_TOKEN. - # TODO: Enable the Vault Transit secrets engine. + @classmethod def setUpClass(cls): - - # Create a temporary directory to store the repository, metadata, and target - # files. 'temporary_directory' must be deleted in TearDownClass() so that - # temporary files are always removed, even when exceptions occur. + # Create a temporary directory to store the repository, metadata, and + # target files. 'temporary_directory' must be deleted in + # TearDownClass() so that temporary files are always removed, even when + # exceptions occur. cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) test_repo_data = os.path.join( @@ -72,43 +75,18 @@ def setUpClass(cls): } - # TODO: Shut down Vault. @classmethod def tearDownClass(cls): - - # Remove the temporary repository directory, which should contain all the - # metadata, targets, and key files generated for the test cases. + # Remove the temporary repository directory, which should contain all + # the metadata, targets, and key files generated for the test cases. shutil.rmtree(cls.temporary_directory) - - # def _load_key_ring(self): - # key_list = [] - # root_key = RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'), - # 'rsassa-pss-sha256', 'password') - # key_list.append(root_key) - - # for key_file in os.listdir(self.keystore_dir): - # if key_file.endswith('.pub'): - # # ignore public keys - # continue - - # if key_file.startswith('root_key'): - # # root key is loaded - # continue - - # key = RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file), - # 'ed25519', 'password') - # key_list.append(key) - - # threshold = Threshold(1, 5) - # return KeyRing(threshold=threshold, keys=key_list) - def test_generic_read(self): for metadata, inner_metadata_cls in [ - ("snapshot", Snapshot), - ("timestamp", Timestamp), - ("targets", Targets)]: + ('snapshot', Snapshot), + ('timestamp', Timestamp), + ('targets', Targets)]: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') metadata_obj = Metadata.read_from_json(path) @@ -134,6 +112,7 @@ def test_generic_read(self): os.remove(bad_metadata_path) + def test_compact_json(self): path = os.path.join(self.repo_dir, 'metadata', 'targets.json') metadata_obj = Metadata.read_from_json(path) @@ -143,7 +122,7 @@ def test_compact_json(self): def test_read_write_read_compare(self): - for metadata in ["snapshot", "timestamp", "targets"]: + for metadata in ['snapshot', 'timestamp', 'targets']: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') metadata_obj = Metadata.read_from_json(path) @@ -223,9 +202,6 @@ def test_metadata_snapshot(self): self.repo_dir, 'metadata', 'snapshot.json') snapshot = Snapshot.read_from_json(snapshot_path) - # key_ring = self._load_key_ring() - # snapshot.verify(key_ring) - # Create a dict representing what we expect the updated data to be fileinfo = snapshot.signed.meta hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} @@ -236,23 +212,12 @@ def test_metadata_snapshot(self): snapshot.signed.update('role1', 2, 123, hashes) self.assertEqual(snapshot.signed.meta, fileinfo) - # snapshot.signable() - - # snapshot.sign() - - # snapshot.verify() - - # snapshot.write_to_json(os.path.join(cls.temporary_directory, 'api_snapshot.json')) - def test_metadata_timestamp(self): timestamp_path = os.path.join( self.repo_dir, 'metadata', 'timestamp.json') timestamp = Timestamp.read_from_json(timestamp_path) - # key_ring = self._load_key_ring() - # timestamp.verify(key_ring) - self.assertEqual(timestamp.signed.version, 1) timestamp.signed.bump_version() self.assertEqual(timestamp.signed.version, 2) @@ -280,10 +245,6 @@ def test_metadata_timestamp(self): timestamp.signed.update(2, 520, hashes) self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo) - # timestamp.sign() - - # timestamp.write_to_json() - # Run unit test. if __name__ == '__main__': From e61ae1bea372cc5fc96409c83bb23c9cc335ad0f Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 2 Sep 2020 17:23:37 +0200 Subject: [PATCH 09/16] Remove Signed.read_from_json metadata method Remove metadata factory on Signed class, for the sake of API simplicity/non-ambiguity, i.e. it's enough to have one way of loading any Metadata, that is: Metadata.read_from_json Signed-off-by: Lukas Puehringer --- tests/test_api.py | 15 +++++---------- tuf/api/metadata.py | 43 ------------------------------------------- 2 files changed, 5 insertions(+), 53 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 964803eaa3..85ba9721d3 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -91,15 +91,10 @@ def test_generic_read(self): path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') metadata_obj = Metadata.read_from_json(path) - # Assert that generic method ... - # ... instantiates the right inner class for each metadata type + # Assert that generic method instantiates the right inner class for + # each metadata type self.assertTrue( isinstance(metadata_obj.signed, inner_metadata_cls)) - # ... and reads the same metadata file as the corresponding method - # on the inner class would do (compare their dict representation) - self.assertDictEqual( - metadata_obj.as_dict(), - inner_metadata_cls.read_from_json(path).as_dict()) # Assert that it chokes correctly on an unknown metadata type bad_metadata_path = 'bad-metadata.json' @@ -185,7 +180,7 @@ def test_metadata_base(self): # with real data snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - md = Snapshot.read_from_json(snapshot_path) + md = Metadata.read_from_json(snapshot_path) self.assertEqual(md.signed.version, 1) md.signed.bump_version() @@ -200,7 +195,7 @@ def test_metadata_base(self): def test_metadata_snapshot(self): snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - snapshot = Snapshot.read_from_json(snapshot_path) + snapshot = Metadata.read_from_json(snapshot_path) # Create a dict representing what we expect the updated data to be fileinfo = snapshot.signed.meta @@ -216,7 +211,7 @@ def test_metadata_snapshot(self): def test_metadata_timestamp(self): timestamp_path = os.path.join( self.repo_dir, 'metadata', 'timestamp.json') - timestamp = Timestamp.read_from_json(timestamp_path) + timestamp = Metadata.read_from_json(timestamp_path) self.assertEqual(timestamp.signed.version, 1) timestamp.signed.bump_version() diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index d239c8b6cb..7867feeafb 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -32,9 +32,6 @@ from securesystemslib.util import load_json_file, persist_temp_file from securesystemslib.storage import StorageBackendInterface from securesystemslib.keys import create_signature, verify_signature -from tuf.repository_lib import ( - _strip_version_number -) import iso8601 import tuf.formats @@ -288,46 +285,6 @@ def as_dict(self) -> JsonDict: 'expires': self.expires } - @classmethod - def read_from_json( - cls, filename: str, - storage_backend: Optional[StorageBackendInterface] = None - ) -> Metadata: - signable = load_json_file(filename, storage_backend) - """Loads corresponding JSON-formatted metadata from file storage. - - Arguments: - filename: The path to read the file from. - storage_backend: An object that implements - securesystemslib.storage.StorageBackendInterface. Per default - a (local) FilesystemBackend is used. - - Raises: - securesystemslib.exceptions.StorageError: The file cannot be read. - securesystemslib.exceptions.Error, ValueError: The metadata cannot - be parsed. - - Returns: - A TUF Metadata object whose signed attribute contains an object - of this class. - - """ - # FIXME: It feels dirty to access signable["signed"]["version"] here in - # order to do this check, and also a bit random (there are likely other - # things to check), but later we don't have the filename anymore. If we - # want to stick to the check, which seems reasonable, we should maybe - # think of a better place. - _, fn_prefix = _strip_version_number(filename, True) - if fn_prefix and fn_prefix != signable['signed']['version']: - raise ValueError( - f'version filename prefix ({fn_prefix}) must align with ' - f'version in metadata ({signable["signed"]["version"]}).') - - return Metadata( - signed=cls(**signable['signed']), - signatures=signable['signatures']) - - class Timestamp(Signed): """A container for the signed part of timestamp metadata. From f738ea0273dac17fb5f5c3f1399627af41fcf484 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 10 Sep 2020 16:11:23 +0200 Subject: [PATCH 10/16] Rename tuf metadata interface methods Consistenly rename de/serialization interface methods, using a 'from_' and 'to_' prefix. read_from_json -> from_json_file write_to_json -> to_json_file as_json -> to_json as_dict -> to_dict signed_bytes -> to_canonical_bytes The latter is also changed from a property to a method for consistency with the other serialization methods. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 28 ++++++++++++++-------------- tuf/api/metadata.py | 43 +++++++++++++++++++++---------------------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 85ba9721d3..13ed206c07 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -89,7 +89,7 @@ def test_generic_read(self): ('targets', Targets)]: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.read_from_json(path) + metadata_obj = Metadata.from_json_file(path) # Assert that generic method instantiates the right inner class for # each metadata type @@ -103,31 +103,31 @@ def test_generic_read(self): f.write(json.dumps(bad_metadata).encode('utf-8')) with self.assertRaises(ValueError): - Metadata.read_from_json(bad_metadata_path) + Metadata.from_json_file(bad_metadata_path) os.remove(bad_metadata_path) def test_compact_json(self): path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.read_from_json(path) + metadata_obj = Metadata.from_json_file(path) self.assertTrue( - len(metadata_obj.as_json(compact=True)) < - len(metadata_obj.as_json())) + len(metadata_obj.to_json(compact=True)) < + len(metadata_obj.to_json())) def test_read_write_read_compare(self): for metadata in ['snapshot', 'timestamp', 'targets']: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.read_from_json(path) + metadata_obj = Metadata.from_json_file(path) path_2 = path + '.tmp' - metadata_obj.write_to_json(path_2) - metadata_obj_2 = Metadata.read_from_json(path_2) + metadata_obj.to_json_file(path_2) + metadata_obj_2 = Metadata.from_json_file(path_2) self.assertDictEqual( - metadata_obj.as_dict(), - metadata_obj_2.as_dict()) + metadata_obj.to_dict(), + metadata_obj_2.to_dict()) os.remove(path_2) @@ -135,7 +135,7 @@ def test_read_write_read_compare(self): def test_sign_verify(self): # Load sample metadata (targets) and assert ... path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.read_from_json(path) + metadata_obj = Metadata.from_json_file(path) # ... it has a single existing signature, self.assertTrue(len(metadata_obj.signatures) == 1) @@ -180,7 +180,7 @@ def test_metadata_base(self): # with real data snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - md = Metadata.read_from_json(snapshot_path) + md = Metadata.from_json_file(snapshot_path) self.assertEqual(md.signed.version, 1) md.signed.bump_version() @@ -195,7 +195,7 @@ def test_metadata_base(self): def test_metadata_snapshot(self): snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - snapshot = Metadata.read_from_json(snapshot_path) + snapshot = Metadata.from_json_file(snapshot_path) # Create a dict representing what we expect the updated data to be fileinfo = snapshot.signed.meta @@ -211,7 +211,7 @@ def test_metadata_snapshot(self): def test_metadata_timestamp(self): timestamp_path = os.path.join( self.repo_dir, 'metadata', 'timestamp.json') - timestamp = Metadata.read_from_json(timestamp_path) + timestamp = Metadata.from_json_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) timestamp.signed.bump_version() diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 7867feeafb..5686ac893c 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -72,17 +72,17 @@ def __init__( self.signed = signed self.signatures = signatures - def as_dict(self) -> JsonDict: + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ return { 'signatures': self.signatures, - 'signed': self.signed.as_dict() + 'signed': self.signed.to_dict() } - def as_json(self, compact: bool = False) -> None: + def to_json(self, compact: bool = False) -> None: """Returns the optionally compacted JSON representation of self. """ return json.dumps( - self.as_dict(), + self.to_dict(), indent=(None if compact else 1), separators=((',', ':') if compact else (',', ': ')), sort_keys=True) @@ -106,7 +106,7 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: A securesystemslib-style signature object. """ - signature = create_signature(key, self.signed.signed_bytes) + signature = create_signature(key, self.signed.to_canonical_bytes()) if append: self.signatures.append(signature) @@ -144,14 +144,14 @@ def verify(self, key: JsonDict) -> bool: return False for signature in signatures_for_keyid: - if not verify_signature(key, signature, self.signed.signed_bytes): + if not verify_signature( + key, signature, self.signed.to_canonical_bytes()): return False return True - @classmethod - def read_from_json( + def from_json_file( cls, filename: str, storage_backend: Optional[StorageBackendInterface] = None ) -> 'Metadata': @@ -194,7 +194,7 @@ def read_from_json( signed=inner_cls(**signable['signed']), signatures=signable['signatures']) - def write_to_json( + def to_json_file( self, filename: str, compact: bool = False, storage_backend: StorageBackendInterface = None) -> None: """Writes the JSON representation of self to file storage. @@ -212,10 +212,9 @@ def write_to_json( """ with tempfile.TemporaryFile() as f: - f.write(self.as_json(compact).encode('utf-8')) + f.write(self.to_json(compact).encode('utf-8')) persist_temp_file(f, filename, storage_backend) - class Signed: """A base class for the signed part of TUF metadata. @@ -229,7 +228,7 @@ class Signed: spec_version: The TUF specification version number (semver) the metadata format adheres to. expires: The metadata expiration datetime object. - signed_bytes: The UTF-8 encoded canonical JSON representation of self. + """ # NOTE: Signed is a stupid name, because this might not be signed yet, but @@ -260,13 +259,13 @@ def __init__( raise ValueError(f'version must be < 0, got {version}') self.version = version - @property - def signed_bytes(self) -> bytes: - return encode_canonical(self.as_dict()).encode('UTF-8') @property def expires(self) -> str: return self.__expiration.isoformat() + 'Z' + def to_canonical_bytes(self) -> bytes: + """Returns the UTF-8 encoded canonical JSON representation of self. """ + return encode_canonical(self.to_dict()).encode('UTF-8') def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: """Increments the expires attribute by the passed timedelta. """ @@ -276,7 +275,7 @@ def bump_version(self) -> None: """Increments the metadata version number by 1.""" self.version += 1 - def as_dict(self) -> JsonDict: + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ return { '_type': self._type, @@ -310,9 +309,9 @@ def __init__(self, meta: JsonDict = None, **kwargs) -> None: # TODO: Is there merit in creating classes for dict fields? self.meta = meta - def as_dict(self) -> JsonDict: + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().as_dict() + json_dict = super().to_dict() json_dict.update({ 'meta': self.meta }) @@ -359,9 +358,9 @@ def __init__(self, meta: JsonDict = None, **kwargs) -> None: super().__init__(**kwargs) self.meta = meta - def as_dict(self) -> JsonDict: + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().as_dict() + json_dict = super().to_dict() json_dict.update({ 'meta': self.meta }) @@ -444,9 +443,9 @@ def __init__( self.targets = targets self.delegations = delegations - def as_dict(self) -> JsonDict: + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().as_dict() + json_dict = super().to_dict() json_dict.update({ 'targets': self.targets, 'delegations': self.delegations, From f63dce6dddb9cfbf8986141340c6fac00a36d46e Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 3 Sep 2020 15:28:36 +0200 Subject: [PATCH 11/16] Refactor metadata constructors and add factory This commit better separates the Metadata class model from the Metadata wireline format, by tailoring the constructors towards class-based parameters and adding an additional factory classmethod that creates Metadata objects based on the wireline json/dictionary metadata representation. (pythonic way of constructor overloading). This 'from_dict' factory method recurses into the 'from_dict' methods of each contained complex field/attribute that is also represented by a class. Currently 'signed' is the only such attribute. This commit further: - Changes optional constructor keyword arguments to mandatory positional arguments: Reduces code and simplifies usage by restricting it. For now, users are unlikely to call constructor directly anyway, but the 'from_dict' factory (or its 'from_json_file' wrapper) instead. - Removes Signed.__expiration (datetime) vs. Signed.expires (datestring) dichotomy: Keeping only one representation of the same attribute in memory makes the interface simpler and less ambiguous. We choose the datetime object, because it is more convenient to modify. Transformation from and to the string format required by the tuf wireline format is performed in the corresponding metadata de/serialization methods, i.e. ('to_dict' and 'from_dict'). Signed-off-by: Lukas Puehringer --- tests/test_api.py | 18 +++---- tuf/api/metadata.py | 121 ++++++++++++++++++++++++++++---------------- 2 files changed, 85 insertions(+), 54 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 13ed206c07..4c6f157934 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,7 +14,7 @@ import tempfile import unittest -from datetime import timedelta +from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta # TODO: Remove case handling when fully dropping support for versions >= 3.6 @@ -185,11 +185,11 @@ def test_metadata_base(self): self.assertEqual(md.signed.version, 1) md.signed.bump_version() self.assertEqual(md.signed.version, 2) - self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z') + self.assertEqual(md.signed.expires, datetime(2030, 1, 1, 0, 0)) md.signed.bump_expiration() - self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z') + self.assertEqual(md.signed.expires, datetime(2030, 1, 2, 0, 0)) md.signed.bump_expiration(timedelta(days=365)) - self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z') + self.assertEqual(md.signed.expires, datetime(2031, 1, 2, 0, 0)) def test_metadata_snapshot(self): @@ -217,20 +217,20 @@ def test_metadata_timestamp(self): timestamp.signed.bump_version() self.assertEqual(timestamp.signed.version, 2) - self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z') + self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 1, 0, 0)) timestamp.signed.bump_expiration() - self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z') + self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 2, 0, 0)) timestamp.signed.bump_expiration(timedelta(days=365)) - self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z') + self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 2, 0, 0)) # Test whether dateutil.relativedelta works, this provides a much # easier to use interface for callers delta = relativedelta(days=1) timestamp.signed.bump_expiration(delta) - self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z') + self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 3, 0, 0)) delta = relativedelta(years=5) timestamp.signed.bump_expiration(delta) - self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z') + self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0)) hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} fileinfo = timestamp.signed.meta['snapshot.json'] diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 5686ac893c..8e8725ffc3 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -66,9 +66,7 @@ class Metadata(): ] """ - def __init__( - self, signed: 'Signed' = None, signatures: list = None) -> None: - # TODO: How much init magic do we want? + def __init__(self, signed: 'Signed', signatures: list) -> None: self.signed = signed self.signatures = signatures @@ -165,18 +163,37 @@ def from_json_file( Raises: securesystemslib.exceptions.StorageError: The file cannot be read. - securesystemslib.exceptions.Error, ValueError: The metadata cannot - be parsed. + securesystemslib.exceptions.Error, ValueError, KeyError: The + metadata cannot be parsed. Returns: A TUF Metadata object. """ - signable = load_json_file(filename, storage_backend) + return cls.from_dict(load_json_file(filename, storage_backend)) + + @classmethod + def from_dict(cls, metadata: JsonDict) -> 'Metadata': + """Creates Metadata object from its JSON/dict representation. + + Calls 'from_dict' for any complex metadata attribute represented by a + class also that has a 'from_dict' factory method. (Currently this is + only the signed attribute.) + + Arguments: + metadata: TUF metadata in JSON/dict representation, as e.g. + returned by 'json.loads'. + + Raises: + KeyError: The metadata dict format is invalid. + ValueError: The metadata has an unrecognized signed._type field. - # TODO: Should we use constants? - # And/or maybe a dispatch table? (<-- maybe too much magic) - _type = signable['signed']['_type'] + Returns: + A TUF Metadata object. + + """ + # Dispatch to contained metadata class on metadata _type field. + _type = metadata['signed']['_type'] if _type == 'targets': inner_cls = Targets @@ -190,9 +207,13 @@ def from_json_file( else: raise ValueError(f'unrecognized metadata type "{_type}"') - return Metadata( - signed=inner_cls(**signable['signed']), - signatures=signable['signatures']) + # NOTE: If Signature becomes a class, we should iterate over + # metadata['signatures'], call Signature.from_dict for each item, and + # pass a list of Signature objects to the Metadata constructor intead. + return cls( + signed=inner_cls.from_dict(metadata['signed']), + signatures=metadata['signatures']) + def to_json_file( self, filename: str, compact: bool = False, @@ -235,41 +256,48 @@ class Signed: # we keep it to match spec terminology (I often refer to this as "payload", # or "inner metadata") - # TODO: Re-think default values. It might be better to pass some things - # as args and not es kwargs. Then we'd need to pop those from - # signable["signed"] in read_from_json and pass them explicitly, which - # some say is better than implicit. :) def __init__( - self, _type: str = None, version: int = 0, - spec_version: str = None, expires: datetime = None - ) -> None: - # TODO: How much init magic do we want? + self, _type: str, version: int, spec_version: str, + expires: datetime) -> None: self._type = _type + self.version = version self.spec_version = spec_version + self.expires = expires - # We always intend times to be UTC - # NOTE: we could do this with datetime.fromisoformat() but that is not - # available in Python 2.7's datetime - # NOTE: Store as datetime object for convenient handling, use 'expires' - # property to get the TUF metadata format representation - self.__expiration = iso8601.parse_date(expires).replace(tzinfo=None) - + # TODO: Should we separate data validation from constructor? if version < 0: raise ValueError(f'version must be < 0, got {version}') self.version = version + @classmethod + def from_dict(cls, signed_dict: JsonDict) -> 'Signed': + """Creates Signed object from its JSON/dict representation. """ + + # Convert 'expires' TUF metadata string to a datetime object, which is + # what the constructor expects and what we store. The inverse operation + # is implemented in 'to_dict'. + signed_dict['expires'] = iso8601.parse_date( + signed_dict['expires']).replace(tzinfo=None) + # NOTE: We write the converted 'expires' back into 'signed_dict' above + # so that we can pass it to the constructor as '**signed_dict' below, + # along with other fields that belong to Signed subclasses. + # Any 'from_dict'(-like) conversions of fields that correspond to a + # subclass should be performed in the 'from_dict' method of that + # subclass and also be written back into 'signed_dict' before calling + # super().from_dict. + + # NOTE: cls might be a subclass of Signed, if 'from_dict' was called on + # that subclass (see e.g. Metadata.from_dict). + return cls(**signed_dict) - @property - def expires(self) -> str: - return self.__expiration.isoformat() + 'Z' def to_canonical_bytes(self) -> bytes: """Returns the UTF-8 encoded canonical JSON representation of self. """ return encode_canonical(self.to_dict()).encode('UTF-8') def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: """Increments the expires attribute by the passed timedelta. """ - self.__expiration = self.__expiration + delta + self.expires += delta def bump_version(self) -> None: """Increments the metadata version number by 1.""" @@ -281,7 +309,7 @@ def to_dict(self) -> JsonDict: '_type': self._type, 'version': self.version, 'spec_version': self.spec_version, - 'expires': self.expires + 'expires': self.expires.isoformat() + 'Z' } class Timestamp(Signed): @@ -303,10 +331,11 @@ class Timestamp(Signed): } """ - def __init__(self, meta: JsonDict = None, **kwargs) -> None: - super().__init__(**kwargs) - # TODO: How much init magic do we want? - # TODO: Is there merit in creating classes for dict fields? + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, meta: JsonDict) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta self.meta = meta def to_dict(self) -> JsonDict: @@ -352,10 +381,11 @@ class Snapshot(Signed): } """ - def __init__(self, meta: JsonDict = None, **kwargs) -> None: - # TODO: How much init magic do we want? - # TODO: Is there merit in creating classes for dict fields? - super().__init__(**kwargs) + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, meta: JsonDict) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta self.meta = meta def to_dict(self) -> JsonDict: @@ -435,14 +465,15 @@ class Targets(Signed): """ def __init__( - self, targets: JsonDict = None, delegations: JsonDict = None, - **kwargs) -> None: - # TODO: How much init magic do we want? - # TODO: Is there merit in creating classes for dict fields? - super().__init__(**kwargs) + self, _type: str, version: int, spec_version: str, + expires: datetime, targets: JsonDict, delegations: JsonDict + ) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta self.targets = targets self.delegations = delegations + def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().to_dict() From 387169fc11677216db93a1bbd1aa559387a73908 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 3 Sep 2020 15:35:05 +0200 Subject: [PATCH 12/16] Add from_json metadata convenience wrapper Add convenience wrapper that takes a json string and passes it to from_dict to create a Metadata object. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 16 ++++++++++++++-- tuf/api/metadata.py | 24 +++++++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 4c6f157934..5db06cb06e 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -88,13 +88,25 @@ def test_generic_read(self): ('timestamp', Timestamp), ('targets', Targets)]: + # Load JSON-formatted metdata of each supported type from file + # and from out-of-band read JSON string path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') metadata_obj = Metadata.from_json_file(path) + with open(path, 'rb') as f: + metadata_str = f.read() + metadata_obj2 = Metadata.from_json(metadata_str) - # Assert that generic method instantiates the right inner class for - # each metadata type + # Assert that both methods instantiate the right inner class for + # each metadata type and ... self.assertTrue( isinstance(metadata_obj.signed, inner_metadata_cls)) + self.assertTrue( + isinstance(metadata_obj2.signed, inner_metadata_cls)) + + # ... and return the same object (compared by dict representation) + self.assertDictEqual( + metadata_obj.to_dict(), metadata_obj2.to_dict()) + # Assert that it chokes correctly on an unknown metadata type bad_metadata_path = 'bad-metadata.json' diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 8e8725ffc3..cdbebf4272 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -29,7 +29,11 @@ import tempfile from securesystemslib.formats import encode_canonical -from securesystemslib.util import load_json_file, persist_temp_file +from securesystemslib.util import ( + load_json_file, + load_json_string, + persist_temp_file +) from securesystemslib.storage import StorageBackendInterface from securesystemslib.keys import create_signature, verify_signature @@ -77,6 +81,24 @@ def to_dict(self) -> JsonDict: 'signed': self.signed.to_dict() } + @classmethod + def from_json(cls, metadata_json: str) -> 'Metadata': + """Loads JSON-formatted TUF metadata from a string. + + Arguments: + metadata_json: TUF metadata in JSON-string representation. + + Raises: + securesystemslib.exceptions.Error, ValueError, KeyError: The + metadata cannot be parsed. + + Returns: + A TUF Metadata object. + + """ + return cls.from_dict(load_json_string(metadata_json)) + + def to_json(self, compact: bool = False) -> None: """Returns the optionally compacted JSON representation of self. """ return json.dumps( From f9a4ebe1eaa0c794512542c0cefb8efe57364302 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 3 Sep 2020 16:10:19 +0200 Subject: [PATCH 13/16] Re-order metadata methods logically and add vspace Signed-off-by: Lukas Puehringer --- tuf/api/metadata.py | 237 ++++++++++++++++++++++++-------------------- 1 file changed, 132 insertions(+), 105 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index cdbebf4272..715d5f2da0 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -18,9 +18,12 @@ * Add Root metadata class + * Add classes for other complex metadata attributes, see 'signatures' (in + Metadata) 'meta'/'targets' (in Timestamp, Snapshot, Targets), 'delegations' + (in Targets), 'keys'/'roles' (in not yet existent 'Delegation'), ... + """ # Imports - from datetime import datetime, timedelta from typing import Any, Dict, Optional @@ -42,12 +45,10 @@ # Types - JsonDict = Dict[str, Any] # Classes. - class Metadata(): """A container for signed TUF metadata. @@ -74,12 +75,50 @@ def __init__(self, signed: 'Signed', signatures: list) -> None: self.signed = signed self.signatures = signatures - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - return { - 'signatures': self.signatures, - 'signed': self.signed.to_dict() - } + + # Deserialization (factories). + @classmethod + def from_dict(cls, metadata: JsonDict) -> 'Metadata': + """Creates Metadata object from its JSON/dict representation. + + Calls 'from_dict' for any complex metadata attribute represented by a + class also that has a 'from_dict' factory method. (Currently this is + only the signed attribute.) + + Arguments: + metadata: TUF metadata in JSON/dict representation, as e.g. + returned by 'json.loads'. + + Raises: + KeyError: The metadata dict format is invalid. + ValueError: The metadata has an unrecognized signed._type field. + + Returns: + A TUF Metadata object. + + """ + # Dispatch to contained metadata class on metadata _type field. + _type = metadata['signed']['_type'] + + if _type == 'targets': + inner_cls = Targets + elif _type == 'snapshot': + inner_cls = Snapshot + elif _type == 'timestamp': + inner_cls = Timestamp + elif _type == 'root': + # TODO: implement Root class + raise NotImplementedError('Root not yet implemented') + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + # NOTE: If Signature becomes a class, we should iterate over + # metadata['signatures'], call Signature.from_dict for each item, and + # pass a list of Signature objects to the Metadata constructor intead. + return cls( + signed=inner_cls.from_dict(metadata['signed']), + signatures=metadata['signatures']) + @classmethod def from_json(cls, metadata_json: str) -> 'Metadata': @@ -99,6 +138,40 @@ def from_json(cls, metadata_json: str) -> 'Metadata': return cls.from_dict(load_json_string(metadata_json)) + @classmethod + def from_json_file( + cls, filename: str, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads JSON-formatted TUF metadata from file storage. + + Arguments: + filename: The path to read the file from. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + + Raises: + securesystemslib.exceptions.StorageError: The file cannot be read. + securesystemslib.exceptions.Error, ValueError, KeyError: The + metadata cannot be parsed. + + Returns: + A TUF Metadata object. + + """ + return cls.from_dict(load_json_file(filename, storage_backend)) + + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + return { + 'signatures': self.signatures, + 'signed': self.signed.to_dict() + } + + def to_json(self, compact: bool = False) -> None: """Returns the optionally compacted JSON representation of self. """ return json.dumps( @@ -107,6 +180,30 @@ def to_json(self, compact: bool = False) -> None: separators=((',', ':') if compact else (',', ': ')), sort_keys=True) + + def to_json_file( + self, filename: str, compact: bool = False, + storage_backend: StorageBackendInterface = None) -> None: + """Writes the JSON representation of self to file storage. + + Arguments: + filename: The path to write the file to. + compact: A boolean indicating if the JSON string should be compact + by excluding whitespace. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + Raises: + securesystemslib.exceptions.StorageError: + The file cannot be written. + + """ + with tempfile.TemporaryFile() as f: + f.write(self.to_json(compact).encode('utf-8')) + persist_temp_file(f, filename, storage_backend) + + + # Signatures. def sign(self, key: JsonDict, append: bool = False) -> JsonDict: """Creates signature over 'signed' and assigns it to 'signatures'. @@ -135,6 +232,7 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: return signature + def verify(self, key: JsonDict) -> bool: """Verifies 'signatures' over 'signed' that match the passed key by id. @@ -170,93 +268,6 @@ def verify(self, key: JsonDict) -> bool: return True - @classmethod - def from_json_file( - cls, filename: str, - storage_backend: Optional[StorageBackendInterface] = None - ) -> 'Metadata': - """Loads JSON-formatted TUF metadata from file storage. - - Arguments: - filename: The path to read the file from. - storage_backend: An object that implements - securesystemslib.storage.StorageBackendInterface. Per default - a (local) FilesystemBackend is used. - - Raises: - securesystemslib.exceptions.StorageError: The file cannot be read. - securesystemslib.exceptions.Error, ValueError, KeyError: The - metadata cannot be parsed. - - Returns: - A TUF Metadata object. - - """ - return cls.from_dict(load_json_file(filename, storage_backend)) - - @classmethod - def from_dict(cls, metadata: JsonDict) -> 'Metadata': - """Creates Metadata object from its JSON/dict representation. - - Calls 'from_dict' for any complex metadata attribute represented by a - class also that has a 'from_dict' factory method. (Currently this is - only the signed attribute.) - - Arguments: - metadata: TUF metadata in JSON/dict representation, as e.g. - returned by 'json.loads'. - - Raises: - KeyError: The metadata dict format is invalid. - ValueError: The metadata has an unrecognized signed._type field. - - Returns: - A TUF Metadata object. - - """ - # Dispatch to contained metadata class on metadata _type field. - _type = metadata['signed']['_type'] - - if _type == 'targets': - inner_cls = Targets - elif _type == 'snapshot': - inner_cls = Snapshot - elif _type == 'timestamp': - inner_cls = Timestamp - elif _type == 'root': - # TODO: implement Root class - raise NotImplementedError('Root not yet implemented') - else: - raise ValueError(f'unrecognized metadata type "{_type}"') - - # NOTE: If Signature becomes a class, we should iterate over - # metadata['signatures'], call Signature.from_dict for each item, and - # pass a list of Signature objects to the Metadata constructor intead. - return cls( - signed=inner_cls.from_dict(metadata['signed']), - signatures=metadata['signatures']) - - - def to_json_file( - self, filename: str, compact: bool = False, - storage_backend: StorageBackendInterface = None) -> None: - """Writes the JSON representation of self to file storage. - - Arguments: - filename: The path to write the file to. - compact: A boolean indicating if the JSON string should be compact - by excluding whitespace. - storage_backend: An object that implements - securesystemslib.storage.StorageBackendInterface. Per default - a (local) FilesystemBackend is used. - Raises: - securesystemslib.exceptions.StorageError: - The file cannot be written. - - """ - with tempfile.TemporaryFile() as f: - f.write(self.to_json(compact).encode('utf-8')) - persist_temp_file(f, filename, storage_backend) class Signed: """A base class for the signed part of TUF metadata. @@ -292,6 +303,8 @@ def __init__( raise ValueError(f'version must be < 0, got {version}') self.version = version + + # Deserialization (factories). @classmethod def from_dict(cls, signed_dict: JsonDict) -> 'Signed': """Creates Signed object from its JSON/dict representation. """ @@ -313,17 +326,12 @@ def from_dict(cls, signed_dict: JsonDict) -> 'Signed': # that subclass (see e.g. Metadata.from_dict). return cls(**signed_dict) + + # Serialization. def to_canonical_bytes(self) -> bytes: """Returns the UTF-8 encoded canonical JSON representation of self. """ return encode_canonical(self.to_dict()).encode('UTF-8') - def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: - """Increments the expires attribute by the passed timedelta. """ - self.expires += delta - - def bump_version(self) -> None: - """Increments the metadata version number by 1.""" - self.version += 1 def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ @@ -334,6 +342,18 @@ def to_dict(self) -> JsonDict: 'expires': self.expires.isoformat() + 'Z' } + + # Modification. + def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: + """Increments the expires attribute by the passed timedelta. """ + self.expires += delta + + + def bump_version(self) -> None: + """Increments the metadata version number by 1.""" + self.version += 1 + + class Timestamp(Signed): """A container for the signed part of timestamp metadata. @@ -360,6 +380,8 @@ def __init__( # TODO: Add class for meta self.meta = meta + + # Serialization. def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().to_dict() @@ -368,6 +390,8 @@ def to_dict(self) -> JsonDict: }) return json_dict + + # Modification. def update(self, version: int, length: int, hashes: JsonDict) -> None: """Assigns passed info about snapshot metadata to meta dict. """ self.meta['snapshot.json'] = { @@ -410,6 +434,7 @@ def __init__( # TODO: Add class for meta self.meta = meta + # Serialization. def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().to_dict() @@ -418,7 +443,8 @@ def to_dict(self) -> JsonDict: }) return json_dict - # Add or update metadata about the targets metadata. + + # Modification. def update( self, rolename: str, version: int, length: Optional[int] = None, hashes: Optional[JsonDict] = None) -> None: @@ -496,6 +522,7 @@ def __init__( self.delegations = delegations + # Serialization. def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ json_dict = super().to_dict() @@ -505,7 +532,7 @@ def to_dict(self) -> JsonDict: }) return json_dict - # Add or update metadata about the target. + # Modification. def update(self, filename: str, fileinfo: JsonDict) -> None: """Assigns passed target file info to meta dict. """ self.targets[filename] = fileinfo From 73dd72d54d291a88724a517b42ee51cc5d32b037 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 8 Sep 2020 17:28:50 +0200 Subject: [PATCH 14/16] Raise on bad signature count in Metadata.verify Change Metadata.verify(key) behavior to raise an exception if none or multiple signatures for the passed key are found on the Metadata object. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 27 +++++++++++++++------------ tuf/api/metadata.py | 27 ++++++++++++++------------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 5db06cb06e..b2329bf780 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -27,6 +27,7 @@ def setUpModule(): # Since setUpModule is called after imports we need to import conditionally. if IS_PY_VERSION_SUPPORTED: + import tuf.exceptions from tuf.api.metadata import ( Metadata, Snapshot, @@ -151,12 +152,9 @@ def test_sign_verify(self): # ... it has a single existing signature, self.assertTrue(len(metadata_obj.signatures) == 1) - # ... valid for the correct key, but + # ... which is valid for the correct key. self.assertTrue(metadata_obj.verify( self.keystore['targets']['public'])) - # ... invalid for an unrelated key. - self.assertFalse(metadata_obj.verify( - self.keystore['snapshot']['public'])) # Append a new signature with the unrelated key and assert that ... metadata_obj.sign(self.keystore['snapshot']['private'], append=True) @@ -176,15 +174,20 @@ def test_sign_verify(self): self.assertTrue(metadata_obj.verify( self.keystore['timestamp']['public'])) - - # Update the metadata, invalidating the existing signature, append - # a new signature with the same key, and assert that ... - metadata_obj.signed.bump_version() + # Assert exception if there are more than one signatures for a key metadata_obj.sign(self.keystore['timestamp']['private'], append=True) - # ... verify returns False, because all signatures identified by a - # keyid must be valid - self.assertFalse(metadata_obj.verify( - self.keystore['timestamp']['public'])) + with self.assertRaises(tuf.exceptions.Error) as ctx: + metadata_obj.verify(self.keystore['timestamp']['public']) + self.assertTrue( + '2 signatures for key' in str(ctx.exception), + str(ctx.exception)) + + # Assert exception if there is no signature for a key + with self.assertRaises(tuf.exceptions.Error) as ctx: + metadata_obj.verify(self.keystore['targets']['public']) + self.assertTrue( + 'no signature for' in str(ctx.exception), + str(ctx.exception)) def test_metadata_base(self): diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 715d5f2da0..90ee324839 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -42,6 +42,7 @@ import iso8601 import tuf.formats +import tuf.exceptions # Types @@ -240,33 +241,33 @@ def verify(self, key: JsonDict) -> bool: key: A securesystemslib-style public key object. Raises: + # TODO: Revise exception taxonomy + tuf.exceptions.Error: None or multiple signatures found for key. securesystemslib.exceptions.FormatError: Key argument is malformed. securesystemslib.exceptions.CryptoError, \ securesystemslib.exceptions.UnsupportedAlgorithmError: Signing errors. Returns: - A boolean indicating if all identified signatures are valid. False - if no signature was found for the keyid or any of the found - signatures is invalid. - - FIXME: Is this behavior expected? An alternative approach would be - to raise an exception if no signature is found for the keyid, - and/or if more than one sigantures are found for the keyid. + A boolean indicating if the signature is valid for the passed key. """ signatures_for_keyid = list(filter( lambda sig: sig['keyid'] == key['keyid'], self.signatures)) if not signatures_for_keyid: - return False + raise tuf.exceptions.Error( + f'no signature for key {key["keyid"]}.') - for signature in signatures_for_keyid: - if not verify_signature( - key, signature, self.signed.to_canonical_bytes()): - return False + elif len(signatures_for_keyid) > 1: + raise tuf.exceptions.Error( + f'{len(signatures_for_keyid)} signatures for key ' + f'{key["keyid"]}, not sure which one to verify.') + else: + return verify_signature( + key, signatures_for_keyid[0], + self.signed.to_canonical_bytes()) - return True class Signed: From 228a4c72e01c65f0afa448aa30486bda2099577f Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 10 Sep 2020 15:49:06 +0200 Subject: [PATCH 15/16] Ticketize doc header todo items See: Add root metadata class to new TUF metadata model #1137 Add classes for complex metadata fields #1139 Add input validation to simple metadata api #1140 Signed-off-by: Lukas Puehringer --- tuf/api/metadata.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 90ee324839..07042bced8 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -4,24 +4,6 @@ to read/serialize/write from and to JSON, perform TUF-compliant metadata updates, and create and verify signatures. -TODO: - * Validation (some thoughts ...) - - Avoid schema, see secure-systems-lab/securesystemslib#183 - - Provide methods to validate JSON representation (at user boundary) - - Fail on bad json metadata in read_from_json method - - Be lenient on bad/invalid metadata objects in memory, they might be - work in progress. E.g. it might be convenient to create empty metadata - and assign attributes later on. - - Fail on bad json metadata in write_to_json method, but with option to - disable check as there might be a justified reason to write WIP - metadata to json. - - * Add Root metadata class - - * Add classes for other complex metadata attributes, see 'signatures' (in - Metadata) 'meta'/'targets' (in Timestamp, Snapshot, Targets), 'delegations' - (in Targets), 'keys'/'roles' (in not yet existent 'Delegation'), ... - """ # Imports from datetime import datetime, timedelta From f106435aa51fe18eaa5e5e6122a6bd4cc9f58cd4 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 10 Sep 2020 15:52:59 +0200 Subject: [PATCH 16/16] Remove iso8601 dependency from simple metadata api Use builtin datetime instead of external iso6801 for simple datetime string parsing. Also see https://github.com/theupdateframework/tuf/issues/1065 Signed-off-by: Lukas Puehringer --- tuf/api/metadata.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 07042bced8..ee5ab8eed1 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -22,7 +22,6 @@ from securesystemslib.storage import StorageBackendInterface from securesystemslib.keys import create_signature, verify_signature -import iso8601 import tuf.formats import tuf.exceptions @@ -295,8 +294,9 @@ def from_dict(cls, signed_dict: JsonDict) -> 'Signed': # Convert 'expires' TUF metadata string to a datetime object, which is # what the constructor expects and what we store. The inverse operation # is implemented in 'to_dict'. - signed_dict['expires'] = iso8601.parse_date( - signed_dict['expires']).replace(tzinfo=None) + signed_dict['expires'] = datetime.strptime( + signed_dict['expires'], + "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=None) # NOTE: We write the converted 'expires' back into 'signed_dict' above # so that we can pass it to the constructor as '**signed_dict' below, # along with other fields that belong to Signed subclasses.