Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple TUF role metadata model #1112

Merged
merged 16 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
#!/usr/bin/env python

# Copyright 2020, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
""" Unit tests for api/metadata.py

"""

import json
import sys
import logging
import os
import shutil
import tempfile
import unittest

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# TODO: Remove case handling when fully dropping support for versions >= 3.6
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# TODO: Remove case handling when fully dropping support for versions >= 3.6
# TODO: Remove case handling when fully dropping support for versions < 3.6

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 thanks for catching this!

IS_PY_VERSION_SUPPORTED = sys.version_info >= (3, 6)

# Use setUpModule to tell unittest runner to skip this test module gracefully.
def setUpModule():
if not IS_PY_VERSION_SUPPORTED:
raise unittest.SkipTest('requires Python 3.6 or higher')

# Since setUpModule is called after imports we need to import conditionally.
if IS_PY_VERSION_SUPPORTED:
import tuf.exceptions
from tuf.api.metadata import (
Metadata,
Snapshot,
Timestamp,
Targets
)

from securesystemslib.interface import (
import_ed25519_publickey_from_file,
import_ed25519_privatekey_from_file
)

logger = logging.getLogger(__name__)


class TestMetadata(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in
# TearDownClass() so that temporary files are always removed, even when
# exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())

test_repo_data = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'repository_data')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 space indentation on a continuation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Good question. I always thought PEP 8 required double-indentation for hanging indent, but it is only a suggestion, and also only to distinguish it from a subsequent nested block.

IMO it's a good idea to always differentiate line continuation indentation from functional indentation, even if there is no subsequent nested block. But I'm fine either way.

FYI, the google style guide does not say anything about double indent but has some examples that could be interpreted that way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's a good idea to always differentiate line continuation indentation from functional indentation, even if there is no subsequent nested block. But I'm fine either way.

That seems like a reasonable coding style for us to adopt. We should document it and teach the linter about it 😃


cls.repo_dir = os.path.join(cls.temporary_directory, 'repository')
shutil.copytree(
os.path.join(test_repo_data, 'repository'), cls.repo_dir)

cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore')
shutil.copytree(
os.path.join(test_repo_data, 'keystore'), cls.keystore_dir)

# Load keys into memory
cls.keystore = {}
for role in ['delegation', 'snapshot', 'targets', 'timestamp']:
cls.keystore[role] = {
'private': import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + '_key'),
password="password"),
'public': import_ed25519_publickey_from_file(
os.path.join(cls.keystore_dir, role + '_key.pub'))
}


@classmethod
def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all
# the metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)


def test_generic_read(self):
for metadata, inner_metadata_cls in [
('snapshot', Snapshot),
('timestamp', Timestamp),
('targets', Targets)]:

# Load JSON-formatted metdata of each supported type from file
# and from out-of-band read JSON string
path = os.path.join(self.repo_dir, 'metadata', metadata + '.json')
metadata_obj = Metadata.from_json_file(path)
with open(path, 'rb') as f:
metadata_str = f.read()
metadata_obj2 = Metadata.from_json(metadata_str)

# Assert that both methods instantiate the right inner class for
# each metadata type and ...
self.assertTrue(
isinstance(metadata_obj.signed, inner_metadata_cls))
self.assertTrue(
isinstance(metadata_obj2.signed, inner_metadata_cls))

# ... and return the same object (compared by dict representation)
self.assertDictEqual(
metadata_obj.to_dict(), metadata_obj2.to_dict())


# Assert that it chokes correctly on an unknown metadata type
bad_metadata_path = 'bad-metadata.json'
bad_metadata = {'signed': {'_type': 'bad-metadata'}}
with open(bad_metadata_path, 'wb') as f:
f.write(json.dumps(bad_metadata).encode('utf-8'))

with self.assertRaises(ValueError):
Metadata.from_json_file(bad_metadata_path)

os.remove(bad_metadata_path)


def test_compact_json(self):
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
metadata_obj = Metadata.from_json_file(path)
self.assertTrue(
len(metadata_obj.to_json(compact=True)) <
len(metadata_obj.to_json()))


def test_read_write_read_compare(self):
for metadata in ['snapshot', 'timestamp', 'targets']:
path = os.path.join(self.repo_dir, 'metadata', metadata + '.json')
metadata_obj = Metadata.from_json_file(path)

