Skip to content

Commit

Permalink
Add support for uploading attestations in legacy API
Browse files Browse the repository at this point in the history
  • Loading branch information
facutuesca committed May 13, 2024
1 parent c81fac9 commit af6432f
Show file tree
Hide file tree
Showing 8 changed files with 612 additions and 144 deletions.
4 changes: 3 additions & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ hupper>=1.9
pip-tools>=1.0
pyramid_debugtoolbar>=2.5
pip-api
repository-service-tuf
# TODO: re-add before merging, once repository-service-tuf releases a new version.
# The current version pins tuf==3.1.0, which conflicts with our `sigstore` dependency (`tuf==4.0.0`)
#repository-service-tuf
2 changes: 2 additions & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ redis>=2.8.0,<6.0.0
rfc3986
sentry-sdk
setuptools
sigstore==3.0.0rc2
pypi-attestation-models==0.0.1rc2
sqlalchemy[asyncio]>=2.0,<3.0
stdlib-list
stripe
Expand Down
328 changes: 185 additions & 143 deletions requirements/main.txt

Large diffs are not rendered by default.

310 changes: 310 additions & 0 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@
import pretend
import pytest

from pypi_attestation_models import (
Attestation,
InvalidAttestationError,
VerificationError,
VerificationMaterial,
)
from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPTooManyRequests
from sigstore.verify import Verifier
from sqlalchemy import and_, exists
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
Expand Down Expand Up @@ -2385,6 +2392,82 @@ def test_upload_fails_without_oidc_publisher_permission(
"See /the/help/url/ for more information."
).format(project.name)

def test_upload_attestation_fails_without_oidc_publisher(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
project_service,
macaroon_service,
):
project = ProjectFactory.create()
owner = UserFactory.create()
maintainer = UserFactory.create()
RoleFactory.create(user=owner, project=project, role_name="Owner")
RoleFactory.create(user=maintainer, project=project, role_name="Maintainer")

EmailFactory.create(user=maintainer)
db_request.user = maintainer
raw_macaroon, macaroon = macaroon_service.create_macaroon(
"fake location",
"fake description",
[caveats.RequestUser(user_id=str(maintainer.id))],
user_id=maintainer.id,
)
identity = UserTokenContext(maintainer, macaroon)

filename = "{}-{}.tar.gz".format(project.name, "1.0")
attestation = Attestation(
version=1,
verification_material=VerificationMaterial(
certificate="some_cert", transparency_entries=[dict()]
),
message_signature="some_signature",
)

pyramid_config.testing_securitypolicy(identity=identity)
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": f"[{attestation.model_dump_json()}]",
"version": "1.0",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
extract_http_macaroon = pretend.call_recorder(lambda r, _: raw_macaroon)
monkeypatch.setattr(
security_policy, "_extract_http_macaroon", extract_http_macaroon
)

db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMacaroonService: macaroon_service,
IMetricsService: metrics,
IProjectService: project_service,
}.get(svc)
db_request.user_agent = "warehouse-tests/6.6.6"

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status == (
"400 Attestations are currently only supported when using Trusted "
"Publishing with GitHub Actions."
)

@pytest.mark.parametrize(
"plat",
[
Expand Down Expand Up @@ -3293,6 +3376,233 @@ def test_upload_succeeds_creates_release(
),
]

def test_upload_with_valid_attestation_succeeds(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
):
from warehouse.events.models import HasEvents

project = ProjectFactory.create()
version = "1.0"
publisher = GitHubPublisherFactory.create(projects=[project])
claims = {
"sha": "somesha",
"repository": f"{publisher.repository_owner}/{publisher.repository_name}",
"workflow": "workflow_name",
}
identity = PublisherTokenContext(publisher, SignedClaims(claims))
db_request.oidc_publisher = identity.publisher
db_request.oidc_claims = identity.claims

db_request.db.add(Classifier(classifier="Environment :: Other Environment"))
db_request.db.add(Classifier(classifier="Programming Language :: Python"))

filename = "{}-{}.tar.gz".format(project.name, "1.0")
attestation = Attestation(
version=1,
verification_material=VerificationMaterial(
certificate="somebase64string", transparency_entries=[dict()]
),
message_signature="somebase64string",
)

pyramid_config.testing_securitypolicy(identity=identity)
db_request.user = None
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": f"[{attestation.model_dump_json()}]",
"version": version,
"summary": "This is my summary!",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMetricsService: metrics,
}.get(svc)

