diff --git a/tests/test_api.py b/tests/test_api.py index ec7d182b79..e13c704242 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -28,6 +28,16 @@ Targets ) +from tuf.api.serialization import ( + DeserializationError +) + +from tuf.api.serialization.json import ( + JSONSerializer, + JSONDeserializer, + CanonicalJSONSerializer +) + from securesystemslib.interface import ( import_ed25519_publickey_from_file, import_ed25519_privatekey_from_file @@ -89,10 +99,10 @@ def test_generic_read(self): # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) with open(path, 'rb') as f: metadata_str = f.read() - metadata_obj2 = Metadata.from_json(metadata_str) + metadata_obj2 = JSONDeserializer().deserialize(metadata_str) # Assert that both methods instantiate the right inner class for # each metadata type and ... @@ -112,28 +122,28 @@ def test_generic_read(self): with open(bad_metadata_path, 'wb') as f: f.write(json.dumps(bad_metadata).encode('utf-8')) - with self.assertRaises(ValueError): - Metadata.from_json_file(bad_metadata_path) + with self.assertRaises(DeserializationError): + Metadata.from_file(bad_metadata_path) os.remove(bad_metadata_path) def test_compact_json(self): path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) self.assertTrue( - len(metadata_obj.to_json(compact=True)) < - len(metadata_obj.to_json())) + len(JSONSerializer(compact=True).serialize(metadata_obj)) < + len(JSONSerializer().serialize(metadata_obj))) def test_read_write_read_compare(self): for metadata in ['snapshot', 'timestamp', 'targets']: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) path_2 = path + '.tmp' - metadata_obj.to_json_file(path_2) - metadata_obj_2 = Metadata.from_json_file(path_2) + metadata_obj.to_file(path_2) + metadata_obj_2 = Metadata.from_file(path_2) self.assertDictEqual( metadata_obj.to_dict(), @@ -145,7 +155,7 @@ def test_read_write_read_compare(self): def test_sign_verify(self): # Load sample metadata (targets) and assert ... path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) # ... it has a single existing signature, self.assertTrue(len(metadata_obj.signatures) == 1) @@ -192,7 +202,7 @@ def test_metadata_base(self): # with real data snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - md = Metadata.from_json_file(snapshot_path) + md = Metadata.from_file(snapshot_path) self.assertEqual(md.signed.version, 1) md.signed.bump_version() @@ -207,7 +217,7 @@ def test_metadata_base(self): def test_metadata_snapshot(self): snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - snapshot = Metadata.from_json_file(snapshot_path) + snapshot = Metadata.from_file(snapshot_path) # Create a dict representing what we expect the updated data to be fileinfo = copy.deepcopy(snapshot.signed.meta) @@ -225,7 +235,7 @@ def test_metadata_snapshot(self): def test_metadata_timestamp(self): timestamp_path = os.path.join( self.repo_dir, 'metadata', 'timestamp.json') - timestamp = Metadata.from_json_file(timestamp_path) + timestamp = Metadata.from_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) timestamp.signed.bump_version() @@ -260,7 +270,7 @@ def test_metadata_timestamp(self): def test_metadata_root(self): root_path = os.path.join( self.repo_dir, 'metadata', 'root.json') - root = Metadata.from_json_file(root_path) + root = Metadata.from_file(root_path) # Add a second key to root role root_key2 = import_ed25519_publickey_from_file( @@ -293,7 +303,7 @@ def test_metadata_root(self): def test_metadata_targets(self): targets_path = os.path.join( self.repo_dir, 'metadata', 'targets.json') - targets = Metadata.from_json_file(targets_path) + targets = Metadata.from_file(targets_path) # Create a fileinfo dict representing what we expect the updated data to be filename = 'file2.txt' diff --git a/tox.ini b/tox.ini index 9ab6dee135..b54a221cc0 100644 --- a/tox.ini +++ b/tox.ini @@ -42,6 +42,9 @@ commands = [testenv:lint] commands = - pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api + # Use different pylint configs for legacy and new (tuf/api) code + # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does + # work, unfortunately each subdirectory has to be ignored explicitly. + pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization pylint {toxinidir}/tuf/api --rcfile={toxinidir}/tuf/api/pylintrc bandit -r {toxinidir}/tuf diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index a747be6d13..1209577457 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -1,47 +1,49 @@ +# Copyright New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + """TUF role metadata model. This module provides container classes for TUF role metadata, including methods -to read/serialize/write from and to JSON, perform TUF-compliant metadata -updates, and create and verify signatures. +to read and write from and to file, perform TUF-compliant metadata updates, and +create and verify signatures. + +The metadata model supports any custom serialization format, defaulting to JSON +as wireline format and Canonical JSON for reproducible signature creation and +verification. +Custom serializers must implement the abstract serialization interface defined +in 'tuf.api.serialization', and may use the [to|from]_dict convenience methods +available in the class model. """ -# Imports from datetime import datetime, timedelta -from typing import Any, Dict, Optional +from typing import Any, Dict, Mapping, Optional -import json import tempfile -from securesystemslib.formats import encode_canonical -from securesystemslib.util import ( - load_json_file, - load_json_string, - persist_temp_file -) -from securesystemslib.storage import StorageBackendInterface +from securesystemslib.util import persist_temp_file +from securesystemslib.storage import (StorageBackendInterface, + FilesystemBackend) from securesystemslib.keys import create_signature, verify_signature +from tuf.api.serialization import (MetadataSerializer, MetadataDeserializer, + SignedSerializer) + import tuf.formats import tuf.exceptions -# Types -JsonDict = Dict[str, Any] - - -# Classes. class Metadata(): """A container for signed TUF metadata. - Provides methods to (de-)serialize JSON metadata from and to file - storage, and to create and verify signatures. + Provides methods to convert to and from dictionary, read and write to and + from file and to create and verify metadata signatures. Attributes: signed: A subclass of Signed, which has the actual metadata payload, i.e. one of Targets, Snapshot, Timestamp or Root. - signatures: A list of signatures over the canonical JSON representation - of the value of the signed attribute:: + signatures: A list of signatures over the canonical representation of + the value of the signed attribute:: [ { @@ -56,24 +58,20 @@ def __init__(self, signed: 'Signed', signatures: list) -> None: self.signed = signed self.signatures = signatures - - # Deserialization (factories). @classmethod - def from_dict(cls, metadata: JsonDict) -> 'Metadata': - """Creates Metadata object from its JSON/dict representation. - - Calls 'from_dict' for any complex metadata attribute represented by a - class also that has a 'from_dict' factory method. (Currently this is - only the signed attribute.) + def from_dict(cls, metadata: Mapping[str, Any]) -> 'Metadata': + """Creates Metadata object from its dict representation. Arguments: - metadata: TUF metadata in JSON/dict representation, as e.g. - returned by 'json.loads'. + metadata: TUF metadata in dict representation. Raises: KeyError: The metadata dict format is invalid. ValueError: The metadata has an unrecognized signed._type field. + Side Effect: + Destroys the metadata Mapping passed by reference. + Returns: A TUF Metadata object. @@ -94,97 +92,95 @@ class also that has a 'from_dict' factory method. (Currently this is # NOTE: If Signature becomes a class, we should iterate over # metadata['signatures'], call Signature.from_dict for each item, and - # pass a list of Signature objects to the Metadata constructor intead. + # pass a list of Signature objects to the Metadata constructor instead. return cls( - signed=inner_cls.from_dict(metadata['signed']), - signatures=metadata['signatures']) - + signed=inner_cls.from_dict(metadata.pop('signed')), + signatures=metadata.pop('signatures')) @classmethod - def from_json(cls, metadata_json: str) -> 'Metadata': - """Loads JSON-formatted TUF metadata from a string. - - Arguments: - metadata_json: TUF metadata in JSON-string representation. - - Raises: - securesystemslib.exceptions.Error, ValueError, KeyError: The - metadata cannot be parsed. - - Returns: - A TUF Metadata object. - - """ - return cls.from_dict(load_json_string(metadata_json)) - - - @classmethod - def from_json_file( - cls, filename: str, - storage_backend: Optional[StorageBackendInterface] = None - ) -> 'Metadata': - """Loads JSON-formatted TUF metadata from file storage. + def from_file( + cls, filename: str, + deserializer: Optional[MetadataDeserializer] = None, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads TUF metadata from file storage. Arguments: filename: The path to read the file from. + deserializer: A MetadataDeserializer subclass instance that + implements the desired wireline format deserialization. Per + default a JSONDeserializer is used. storage_backend: An object that implements securesystemslib.storage.StorageBackendInterface. Per default a (local) FilesystemBackend is used. Raises: securesystemslib.exceptions.StorageError: The file cannot be read. - securesystemslib.exceptions.Error, ValueError, KeyError: The - metadata cannot be parsed. + tuf.api.serialization.DeserializationError: + The file cannot be deserialized. Returns: A TUF Metadata object. """ - return cls.from_dict(load_json_file(filename, storage_backend)) + if deserializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONDeserializer + deserializer = JSONDeserializer() + + if storage_backend is None: + storage_backend = FilesystemBackend() + with storage_backend.get(filename) as file_obj: + raw_data = file_obj.read() - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ + return deserializer.deserialize(raw_data) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self. """ return { 'signatures': self.signatures, 'signed': self.signed.to_dict() } - - def to_json(self, compact: bool = False) -> None: - """Returns the optionally compacted JSON representation of self. """ - return json.dumps( - self.to_dict(), - indent=(None if compact else 1), - separators=((',', ':') if compact else (',', ': ')), - sort_keys=True) - - - def to_json_file( - self, filename: str, compact: bool = False, - storage_backend: StorageBackendInterface = None) -> None: - """Writes the JSON representation of self to file storage. + def to_file( + self, filename: str, serializer: Optional[MetadataSerializer] = None, + storage_backend: Optional[StorageBackendInterface] = None + ) -> None: + """Writes TUF metadata to file storage. Arguments: filename: The path to write the file to. - compact: A boolean indicating if the JSON string should be compact - by excluding whitespace. + serializer: A MetadataSerializer subclass instance that implements + the desired wireline format serialization. Per default a + JSONSerializer is used. storage_backend: An object that implements securesystemslib.storage.StorageBackendInterface. Per default a (local) FilesystemBackend is used. + Raises: + tuf.api.serialization.SerializationError: + The metadata object cannot be serialized. securesystemslib.exceptions.StorageError: The file cannot be written. """ + if serializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONSerializer + serializer = JSONSerializer(compact=True) + with tempfile.TemporaryFile() as temp_file: - temp_file.write(self.to_json(compact).encode('utf-8')) + temp_file.write(serializer.serialize(self)) persist_temp_file(temp_file, filename, storage_backend) - # Signatures. - def sign(self, key: JsonDict, append: bool = False) -> JsonDict: + def sign( + self, key: Mapping[str, Any], append: bool = False, + signed_serializer: Optional[SignedSerializer] = None + ) -> Dict[str, Any]: """Creates signature over 'signed' and assigns it to 'signatures'. Arguments: @@ -192,9 +188,13 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: append: A boolean indicating if the signature should be appended to the list of signatures or replace any existing signatures. The default behavior is to replace signatures. + signed_serializer: A SignedSerializer subclass instance that + implements the desired canonicalization format. Per default a + CanonicalJSONSerializer is used. Raises: - securesystemslib.exceptions.FormatError: Key argument is malformed. + tuf.api.serialization.SerializationError: + 'signed' cannot be serialized. securesystemslib.exceptions.CryptoError, \ securesystemslib.exceptions.UnsupportedAlgorithmError: Signing errors. @@ -203,7 +203,14 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: A securesystemslib-style signature object. """ - signature = create_signature(key, self.signed.to_canonical_bytes()) + if signed_serializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import CanonicalJSONSerializer + signed_serializer = CanonicalJSONSerializer() + + signature = create_signature(key, + signed_serializer.serialize(self.signed)) if append: self.signatures.append(signature) @@ -212,17 +219,22 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: return signature - - def verify(self, key: JsonDict) -> bool: + def verify(self, key: Mapping[str, Any], + signed_serializer: Optional[SignedSerializer] = None) -> bool: """Verifies 'signatures' over 'signed' that match the passed key by id. Arguments: key: A securesystemslib-style public key object. + signed_serializer: A SignedSerializer subclass instance that + implements the desired canonicalization format. Per default a + CanonicalJSONSerializer is used. Raises: # TODO: Revise exception taxonomy tuf.exceptions.Error: None or multiple signatures found for key. securesystemslib.exceptions.FormatError: Key argument is malformed. + tuf.api.serialization.SerializationError: + 'signed' cannot be serialized. securesystemslib.exceptions.CryptoError, \ securesystemslib.exceptions.UnsupportedAlgorithmError: Signing errors. @@ -243,10 +255,15 @@ def verify(self, key: JsonDict) -> bool: f'{len(signatures_for_keyid)} signatures for key ' f'{key["keyid"]}, not sure which one to verify.') + if signed_serializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import CanonicalJSONSerializer + signed_serializer = CanonicalJSONSerializer() + return verify_signature( key, signatures_for_keyid[0], - self.signed.to_canonical_bytes()) - + signed_serializer.serialize(self.signed)) class Signed: @@ -263,12 +280,10 @@ class Signed: metadata format adheres to. expires: The metadata expiration datetime object. - """ # NOTE: Signed is a stupid name, because this might not be signed yet, but # we keep it to match spec terminology (I often refer to this as "payload", # or "inner metadata") - def __init__( self, _type: str, version: int, spec_version: str, expires: datetime) -> None: @@ -283,38 +298,31 @@ def __init__( raise ValueError(f'version must be < 0, got {version}') self.version = version + @staticmethod + def _common_fields_from_dict(signed_dict: Mapping[str, Any]) -> list: + """Returns common fields of 'Signed' instances from the passed dict + representation, and returns an ordered list to be passed as leading + positional arguments to a subclass constructor. - # Deserialization (factories). - @classmethod - def from_dict(cls, signed_dict: JsonDict) -> 'Signed': - """Creates Signed object from its JSON/dict representation. """ + See '{Root, Timestamp, Snapshot, Targets}.from_dict' methods for usage. + """ + _type = signed_dict.pop('_type') + version = signed_dict.pop('version') + spec_version = signed_dict.pop('spec_version') + expires_str = signed_dict.pop('expires') # Convert 'expires' TUF metadata string to a datetime object, which is # what the constructor expects and what we store. The inverse operation - # is implemented in 'to_dict'. - signed_dict['expires'] = tuf.formats.expiry_string_to_datetime( - signed_dict['expires']) - # NOTE: We write the converted 'expires' back into 'signed_dict' above - # so that we can pass it to the constructor as '**signed_dict' below, - # along with other fields that belong to Signed subclasses. - # Any 'from_dict'(-like) conversions of fields that correspond to a - # subclass should be performed in the 'from_dict' method of that - # subclass and also be written back into 'signed_dict' before calling - # super().from_dict. - - # NOTE: cls might be a subclass of Signed, if 'from_dict' was called on - # that subclass (see e.g. Metadata.from_dict). - return cls(**signed_dict) - - - # Serialization. - def to_canonical_bytes(self) -> bytes: - """Returns the UTF-8 encoded canonical JSON representation of self. """ - return encode_canonical(self.to_dict()).encode('UTF-8') - - - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ + # is implemented in '_common_fields_to_dict'. + expires = tuf.formats.expiry_string_to_datetime(expires_str) + return [_type, version, spec_version, expires] + + def _common_fields_to_dict(self) -> Dict[str, Any]: + """Returns dict representation of common fields of 'Signed' instances. + + See '{Root, Timestamp, Snapshot, Targets}.to_dict' methods for usage. + + """ return { '_type': self._type, 'version': self.version, @@ -322,13 +330,11 @@ def to_dict(self) -> JsonDict: 'expires': self.expires.isoformat() + 'Z' } - # Modification. def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: """Increments the expires attribute by the passed timedelta. """ self.expires += delta - def bump_version(self) -> None: """Increments the metadata version number by 1.""" self.version += 1 @@ -375,34 +381,40 @@ class Root(Signed): def __init__( self, _type: str, version: int, spec_version: str, expires: datetime, consistent_snapshot: bool, - keys: JsonDict, roles: JsonDict) -> None: + keys: Mapping[str, Any], roles: Mapping[str, Any]) -> None: super().__init__(_type, version, spec_version, expires) # TODO: Add classes for keys and roles self.consistent_snapshot = consistent_snapshot self.keys = keys self.roles = roles - - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ + @classmethod + def from_dict(cls, root_dict: Mapping[str, Any]) -> 'Root': + """Creates Root object from its dict representation. """ + common_args = cls._common_fields_from_dict(root_dict) + consistent_snapshot = root_dict.pop('consistent_snapshot') + keys = root_dict.pop('keys') + roles = root_dict.pop('roles') + return cls(*common_args, consistent_snapshot, keys, roles) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self. """ + root_dict = self._common_fields_to_dict() + root_dict.update({ 'consistent_snapshot': self.consistent_snapshot, 'keys': self.keys, 'roles': self.roles }) - return json_dict - + return root_dict # Update key for a role. - def add_key(self, role: str, keyid: str, key_metadata: JsonDict) -> None: + def add_key(self, role: str, keyid: str, + key_metadata: Mapping[str, Any]) -> None: """Adds new key for 'role' and updates the key store. """ if keyid not in self.roles[role]['keyids']: self.roles[role]['keyids'].append(keyid) self.keys[keyid] = key_metadata - # Remove key for a role. def remove_key(self, role: str, keyid: str) -> None: """Removes key for 'role' and updates the key store. """ @@ -415,8 +427,6 @@ def remove_key(self, role: str, keyid: str) -> None: del self.keys[keyid] - - class Timestamp(Signed): """A container for the signed part of timestamp metadata. @@ -438,24 +448,29 @@ class Timestamp(Signed): """ def __init__( self, _type: str, version: int, spec_version: str, - expires: datetime, meta: JsonDict) -> None: + expires: datetime, meta: Mapping[str, Any]) -> None: super().__init__(_type, version, spec_version, expires) # TODO: Add class for meta self.meta = meta - - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ + @classmethod + def from_dict(cls, timestamp_dict: Mapping[str, Any]) -> 'Timestamp': + """Creates Timestamp object from its dict representation. """ + common_args = cls._common_fields_from_dict(timestamp_dict) + meta = timestamp_dict.pop('meta') + return cls(*common_args, meta) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self. """ + timestamp_dict = self._common_fields_to_dict() + timestamp_dict.update({ 'meta': self.meta }) - return json_dict - + return timestamp_dict # Modification. - def update(self, version: int, length: int, hashes: JsonDict) -> None: + def update(self, version: int, length: int, + hashes: Mapping[str, Any]) -> None: """Assigns passed info about snapshot metadata to meta dict. """ self.meta['snapshot.json'] = { 'version': version, @@ -492,25 +507,30 @@ class Snapshot(Signed): """ def __init__( self, _type: str, version: int, spec_version: str, - expires: datetime, meta: JsonDict) -> None: + expires: datetime, meta: Mapping[str, Any]) -> None: super().__init__(_type, version, spec_version, expires) # TODO: Add class for meta self.meta = meta - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ + @classmethod + def from_dict(cls, snapshot_dict: Mapping[str, Any]) -> 'Snapshot': + """Creates Snapshot object from its dict representation. """ + common_args = cls._common_fields_from_dict(snapshot_dict) + meta = snapshot_dict.pop('meta') + return cls(*common_args, meta) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self. """ + snapshot_dict = self._common_fields_to_dict() + snapshot_dict.update({ 'meta': self.meta }) - return json_dict - + return snapshot_dict # Modification. def update( self, rolename: str, version: int, length: Optional[int] = None, - hashes: Optional[JsonDict] = None) -> None: + hashes: Optional[Mapping[str, Any]] = None) -> None: """Assigns passed (delegated) targets role info to meta dict. """ metadata_fn = f'{rolename}.json' @@ -580,26 +600,33 @@ class Targets(Signed): # default max-args value for pylint is 5 # pylint: disable=too-many-arguments def __init__( - self, _type: str, version: int, spec_version: str, - expires: datetime, targets: JsonDict, delegations: JsonDict - ) -> None: + self, _type: str, version: int, spec_version: str, + expires: datetime, targets: Mapping[str, Any], + delegations: Mapping[str, Any] + ) -> None: super().__init__(_type, version, spec_version, expires) # TODO: Add class for meta self.targets = targets self.delegations = delegations - - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ + @classmethod + def from_dict(cls, targets_dict: Mapping[str, Any]) -> 'Targets': + """Creates Targets object from its dict representation. """ + common_args = cls._common_fields_from_dict(targets_dict) + targets = targets_dict.pop('targets') + delegations = targets_dict.pop('delegations') + return cls(*common_args, targets, delegations) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self. """ + targets_dict = self._common_fields_to_dict() + targets_dict.update({ 'targets': self.targets, 'delegations': self.delegations, }) - return json_dict + return targets_dict # Modification. - def update(self, filename: str, fileinfo: JsonDict) -> None: + def update(self, filename: str, fileinfo: Mapping[str, Any]) -> None: """Assigns passed target file info to meta dict. """ self.targets[filename] = fileinfo diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc index a75347f446..badef7613d 100644 --- a/tuf/api/pylintrc +++ b/tuf/api/pylintrc @@ -1,6 +1,12 @@ [MESSAGE_CONTROL] disable=fixme +[BASIC] +good-names=e + [FORMAT] indent-string=" " max-line-length=79 + +[DESIGN] +min-public-methods=0 diff --git a/tuf/api/serialization/__init__.py b/tuf/api/serialization/__init__.py new file mode 100644 index 0000000000..ed3191a103 --- /dev/null +++ b/tuf/api/serialization/__init__.py @@ -0,0 +1,53 @@ +# Copyright New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""TUF role metadata de/serialization. + +This sub-package provides abstract base classes and concrete implementations to +serialize and deserialize TUF role metadata and metadata parts. + +Any custom de/serialization implementations should inherit from the abstract +base classes defined in this __init__.py module. + +- Metadata de/serializers are used to convert to and from wireline formats. +- Signed serializers are used to canonicalize data for cryptographic signatures + generation and verification. + +""" +import abc + + +# TODO: Should these be in tuf.exceptions or inherit from tuf.exceptions.Error? +class SerializationError(Exception): + """Error during serialization. """ + + +class DeserializationError(Exception): + """Error during deserialization. """ + + +class MetadataDeserializer(metaclass=abc.ABCMeta): + """Abstract base class for deserialization of Metadata objects. """ + + @abc.abstractmethod + def deserialize(self, raw_data: bytes) -> "Metadata": + """Deserialize passed bytes to Metadata object. """ + raise NotImplementedError + + +class MetadataSerializer(metaclass=abc.ABCMeta): + """Abstract base class for serialization of Metadata objects. """ + + @abc.abstractmethod + def serialize(self, metadata_obj: "Metadata") -> bytes: + """Serialize passed Metadata object to bytes. """ + raise NotImplementedError + + +class SignedSerializer(metaclass=abc.ABCMeta): + """Abstract base class for serialization of Signed objects. """ + + @abc.abstractmethod + def serialize(self, signed_obj: "Signed") -> bytes: + """Serialize passed Signed object to bytes. """ + raise NotImplementedError diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py new file mode 100644 index 0000000000..215a0ad790 --- /dev/null +++ b/tuf/api/serialization/json.py @@ -0,0 +1,84 @@ +# Copyright New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""TUF role metadata JSON serialization and deserialization. + +This module provides concrete implementations to serialize and deserialize TUF +role metadata to and from the JSON wireline format for transportation, and +to serialize the 'signed' part of TUF role metadata to the OLPC Canonical JSON +format for signature generation and verification. + +""" +import json + +from securesystemslib.formats import encode_canonical + +# pylint: disable=cyclic-import +# ... to allow de/serializing Metadata and Signed objects here, while also +# creating default de/serializers there (see metadata local scope imports). +# NOTE: A less desirable alternative would be to add more abstraction layers. +from tuf.api.metadata import Metadata, Signed +from tuf.api.serialization import (MetadataSerializer, + MetadataDeserializer, + SignedSerializer, + SerializationError, + DeserializationError) + + +class JSONDeserializer(MetadataDeserializer): + """Provides JSON to Metadata deserialize method. """ + + def deserialize(self, raw_data: bytes) -> Metadata: + """Deserialize utf-8 encoded JSON bytes into Metadata object. """ + try: + json_dict = json.loads(raw_data.decode("utf-8")) + metadata_obj = Metadata.from_dict(json_dict) + + except Exception as e: # pylint: disable=broad-except + raise DeserializationError from e + + return metadata_obj + + +class JSONSerializer(MetadataSerializer): + """Provides Metadata to JSON serialize method. + + Attributes: + compact: A boolean indicating if the JSON bytes generated in + 'serialize' should be compact by excluding whitespace. + + """ + def __init__(self, compact: bool = False) -> None: + self.compact = compact + + def serialize(self, metadata_obj: Metadata) -> bytes: + """Serialize Metadata object into utf-8 encoded JSON bytes. """ + try: + indent = (None if self.compact else 1) + separators = ((',', ':') if self.compact else (',', ': ')) + json_bytes = json.dumps(metadata_obj.to_dict(), + indent=indent, + separators=separators, + sort_keys=True).encode("utf-8") + + except Exception as e: # pylint: disable=broad-except + raise SerializationError from e + + return json_bytes + + +class CanonicalJSONSerializer(SignedSerializer): + """Provides Signed to OLPC Canonical JSON serialize method. """ + + def serialize(self, signed_obj: Signed) -> bytes: + """Serialize Signed object into utf-8 encoded OLPC Canonical JSON + bytes. + """ + try: + signed_dict = signed_obj.to_dict() + canonical_bytes = encode_canonical(signed_dict).encode("utf-8") + + except Exception as e: # pylint: disable=broad-except + raise SerializationError from e + + return canonical_bytes