path_2 = path + '.tmp'
metadata_obj.to_json_file(path_2)
metadata_obj_2 = Metadata.from_json_file(path_2)

self.assertDictEqual(
metadata_obj.to_dict(),
metadata_obj_2.to_dict())

os.remove(path_2)


def test_sign_verify(self):
# Load sample metadata (targets) and assert ...
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
metadata_obj = Metadata.from_json_file(path)

# ... it has a single existing signature,
self.assertTrue(len(metadata_obj.signatures) == 1)
# ... which is valid for the correct key.
self.assertTrue(metadata_obj.verify(
self.keystore['targets']['public']))

# Append a new signature with the unrelated key and assert that ...
metadata_obj.sign(self.keystore['snapshot']['private'], append=True)
# ... there are now two signatures, and
self.assertTrue(len(metadata_obj.signatures) == 2)
# ... both are valid for the corresponding keys.
self.assertTrue(metadata_obj.verify(
self.keystore['targets']['public']))
self.assertTrue(metadata_obj.verify(
self.keystore['snapshot']['public']))

# Create and assign (don't append) a new signature and assert that ...
metadata_obj.sign(self.keystore['timestamp']['private'], append=False)
# ... there now is only one signature,
self.assertTrue(len(metadata_obj.signatures) == 1)
# ... valid for that key.
self.assertTrue(metadata_obj.verify(
self.keystore['timestamp']['public']))

# Assert exception if there are more than one signatures for a key
metadata_obj.sign(self.keystore['timestamp']['private'], append=True)
with self.assertRaises(tuf.exceptions.Error) as ctx:
metadata_obj.verify(self.keystore['timestamp']['public'])
self.assertTrue(
'2 signatures for key' in str(ctx.exception),
str(ctx.exception))

# Assert exception if there is no signature for a key
with self.assertRaises(tuf.exceptions.Error) as ctx:
metadata_obj.verify(self.keystore['targets']['public'])
self.assertTrue(
'no signature for' in str(ctx.exception),
str(ctx.exception))


def test_metadata_base(self):
# Use of Snapshot is arbitrary, we're just testing the base class features
# with real data
snapshot_path = os.path.join(
self.repo_dir, 'metadata', 'snapshot.json')
md = Metadata.from_json_file(snapshot_path)

self.assertEqual(md.signed.version, 1)
md.signed.bump_version()
self.assertEqual(md.signed.version, 2)
self.assertEqual(md.signed.expires, datetime(2030, 1, 1, 0, 0))
md.signed.bump_expiration()
self.assertEqual(md.signed.expires, datetime(2030, 1, 2, 0, 0))
md.signed.bump_expiration(timedelta(days=365))
self.assertEqual(md.signed.expires, datetime(2031, 1, 2, 0, 0))


def test_metadata_snapshot(self):
snapshot_path = os.path.join(
self.repo_dir, 'metadata', 'snapshot.json')
snapshot = Metadata.from_json_file(snapshot_path)

# Create a dict representing what we expect the updated data to be
fileinfo = snapshot.signed.meta
hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
fileinfo['role1.json']['version'] = 2
fileinfo['role1.json']['hashes'] = hashes
fileinfo['role1.json']['length'] = 123

snapshot.signed.update('role1', 2, 123, hashes)
self.assertEqual(snapshot.signed.meta, fileinfo)


def test_metadata_timestamp(self):
timestamp_path = os.path.join(
self.repo_dir, 'metadata', 'timestamp.json')
timestamp = Metadata.from_json_file(timestamp_path)

self.assertEqual(timestamp.signed.version, 1)
timestamp.signed.bump_version()
self.assertEqual(timestamp.signed.version, 2)

self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 1, 0, 0))
timestamp.signed.bump_expiration()
self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 2, 0, 0))
timestamp.signed.bump_expiration(timedelta(days=365))
self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 2, 0, 0))

# Test whether dateutil.relativedelta works, this provides a much
# easier to use interface for callers
delta = relativedelta(days=1)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 3, 0, 0))
delta = relativedelta(years=5)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0))

hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
fileinfo = timestamp.signed.meta['snapshot.json']
fileinfo['hashes'] = hashes
fileinfo['version'] = 2
fileinfo['length'] = 520
timestamp.signed.update(2, 520, hashes)
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)


# Run unit test.
if __name__ == '__main__':
unittest.main()
Loading