Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- `Attestation.sign` now only returns `AttestationError` when failing to sign a distribution file ([#28](https://github.com/trailofbits/pypi-attestations/pull/28))

## [0.0.6]

### Added
Expand Down
62 changes: 41 additions & 21 deletions src/pypi_attestations/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@
from annotated_types import MinLen # noqa: TCH002
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from packaging.utils import parse_sdist_filename, parse_wheel_filename
from packaging.utils import (
InvalidSdistFilename,
InvalidWheelFilename,
parse_sdist_filename,
parse_wheel_filename,
)
from pydantic import Base64Bytes, BaseModel
from pydantic_core import ValidationError
from sigstore._utils import _sha256_streaming
from sigstore.dsse import Envelope as DsseEnvelope
from sigstore.dsse import Error as DsseError
from sigstore.dsse import _DigestSet, _Statement, _StatementBuilder, _Subject
from sigstore.models import Bundle, LogEntry
from sigstore.sign import ExpiredCertificate, ExpiredIdentity
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature as _Signature

Expand Down Expand Up @@ -86,34 +93,47 @@ class Attestation(BaseModel):
def sign(cls, signer: Signer, dist: Path) -> Attestation:
"""Create an envelope, with signature, from a distribution file.

On failure, raises `AttestationError` or an appropriate subclass.
On failure, raises `AttestationError`.
"""
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()
try:
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()
except OSError as e:
raise AttestationError(str(e))

try:
name = _ultranormalize_dist_filename(dist.name)
except ValueError as e:
except (ValueError, InvalidWheelFilename, InvalidSdistFilename) as e:
raise AttestationError(str(e))

stmt = (
_StatementBuilder()
.subjects(
[
_Subject(
name=name,
digest=_DigestSet(root={"sha256": digest}),
)
]
try:
stmt = (
_StatementBuilder()
.subjects(
[
_Subject(
name=name,
digest=_DigestSet(root={"sha256": digest}),
)
]
)
.predicate_type("https://docs.pypi.org/attestations/publish/v1")
.build()
)
.predicate_type("https://docs.pypi.org/attestations/publish/v1")
.build()
)
bundle = signer.sign_dsse(stmt)
except DsseError as e:
raise AttestationError(str(e))

return Attestation.from_bundle(bundle)
try:
bundle = signer.sign_dsse(stmt)
except (ExpiredCertificate, ExpiredIdentity) as e:
raise AttestationError(str(e))

try:
return Attestation.from_bundle(bundle)
except ConversionError as e:
raise AttestationError(str(e))

def verify(
self, verifier: Verifier, policy: VerificationPolicy, dist: Path
Expand Down
62 changes: 62 additions & 0 deletions test/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pretend
import pypi_attestations._impl as impl
import pytest
import sigstore
from sigstore.dsse import _DigestSet, _StatementBuilder, _Subject
from sigstore.models import Bundle
from sigstore.oidc import IdentityToken
Expand Down Expand Up @@ -57,6 +58,67 @@ def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None:
):
impl.Attestation.sign(pretend.stub(), bad_dist)

def test_sign_raises_attestation_exception(
self, id_token: IdentityToken, tmp_path: Path
) -> None:
non_existing_file = tmp_path / "invalid-name.tar.gz"
with pytest.raises(impl.AttestationError, match="No such file"):
impl.Attestation.sign(pretend.stub(), non_existing_file)

bad_wheel_filename = tmp_path / "invalid-name.whl"
bad_wheel_filename.write_bytes(b"junk")

with pytest.raises(impl.AttestationError, match="Invalid wheel filename"):
impl.Attestation.sign(pretend.stub(), bad_wheel_filename)

bad_sdist_filename = tmp_path / "invalid_name.tar.gz"
bad_sdist_filename.write_bytes(b"junk")

with pytest.raises(impl.AttestationError, match="Invalid sdist filename"):
impl.Attestation.sign(pretend.stub(), bad_sdist_filename)

def test_wrong_predicate_raises_exception(
self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch
) -> None:
def dummy_predicate(self_: _StatementBuilder, _: str) -> _StatementBuilder:
# wrong type here to have a validation error
self_._predicate_type = False
return self_

monkeypatch.setattr(sigstore.dsse._StatementBuilder, "predicate_type", dummy_predicate)
with pytest.raises(impl.AttestationError, match="invalid statement"):
impl.Attestation.sign(pretend.stub(), artifact_path)

def test_expired_certificate(
self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch
) -> None:
def in_validity_period(_: IdentityToken) -> bool:
return False

monkeypatch.setattr(IdentityToken, "in_validity_period", in_validity_period)

sign_ctx = SigningContext.staging()
with sign_ctx.signer(id_token, cache=False) as signer:
with pytest.raises(impl.AttestationError):
impl.Attestation.sign(signer, artifact_path)

def test_multiple_signatures(
self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch
) -> None:
def get_bundle(*_) -> Bundle: # noqa: ANN002
# Duplicate the signature to trigger a Conversion error
bundle = Bundle.from_json(gh_signed_bundle_path.read_bytes())
bundle._inner.dsse_envelope.signatures.append(bundle._inner.dsse_envelope.signatures[0])
return bundle

monkeypatch.setattr(sigstore.sign.Signer, "sign_dsse", get_bundle)

sign_ctx = SigningContext.staging()

with pytest.raises(impl.AttestationError):
with sign_ctx.signer(id_token) as signer:
impl.Attestation.sign(signer, artifact_path)

def test_verify_github_attested(self) -> None:
verifier = Verifier.production()
pol = policy.AllOf(
Expand Down