From f0bd094782d78bf28bcb2476d5b17fd3eb9d3528 Mon Sep 17 00:00:00 2001
From: Lukas Puehringer
Date: Tue, 18 Aug 2020 11:15:49 +0200
Subject: [PATCH] Add simple TUF role metadata model (WIP)

Add a metadata module with container classes for TUF role metadata,
including methods to read/serialize/write from and to JSON, perform
TUF-compliant metadata updates, and create and verify signatures.

The 'Metadata' class provides a container for inner TUF metadata objects
(Root, Timestamp, Snapshot, Targets), i.e. OOP composition.

The 'Signed' class provides a base class to aggregate attributes common to
the inner metadata classes (i.e. version, expires, spec_version), i.e. OOP
inheritance. The name of the class also aligns with the 'signed' field of
the outer metadata container.

Based on prior observations in TUF's sister project in-toto, this
architecture seems to represent the metadata model well, as it is defined
in the specification (see in-toto/in-toto#98 and in-toto/in-toto#142 for
related discussions).

This commit also adds tests.
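
A rough sketch of the intended usage, for illustration only (paths and
values below are placeholders, not part of this patch):

    from datetime import timedelta
    from tuf.api.metadata import Snapshot

    # The outer Metadata container wraps the inner Snapshot role object.
    metadata = Snapshot.read_from_json('/path/to/snapshot.json')

    # TUF-compliant updates happen on the inner 'signed' object ...
    metadata.signed.bump_version()
    metadata.signed.bump_expiration(timedelta(days=7))
    metadata.signed.update('role1', 2, 123, {'sha256': '...'})

    # ... while serialization happens on the outer container.
    metadata.write_to_json('/path/to/snapshot.json')
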
**TODO: See doc header TODO list**

**Additional design considerations** (also in regard to prior sketches of
this module)

- Aims at simplicity, brevity and recognizability of the wireline metadata
  format.

- All attributes that correspond to fields in TUF JSON metadata are public.
  There doesn't seem to be a good reason to protect them with leading
  underscores and use setters/getters instead; it just adds more code and
  impedes recognizability of the wireline metadata format.

- Although it might be convenient to have short-cuts on the Metadata class
  that point to methods and attributes that are common to all subclasses of
  the contained Signed class (e.g. Metadata.version instead of
  Metadata.signed.version, etc.), this conflicts with the goal of
  recognizability of the wireline metadata format. Thus we won't add such
  short-cuts for now. See:
  https://github.com/theupdateframework/tuf/pull/1060#discussion_r452906629

- Signing keys and a 'consistent_snapshot' boolean are not on the targets
  metadata class. They are a better fit for management code. See:
  https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376,
  and #660.

- Does not use sslib schema checks (see TODO notes about validation in the
  doc header).

- Does not use existing tuf utils, such as make_metadata_fileinfo or
  build_dict_conforming_to_schema, if it is easy and more explicit to just
  re-implement the desired behavior on the metadata classes.

- All datetimes are treated as UTC. Since timezone info is not captured in
  the wireline metadata format, it should not be captured in the internal
  representation either.

- Does not use the 3rd-party dateutil package, in order to minimize the
  dependency footprint, which is especially important for update clients,
  which often have to vendor their dependencies. However, compatibility
  between the more advanced dateutil.relativedelta (e.g. it handles leap
  years automatically) and timedelta is tested.

- Uses PEP8 indentation (4 spaces) and Google-style docstrings instead of
  sslab-style. See
  https://github.com/secure-systems-lab/code-style-guidelines/issues/20

- Does not support Python <= 3.5

Co-authored-by: Trishank Karthik Kuppusamy
Co-authored-by: Joshua Lock
Co-authored-by: Teodora Sechkova
Signed-off-by: Lukas Puehringer
---
 tests/test_api.py   | 165 ++++++++++++++++++++++++++
 tuf/api/metadata.py | 277 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 442 insertions(+)
 create mode 100644 tests/test_api.py
 create mode 100644 tuf/api/metadata.py

diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 0000000000..42db352404
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+
+# Copyright 2020, New York University and the TUF contributors
+# SPDX-License-Identifier: MIT OR Apache-2.0
+"""Unit tests for api/metadata.py
+"""
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+from datetime import timedelta
+from dateutil.relativedelta import relativedelta
+
+from tuf.api.metadata import (
+    Snapshot,
+    Timestamp,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TestMetadata(unittest.TestCase):
+    # TODO: Start Vault in dev mode, and export VAULT_ADDR as well as VAULT_TOKEN.
+    # TODO: Enable the Vault Transit secrets engine.
+    @classmethod
+    def setUpClass(cls):
+
+        # Create a temporary directory to store the repository, metadata, and target
+        # files. 'temporary_directory' must be deleted in tearDownClass() so that
+        # temporary files are always removed, even when exceptions occur.
+        cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
+
+        test_repo_data = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), 'repository_data')
+
+        cls.repo_dir = os.path.join(cls.temporary_directory, 'repository')
+        shutil.copytree(
+            os.path.join(test_repo_data, 'repository'), cls.repo_dir)
+
+        cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore')
+        shutil.copytree(
+            os.path.join(test_repo_data, 'keystore'), cls.keystore_dir)
+
+
+    # TODO: Shut down Vault.
+    @classmethod
+    def tearDownClass(cls):
+
+        # Remove the temporary repository directory, which should contain all the
+        # metadata, targets, and key files generated for the test cases.
+        shutil.rmtree(cls.temporary_directory)
+
+
+    # def _load_key_ring(self):
+    #     key_list = []
+    #     root_key = RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'),
+    #                                      'rsassa-pss-sha256', 'password')
+    #     key_list.append(root_key)
+
+    #     for key_file in os.listdir(self.keystore_dir):
+    #         if key_file.endswith('.pub'):
+    #             # ignore public keys
+    #             continue
+
+    #         if key_file.startswith('root_key'):
+    #             # root key is loaded
+    #             continue
+
+    #         key = RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file),
+    #                                     'ed25519', 'password')
+    #         key_list.append(key)
+
+    #     threshold = Threshold(1, 5)
+    #     return KeyRing(threshold=threshold, keys=key_list)
+    def test_metadata_base(self):
+        # Use of Snapshot is arbitrary; we're just testing the base class features
+        # with real data
+        snapshot_path = os.path.join(
+            self.repo_dir, 'metadata', 'snapshot.json')
+        md = Snapshot.read_from_json(snapshot_path)
+
+        self.assertEqual(md.signed.version, 1)
+        md.signed.bump_version()
+        self.assertEqual(md.signed.version, 2)
+        self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z')
+        md.signed.bump_expiration()
+        self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z')
+        md.signed.bump_expiration(timedelta(days=365))
+        self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z')
+
+
+    def test_metadata_snapshot(self):
+        snapshot_path = os.path.join(
+            self.repo_dir, 'metadata', 'snapshot.json')
+        snapshot = Snapshot.read_from_json(snapshot_path)
+
+        # key_ring = self._load_key_ring()
+        # snapshot.verify(key_ring)
+
+        # Create a (copied) dict representing what we expect the updated data to be
+        fileinfo = {k: v.copy() for k, v in snapshot.signed.meta.items()}
+        hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
+        fileinfo['role1.json']['version'] = 2
+        fileinfo['role1.json']['hashes'] = hashes
+        fileinfo['role1.json']['length'] = 123
+
+        snapshot.signed.update('role1', 2, 123, hashes)
+        self.assertEqual(snapshot.signed.meta, fileinfo)
+
+        # snapshot.signable()
+
+        # snapshot.sign()
+
+        # snapshot.verify()
+
+        # snapshot.write_to_json(os.path.join(cls.temporary_directory, 'api_snapshot.json'))
+
+
+    def test_metadata_timestamp(self):
+        timestamp_path = os.path.join(
+            self.repo_dir, 'metadata', 'timestamp.json')
+        timestamp = Timestamp.read_from_json(timestamp_path)
+
+        # key_ring = self._load_key_ring()
+        # timestamp.verify(key_ring)
+
+        self.assertEqual(timestamp.signed.version, 1)
+        timestamp.signed.bump_version()
+        self.assertEqual(timestamp.signed.version, 2)
+
+        self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z')
+        timestamp.signed.bump_expiration()
+        self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z')
+        timestamp.signed.bump_expiration(timedelta(days=365))
+        self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z')
+
+        # Test that dateutil.relativedelta also works; it provides a much
+        # easier to use interface for callers
+        delta = relativedelta(days=1)
+        timestamp.signed.bump_expiration(delta)
+        self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z')
+        delta = relativedelta(years=5)
+        timestamp.signed.bump_expiration(delta)
+        self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z')
+
+        hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
+        fileinfo = timestamp.signed.meta['snapshot.json'].copy()
+        fileinfo['hashes'] = hashes
+        fileinfo['version'] = 2
+        fileinfo['length'] = 520
+        timestamp.signed.update(2, 520, hashes)
+        self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
+
+        # timestamp.sign()
+
+        # timestamp.write_to_json()
+
+
+# Run unit test.
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py
new file mode 100644
index 0000000000..dddb47af06
--- /dev/null
+++ b/tuf/api/metadata.py
@@ -0,0 +1,277 @@
+"""TUF role metadata model.
+
+This module provides container classes for TUF role metadata, including methods
+to read/serialize/write from and to JSON, perform TUF-compliant metadata
+updates, and create and verify signatures.
+
+TODO:
+
+  * Add docstrings
+
+  * Finalize/Document Verify/Sign functions (I am not fully sure about expected
+    behavior). See
+    https://github.com/theupdateframework/tuf/pull/1060#issuecomment-660056376
+
+  * Validation (some thoughts ...)
+    - Avoid schema, see secure-systems-lab/securesystemslib#183
+    - Provide methods to validate JSON representation (at user boundary)
+    - Fail on bad json metadata in read_from_json method
+    - Be lenient on bad/invalid metadata objects in memory, they might be
+      work in progress. E.g. it might be convenient to create empty metadata
+      and assign attributes later on.
+    - Fail on bad json metadata in write_to_json method, but with option to
+      disable check as there might be a justified reason to write WIP
+      metadata to json.
+
+  * It might be nice to have a generic Metadata.read_from_json that
+    can load any TUF role metadata and instantiate the appropriate object based
+    on the json '_type' field.
+
+  * Add Root metadata class
+
+"""
+# Imports
+
+from datetime import datetime, timedelta
+from typing import Any, Dict, Optional
+
+import json
+import logging
+import tempfile
+
+from securesystemslib.formats import encode_canonical
+from securesystemslib.util import load_json_file, persist_temp_file
+from securesystemslib.storage import StorageBackendInterface
+from tuf.repository_lib import (
+    _get_written_metadata,
+    _strip_version_number
+)
+
+import iso8601
+import tuf.formats
+
+
+# Types
+
+JsonDict = Dict[str, Any]
+
+
+# Classes.
+
+class Metadata():
+    def __init__(
+            self, signed: 'Signed' = None, signatures: list = None) -> None:
+        # TODO: How much init magic do we want?
+        self.signed = signed
+        self.signatures = signatures
+
+    def as_dict(self) -> JsonDict:
+        return {
+            'signatures': self.signatures,
+            'signed': self.signed.as_dict()
+        }
+
+    # def __update_signature(self, signatures, keyid, signature):
+    #     updated = False
+    #     keyid_signature = {'keyid':keyid, 'sig':signature}
+    #     for idx, keyid_sig in enumerate(signatures):
+    #         if keyid_sig['keyid'] == keyid:
+    #             signatures[idx] = keyid_signature
+    #             updated = True
+    #     if not updated:
+    #         signatures.append(keyid_signature)
+
+    # def sign(self, key_ring: ???) -> JsonDict:
+    #     # FIXME: Needs documentation of expected behavior
+    #     signed_bytes = self.signed_bytes
+    #     signatures = self.__signatures
+
+    #     for key in key_ring.keys:
+    #         signature = key.sign(self.signed_bytes)
+    #         self.__update_signature(signatures, key.keyid, signature)
+
+    #     self.__signatures = signatures
+    #     return self.signable
+
+    # def verify(self, key_ring: ???) -> bool:
+    #     # FIXME: Needs documentation of expected behavior
+    #     signed_bytes = self.signed.signed_bytes
+    #     signatures = self.signatures
+    #     verified_keyids = set()
+
+    #     for signature in signatures:
+    #         # TODO: handle an empty keyring
+    #         for key in key_ring.keys:
+    #             keyid = key.keyid
+    #             if keyid == signature['keyid']:
+    #                 try:
+    #                     verified = key.verify(signed_bytes, signature)
+    #                 except:
+    #                     logging.exception(f'Could not verify signature for key {keyid}')
+    #                     continue
+    #                 else:
+    #                     # Avoid https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-6174
+    #                     verified_keyids.add(keyid)
+
+    #                 break
+
+    #     return len(verified_keyids) >= key_ring.threshold.least
+    def write_to_json(
+            self, filename: str,
+            storage_backend: StorageBackendInterface = None) -> None:
+        with tempfile.TemporaryFile() as f:
+            f.write(_get_written_metadata(self.as_dict()))
+            persist_temp_file(f, filename, storage_backend)
+
+
+class Signed:
+    # NOTE: Signed is a stupid name, because this might not be signed yet, but
+    # we keep it to match spec terminology (I often refer to this as "payload",
+    # or "inner metadata")
+
+    # TODO: Re-think default values. It might be better to pass some things
+    # as args and not as kwargs. Then we'd need to pop those from
+    # signable["signed"] in read_from_json and pass them explicitly, which
+    # some say is better than implicit. :)
+    def __init__(
+            self, _type: str = None, version: int = 0,
+            spec_version: str = None, expires: datetime = None
+            ) -> None:
+        # TODO: How much init magic do we want?
+
+        self._type = _type
+        self.spec_version = spec_version
+
+        # We always intend times to be UTC
+        # NOTE: we could do this with datetime.fromisoformat() but that is not
+        # available in Python 2.7's datetime
+        # NOTE: Store as datetime object for convenient handling, use 'expires'
+        # property to get the TUF metadata format representation
+        self.__expiration = iso8601.parse_date(expires).replace(tzinfo=None)
+
+        if version < 0:
+            raise ValueError(f'version must be >= 0, got {version}')
+        self.version = version
+
+    @property
+    def signed_bytes(self) -> bytes:
+        return encode_canonical(self.as_dict()).encode('UTF-8')
+
+    @property
+    def expires(self) -> str:
+        """The expiration property in TUF metadata format."""
+        return self.__expiration.isoformat() + 'Z'
+
+    def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None:
+        self.__expiration = self.__expiration + delta
+
+    def bump_version(self) -> None:
+        self.version += 1
+
+    def as_dict(self) -> JsonDict:
+        # NOTE: The classes should be the single source of truth about metadata;
+        # let's define the dict representation here and not in some dubious
+        # build_dict_conforming_to_schema
+        return {
+            '_type': self._type,
+            'version': self.version,
+            'spec_version': self.spec_version,
+            'expires': self.expires
+        }
+
+    @classmethod
+    def read_from_json(
+            cls, filename: str,
+            storage_backend: Optional[StorageBackendInterface] = None
+            ) -> Metadata:
+        signable = load_json_file(filename, storage_backend)
+
+        # FIXME: It feels dirty to access signable["signed"]["version"] here in
+        # order to do this check, and also a bit random (there are likely other
+        # things to check), but later we don't have the filename anymore. If we
+        # want to stick to the check, which seems reasonable, we should maybe
+        # think of a better place.
+        _, fn_prefix = _strip_version_number(filename, True)
+        if fn_prefix and fn_prefix != signable['signed']['version']:
+            raise ValueError(
+                f'version filename prefix ({fn_prefix}) must align with '
+                f'version in metadata ({signable["signed"]["version"]}).')
+
+        return Metadata(
+            signed=cls(**signable['signed']),
+            signatures=signable['signatures'])
+
+
+class Timestamp(Signed):
+    def __init__(self, meta: JsonDict = None, **kwargs) -> None:
+        super().__init__(**kwargs)
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        self.meta = meta
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'meta': self.meta
+        })
+        return json_dict
+
+    # Update metadata about the snapshot metadata.
+    def update(self, version: int, length: int, hashes: JsonDict) -> None:
+        fileinfo = self.meta.get('snapshot.json', {})
+        fileinfo['version'] = version
+        fileinfo['length'] = length
+        fileinfo['hashes'] = hashes
+        self.meta['snapshot.json'] = fileinfo
+
+
+class Snapshot(Signed):
+    def __init__(self, meta: JsonDict = None, **kwargs) -> None:
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        super().__init__(**kwargs)
+        self.meta = meta
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'meta': self.meta
+        })
+        return json_dict
+
+    # Add or update metadata about the targets metadata.
+    def update(
+            self, rolename: str, version: int, length: Optional[int] = None,
+            hashes: Optional[JsonDict] = None) -> None:
+        metadata_fn = f'{rolename}.json'
+
+        self.meta[metadata_fn] = {'version': version}
+        if length is not None:
+            self.meta[metadata_fn]['length'] = length
+
+        if hashes is not None:
+            self.meta[metadata_fn]['hashes'] = hashes
+
+
+class Targets(Signed):
+    def __init__(
+            self, targets: JsonDict = None, delegations: JsonDict = None,
+            **kwargs) -> None:
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        super().__init__(**kwargs)
+        self.targets = targets
+        self.delegations = delegations
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'targets': self.targets,
+            'delegations': self.delegations,
+        })
+        return json_dict
+
+    # Add or update metadata about the target.
+    def update(self, filename: str, fileinfo: JsonDict) -> None:
+        self.targets[filename] = fileinfo
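
A possible shape for the generic 'Metadata.read_from_json' mentioned in the
doc header TODO list: a minimal sketch, assuming the classes from this
patch. The dispatch table and function name below are hypothetical, not
part of the patch:

    from securesystemslib.util import load_json_file

    # Hypothetical mapping from the '_type' field of TUF metadata to the
    # corresponding Signed subclass (Root would be added once it exists).
    _TYPE_TO_CLASS = {
        'timestamp': Timestamp,
        'snapshot': Snapshot,
        'targets': Targets,
    }

    def read_any_from_json(filename: str) -> Metadata:
        # Peek at the '_type' field to pick the role class, then delegate
        # to its read_from_json classmethod. This parses the file twice,
        # which is acceptable for a sketch.
        signable = load_json_file(filename)
        role_class = _TYPE_TO_CLASS[signable['signed']['_type']]
        return role_class.read_from_json(filename)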