record_event = pretend.call_recorder(
lambda self, *, tag, request=None, additional: None
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

verify = pretend.call_recorder(lambda _self, _verifier, _policy, _dist: None)
monkeypatch.setattr(Attestation, "verify", verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

resp = legacy.file_upload(db_request)

assert resp.status_code == 200

assert len(verify.calls) == 1

def test_upload_with_malformed_attestation_fails(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
):
from warehouse.events.models import HasEvents

project = ProjectFactory.create()
version = "1.0"
publisher = GitHubPublisherFactory.create(projects=[project])
claims = {
"sha": "somesha",
"repository": f"{publisher.repository_owner}/{publisher.repository_name}",
"workflow": "workflow_name",
}
identity = PublisherTokenContext(publisher, SignedClaims(claims))
db_request.oidc_publisher = identity.publisher
db_request.oidc_claims = identity.claims

db_request.db.add(Classifier(classifier="Environment :: Other Environment"))
db_request.db.add(Classifier(classifier="Programming Language :: Python"))

filename = "{}-{}.tar.gz".format(project.name, "1.0")

pyramid_config.testing_securitypolicy(identity=identity)
db_request.user = None
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": "[{'a_malformed_attestation': 3}]",
"version": version,
"summary": "This is my summary!",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMetricsService: metrics,
}.get(svc)

record_event = pretend.call_recorder(
lambda self, *, tag, request=None, additional: None
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

def failing_verify(_self, _verifier, _policy, _dist):
raise InvalidAttestationError

monkeypatch.setattr(Attestation, "verify", failing_verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status.startswith(
"400 Error while decoding the included attestation:"
)

def test_upload_with_failing_attestation_fails(
self,
monkeypatch,
pyramid_config,
db_request,
metrics,
):
from warehouse.events.models import HasEvents

project = ProjectFactory.create()
version = "1.0"
publisher = GitHubPublisherFactory.create(projects=[project])
claims = {
"sha": "somesha",
"repository": f"{publisher.repository_owner}/{publisher.repository_name}",
"workflow": "workflow_name",
}
identity = PublisherTokenContext(publisher, SignedClaims(claims))
db_request.oidc_publisher = identity.publisher
db_request.oidc_claims = identity.claims

db_request.db.add(Classifier(classifier="Environment :: Other Environment"))
db_request.db.add(Classifier(classifier="Programming Language :: Python"))

filename = "{}-{}.tar.gz".format(project.name, "1.0")
attestation = Attestation(
version=1,
verification_material=VerificationMaterial(
certificate="somebase64string", transparency_entries=[dict()]
),
message_signature="somebase64string",
)

pyramid_config.testing_securitypolicy(identity=identity)
db_request.user = None
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"attestations": f"[{attestation.model_dump_json()}]",
"version": version,
"summary": "This is my summary!",
"filetype": "sdist",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

storage_service = pretend.stub(store=lambda path, filepath, meta: None)
db_request.find_service = lambda svc, name=None, context=None: {
IFileStorage: storage_service,
IMetricsService: metrics,
}.get(svc)

record_event = pretend.call_recorder(
lambda self, *, tag, request=None, additional: None
)
monkeypatch.setattr(HasEvents, "record_event", record_event)

def failing_verify(_self, _verifier, _policy, _dist):
raise VerificationError("verification failed")

monkeypatch.setattr(Attestation, "verify", failing_verify)
monkeypatch.setattr(Verifier, "production", lambda: pretend.stub())

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status.startswith(
"400 Could not verify the uploaded artifact using the included attestation"
)

@pytest.mark.parametrize(
"version, expected_version",
[
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/oidc/models/test_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from tests.common.db.oidc import GitHubPublisherFactory, PendingGitHubPublisherFactory
from warehouse.oidc import errors
from warehouse.oidc.errors import InvalidPublisherError
from warehouse.oidc.models import _core, github


Expand Down Expand Up @@ -470,6 +471,31 @@ def test_github_publisher_environment_claim(self, truth, claim, valid):
check = github.GitHubPublisher.__optional_verifiable_claims__["environment"]
assert check(truth, claim, pretend.stub()) is valid

@pytest.mark.parametrize(
("ref", "sha", "raises"),
[
("ref", "sha", False),
(None, "sha", False),
("ref", None, False),
(None, None, True),
],
)
def test_github_publisher_verification_policy(self, ref, sha, raises):
publisher = github.GitHubPublisher(
repository_name="fakerepo",
repository_owner="fakeowner",
repository_owner_id="fakeid",
workflow_filename="fakeworkflow.yml",
environment="",
)
claims = {"ref": ref, "sha": sha}

if not raises:
publisher.publisher_verification_policy(claims)
else:
with pytest.raises(InvalidPublisherError):
publisher.publisher_verification_policy(claims)

def test_github_publisher_duplicates_cant_be_created(self, db_request):
publisher1 = github.GitHubPublisher(
repository_name="repository_name",
Expand Down
Loading

0 comments on commit af6432f

Please sign in to comment.