Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate TUF and trusted root management code #844

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
RekorClient,
RekorKeyring,
)
from sigstore._internal.tuf import TrustUpdater
from sigstore._internal.trustroot import TrustedRoot
from sigstore._utils import PEMCert
from sigstore.errors import Error
from sigstore.oidc import (
Expand Down Expand Up @@ -650,16 +650,16 @@ def _sign(args: argparse.Namespace) -> None:
elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL:
signing_ctx = SigningContext.production()
else:
# Assume "production" keys if none are given as arguments
updater = TrustUpdater.production()
# Assume "production" trust root if no keys are given as arguments
trusted_root = TrustedRoot.production()
if args.ctfe_pem is not None:
ctfe_keys = [args.ctfe_pem.read()]
else:
ctfe_keys = updater.get_ctfe_keys()
ctfe_keys = trusted_root.get_ctfe_keys()
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
rekor_keys = updater.get_rekor_keys()
rekor_keys = trusted_root.get_rekor_keys()

ct_keyring = CTKeyring(Keyring(ctfe_keys))
rekor_keyring = RekorKeyring(Keyring(rekor_keys))
Expand Down Expand Up @@ -828,8 +828,8 @@ def _collect_verification_state(
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
updater = TrustUpdater.production()
rekor_keys = updater.get_rekor_keys()
trusted_root = TrustedRoot.production()
rekor_keys = trusted_root.get_rekor_keys()

verifier = Verifier(
rekor=RekorClient(
Expand Down
18 changes: 9 additions & 9 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.tuf import TrustUpdater
from sigstore._internal.trustroot import TrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -232,14 +232,14 @@ def __del__(self) -> None:
self.session.close()

@classmethod
def production(cls, updater: TrustUpdater) -> RekorClient:
def production(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor production instance.

updater must be a `TrustUpdater` for the production TUF repository.
trust_root must be a `TrustedRoot` for the production TUF repository.
"""
rekor_keys = updater.get_rekor_keys()
ctfe_keys = updater.get_ctfe_keys()
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
DEFAULT_REKOR_URL,
Expand All @@ -248,14 +248,14 @@ def production(cls, updater: TrustUpdater) -> RekorClient:
)

@classmethod
def staging(cls, updater: TrustUpdater) -> RekorClient:
def staging(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor staging instance.

updater must be a `TrustUpdater` for the staging TUF repository.
trust_root must be a `TrustedRoot` for the staging TUF repository.
"""
rekor_keys = updater.get_rekor_keys()
ctfe_keys = updater.get_ctfe_keys()
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
STAGING_REKOR_URL,
Expand Down
150 changes: 150 additions & 0 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright 2023 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trust root management for sigstore-python.
"""

from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable

from cryptography.x509 import Certificate, load_der_x509_certificate
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
CertificateAuthority,
TransparencyLogInstance,
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
TrustedRoot as _TrustedRoot,
)

from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore.errors import MetadataError


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
"""
Given a `period`, checks that the the current time is not before `start`. If
`allow_expired` is `False`, also checks that the current time is not after
`end`.
"""
now = datetime.now(timezone.utc)

# If there was no validity period specified, the key is always valid.
if not period:
return True

# Active: if the current time is before the starting period, we are not yet
# valid.
if now < period.start:
return False

# If we want Expired keys, the key is valid at this point. Otherwise, check
# that we are within range.
return allow_expired or (period.end is None or now <= period.end)


class TrustedRoot(_TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

@classmethod
def from_file(cls, path: str) -> "TrustedRoot":
"""Create a new trust root from file"""
tr: TrustedRoot = cls().from_json(Path(path).read_bytes())
return tr

@classmethod
def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot":
"""Create a new trust root from a TUF repository.

If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
path = TrustUpdater(url, offline).get_trusted_root_path()
return cls.from_file(path)

@classmethod
def production(cls, offline: bool = False) -> "TrustedRoot":
"""Create new trust root from Sigstore production TUF repository.

If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(DEFAULT_TUF_URL, offline)

@classmethod
def staging(cls, offline: bool = False) -> "TrustedRoot":
"""Create new trust root from Sigstore staging TUF repository.

If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(STAGING_TUF_URL, offline)

@staticmethod
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
"""Return public key contents given transparency log instances."""

for key in tlogs:
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False):
continue
key_bytes = key.public_key.raw_bytes
if key_bytes:
yield key_bytes

@staticmethod
def _get_ca_keys(
cas: list[CertificateAuthority], *, allow_expired: bool
) -> Iterable[bytes]:
"""Return public key contents given certificate authorities."""

for ca in cas:
if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired):
continue
for cert in ca.cert_chain.certificates:
yield cert.raw_bytes

def get_ctfe_keys(self) -> list[bytes]:
"""Return the active CTFE public keys contents."""
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
if not ctfes:
raise MetadataError("Active CTFE keys not found in trusted root")
return ctfes

def get_rekor_keys(self) -> list[bytes]:
"""Return the rekor public key content."""
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))

if len(keys) != 1:
raise MetadataError("Did not find one active Rekor key in trusted root")
return keys

def get_fulcio_certs(self) -> list[Certificate]:
"""Return the Fulcio certificates."""

certs: list[Certificate]

# Return expired certificates too: they are expired now but may have
# been active when the certificate was used to sign.
certs = [
load_der_x509_certificate(c)
for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True)
]

if not certs:
raise MetadataError("Fulcio certificates not found in trusted root")
return certs
Loading