Skip to content

Commit 0fbfb0f

Browse files
committed
Separate TUF and trusted root management code
The purpose of this is to later enable both "--trust-root <FILE>" and some sort of offline functionality. * Trusted root can now be initialized from tuf, offline tuf or from a file * _internal.tuf module is now used only from the new trustroot module * Tests are modified to use the CustomTrustRoot API now but they still (also) test the internal TUF implementation details * The new functionality (offline & from_file) is tested but is not exposed to UI Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
1 parent 0883808 commit 0fbfb0f

File tree

9 files changed

+287
-221
lines changed

9 files changed

+287
-221
lines changed

sigstore/_cli.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
RekorClient,
4141
RekorKeyring,
4242
)
43-
from sigstore._internal.tuf import TrustUpdater
43+
from sigstore._internal.trustroot import CustomTrustedRoot
4444
from sigstore._utils import PEMCert
4545
from sigstore.errors import Error
4646
from sigstore.oidc import (
@@ -650,16 +650,16 @@ def _sign(args: argparse.Namespace) -> None:
650650
elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL:
651651
signing_ctx = SigningContext.production()
652652
else:
653-
# Assume "production" keys if none are given as arguments
654-
updater = TrustUpdater.production()
653+
# Assume "production" trust root if no keys are given as arguments
654+
trusted_root = CustomTrustedRoot.production()
655655
if args.ctfe_pem is not None:
656656
ctfe_keys = [args.ctfe_pem.read()]
657657
else:
658-
ctfe_keys = updater.get_ctfe_keys()
658+
ctfe_keys = trusted_root.get_ctfe_keys()
659659
if args.rekor_root_pubkey is not None:
660660
rekor_keys = [args.rekor_root_pubkey.read()]
661661
else:
662-
rekor_keys = updater.get_rekor_keys()
662+
rekor_keys = trusted_root.get_rekor_keys()
663663

664664
ct_keyring = CTKeyring(Keyring(ctfe_keys))
665665
rekor_keyring = RekorKeyring(Keyring(rekor_keys))
@@ -828,8 +828,8 @@ def _collect_verification_state(
828828
if args.rekor_root_pubkey is not None:
829829
rekor_keys = [args.rekor_root_pubkey.read()]
830830
else:
831-
updater = TrustUpdater.production()
832-
rekor_keys = updater.get_rekor_keys()
831+
trusted_root = CustomTrustedRoot.production()
832+
rekor_keys = trusted_root.get_rekor_keys()
833833

834834
verifier = Verifier(
835835
rekor=RekorClient(

sigstore/_internal/rekor/client.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
from sigstore._internal.ctfe import CTKeyring
3131
from sigstore._internal.keyring import Keyring
32-
from sigstore._internal.tuf import TrustUpdater
32+
from sigstore._internal.trustroot import CustomTrustedRoot
3333
from sigstore.transparency import LogEntry
3434

3535
logger = logging.getLogger(__name__)
@@ -232,14 +232,14 @@ def __del__(self) -> None:
232232
self.session.close()
233233

234234
@classmethod
235-
def production(cls, updater: TrustUpdater) -> RekorClient:
235+
def production(cls, trust_root: CustomTrustedRoot) -> RekorClient:
236236
"""
237237
Returns a `RekorClient` populated with the default Rekor production instance.
238238
239-
updater must be a `TrustUpdater` for the production TUF repository.
239+
trust_root must be a `CustomTrustedRoot` for the production TUF repository.
240240
"""
241-
rekor_keys = updater.get_rekor_keys()
242-
ctfe_keys = updater.get_ctfe_keys()
241+
rekor_keys = trust_root.get_rekor_keys()
242+
ctfe_keys = trust_root.get_ctfe_keys()
243243

244244
return cls(
245245
DEFAULT_REKOR_URL,
@@ -248,14 +248,14 @@ def production(cls, updater: TrustUpdater) -> RekorClient:
248248
)
249249

250250
@classmethod
251-
def staging(cls, updater: TrustUpdater) -> RekorClient:
251+
def staging(cls, trust_root: CustomTrustedRoot) -> RekorClient:
252252
"""
253253
Returns a `RekorClient` populated with the default Rekor staging instance.
254254
255-
updater must be a `TrustUpdater` for the staging TUF repository.
255+
trust_root must be a `CustomTrustedRoot` for the staging TUF repository.
256256
"""
257-
rekor_keys = updater.get_rekor_keys()
258-
ctfe_keys = updater.get_ctfe_keys()
257+
rekor_keys = trust_root.get_rekor_keys()
258+
ctfe_keys = trust_root.get_ctfe_keys()
259259

260260
return cls(
261261
STAGING_REKOR_URL,

sigstore/_internal/trustroot.py

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright 2023 The Sigstore Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Trust root management for sigstore-python.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
from datetime import datetime, timezone
22+
from pathlib import Path
23+
from typing import Iterable
24+
25+
from cryptography.x509 import Certificate, load_der_x509_certificate
26+
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
27+
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
28+
CertificateAuthority,
29+
TransparencyLogInstance,
30+
TrustedRoot,
31+
)
32+
33+
from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
34+
from sigstore.errors import MetadataError
35+
36+
37+
def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
38+
"""
39+
Given a `period`, checks that the the current time is not before `start`. If
40+
`allow_expired` is `False`, also checks that the current time is not after
41+
`end`.
42+
"""
43+
now = datetime.now(timezone.utc)
44+
45+
# If there was no validity period specified, the key is always valid.
46+
if not period:
47+
return True
48+
49+
# Active: if the current time is before the starting period, we are not yet
50+
# valid.
51+
if now < period.start:
52+
return False
53+
54+
# If we want Expired keys, the key is valid at this point. Otherwise, check
55+
# that we are within range.
56+
return allow_expired or (period.end is None or now <= period.end)
57+
58+
59+
class CustomTrustedRoot(TrustedRoot):
60+
"""Complete set of trusted entities for a Sigstore client"""
61+
62+
@classmethod
63+
def from_file(cls, path: str) -> "CustomTrustedRoot":
64+
"""Create a new trust root from file"""
65+
tr: CustomTrustedRoot = cls().from_json(Path(path).read_bytes())
66+
return tr
67+
68+
@classmethod
69+
def from_tuf(cls, url: str, offline: bool = False) -> "CustomTrustedRoot":
70+
"""Create a new trust root from a TUF repository.
71+
72+
If `offline`, will use trust root in local TUF cache. Otherwise will
73+
update the trust root from remote TUF repository.
74+
"""
75+
path = TrustUpdater(url, offline).get_trusted_root_path()
76+
return cls.from_file(path)
77+
78+
@classmethod
79+
def production(cls, offline: bool = False) -> "CustomTrustedRoot":
80+
"""Create new trust root from Sigstore production TUF repository.
81+
82+
If `offline`, will use trust root in local TUF cache. Otherwise will
83+
update the trust root from remote TUF repository.
84+
"""
85+
return cls.from_tuf(DEFAULT_TUF_URL, offline)
86+
87+
@classmethod
88+
def staging(cls, offline: bool = False) -> "CustomTrustedRoot":
89+
"""Create new trust root from Sigstore staging TUF repository.
90+
91+
If `offline`, will use trust root in local TUF cache. Otherwise will
92+
update the trust root from remote TUF repository.
93+
"""
94+
return cls.from_tuf(STAGING_TUF_URL, offline)
95+
96+
@staticmethod
97+
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
98+
"""Return public key contents given transparency log instances."""
99+
100+
for key in tlogs:
101+
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False):
102+
continue
103+
key_bytes = key.public_key.raw_bytes
104+
if key_bytes:
105+
yield key_bytes
106+
107+
@staticmethod
108+
def _get_ca_keys(
109+
cas: list[CertificateAuthority], *, allow_expired: bool
110+
) -> Iterable[bytes]:
111+
"""Return public key contents given certificate authorities."""
112+
113+
for ca in cas:
114+
if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired):
115+
continue
116+
for cert in ca.cert_chain.certificates:
117+
yield cert.raw_bytes
118+
119+
def get_ctfe_keys(self) -> list[bytes]:
120+
"""Return the active CTFE public keys contents."""
121+
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
122+
if not ctfes:
123+
raise MetadataError("Active CTFE keys not found in trusted root")
124+
return ctfes
125+
126+
def get_rekor_keys(self) -> list[bytes]:
127+
"""Return the rekor public key content."""
128+
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))
129+
130+
if len(keys) != 1:
131+
raise MetadataError("Did not find one active Rekor key in trusted root")
132+
return keys
133+
134+
def get_fulcio_certs(self) -> list[Certificate]:
135+
"""Return the Fulcio certificates."""
136+
137+
certs: list[Certificate]
138+
139+
# Return expired certificates too: they are expired now but may have
140+
# been active when the certificate was used to sign.
141+
certs = [
142+
load_der_x509_certificate(c)
143+
for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True)
144+
]
145+
146+
if not certs:
147+
raise MetadataError("Fulcio certificates not found in trusted root")
148+
return certs

0 commit comments

Comments
 (0)