Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix GlobalUploadToken auth #631

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions codecov_auth/authentication/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import re
from typing import NamedTuple

from django.http import HttpRequest


class UploadInfo(NamedTuple):
service: str
encoded_slug: str
commitid: str | None


def get_upload_info_from_request_path(request: HttpRequest) -> UploadInfo | None:
path_info = request.get_full_path_info()
# The repo part comes from https://stackoverflow.com/a/22312124
upload_views_prefix_regex = (
r"\/upload\/(\w+)\/([\w\.@:_/\-~]+)\/commits(?:\/([a-f0-9]{40}))?"
)
match = re.search(upload_views_prefix_regex, path_info)

if match is None:
return None

service = match.group(1)
encoded_slug = match.group(2)
commitid = match.group(3)

return UploadInfo(service, encoded_slug, commitid)
71 changes: 30 additions & 41 deletions codecov_auth/authentication/repo_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from shared.metrics import metrics
from shared.torngit.exceptions import TorngitObjectNotFoundError, TorngitRateLimitError

from codecov_auth.authentication.helpers import get_upload_info_from_request_path
from codecov_auth.authentication.types import RepositoryAsUser, RepositoryAuthInterface
from codecov_auth.models import (
OrganizationLevelToken,
Expand Down Expand Up @@ -181,40 +182,37 @@
def authenticate(self, request):
global_tokens = get_global_tokens()
token = self.get_token(request)
repoid = self.get_repoid(request)
owner = self.get_owner(request)
using_global_token = True if token in global_tokens else False
service = global_tokens[token] if using_global_token else None
using_global_token = token in global_tokens
if not using_global_token:
return None # continue to next auth class

if using_global_token:
try:
repository = Repository.objects.get(
author__service=service,
repoid=repoid,
author__username=owner.username,
)
except ObjectDoesNotExist:
raise exceptions.AuthenticationFailed(
"Could not find a repository, try using repo upload token"
)
else:
service = global_tokens[token]
upload_info = get_upload_info_from_request_path(request)
if upload_info is None:
return None # continue to next auth class
# It's important NOT to use the service returned in upload_info
# To avoid someone uploading with GlobalUploadToken to a different service
# Than what it configured
repository = get_repository_from_string(
Service(service), upload_info.encoded_slug
)
if repository is None:
raise exceptions.AuthenticationFailed(
"Could not find a repository, try using repo upload token"
)
return (
RepositoryAsUser(repository),
LegacyTokenRepositoryAuth(repository, {"token": token}),
)

def get_token(self, request):
# TODO
pass

def get_repoid(self, request):
# TODO
pass

def get_owner(self, request):
# TODO
pass
def get_token(self, request: HttpRequest) -> str | None:
auth_header = request.headers.get("Authorization")
if not auth_header:
return None
if " " in auth_header:
_, token = auth_header.split(" ", 1)
return token
return auth_header

Check warning on line 215 in codecov_auth/authentication/repo_auth.py

View check run for this annotation

Codecov Notifications / codecov/patch

codecov_auth/authentication/repo_auth.py#L215

Added line #L215 was not covered by tests

Check warning on line 215 in codecov_auth/authentication/repo_auth.py

View check run for this annotation

Codecov - QA / codecov/patch

codecov_auth/authentication/repo_auth.py#L215

Added line #L215 was not covered by tests

Check warning on line 215 in codecov_auth/authentication/repo_auth.py

View check run for this annotation

Codecov Public QA / codecov/patch

codecov_auth/authentication/repo_auth.py#L215

Added line #L215 was not covered by tests


class OrgLevelTokenAuthentication(authentication.TokenAuthentication):
Expand Down Expand Up @@ -260,22 +258,13 @@
auth_failed_message = "Not valid tokenless upload"

def _get_info_from_request_path(
self, request
) -> tuple[Repository, str | None] | None:
path_info = request.get_full_path_info()
# The repo part comes from https://stackoverflow.com/a/22312124
upload_views_prefix_regex = (
r"\/upload\/(\w+)\/([\w\.@:_/\-~]+)\/commits(?:\/([a-f0-9]{40}))?"
)
match = re.search(upload_views_prefix_regex, path_info)
self, request: HttpRequest
) -> tuple[Repository, str | None]:
upload_info = get_upload_info_from_request_path(request)

if match is None:
if upload_info is None:
raise exceptions.AuthenticationFailed(self.auth_failed_message)

service = match.group(1)
encoded_slug = match.group(2)
commitid = match.group(3)

service, encoded_slug, commitid = upload_info
# Validate provider
try:
service_enum = Service(service)
Expand Down
63 changes: 37 additions & 26 deletions codecov_auth/tests/unit/test_repo_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,58 +168,69 @@ def get_mocked_global_tokens(self):
"bitbucketserveruploadtoken": "bitbucket_server",
}

def test_authentication_for_non_enterprise(self):
@patch("codecov_auth.authentication.repo_auth.get_global_tokens")
def test_authentication_no_global_token_available(self, mocked_get_global_tokens):
mocked_get_global_tokens.return_value = {}
authentication = GlobalTokenAuthentication()
request = APIRequestFactory().post("/endpoint")
request = APIRequestFactory().post("/upload/service/owner::::repo/commits")
res = authentication.authenticate(request)
assert res is None

@patch("codecov_auth.authentication.repo_auth.get_global_tokens")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_token")
def test_authentication_for_enterprise_wrong_token(
self, mocked_token, mocked_get_global_tokens
):
def test_authentication_for_enterprise_wrong_token(self, mocked_get_global_tokens):
mocked_get_global_tokens.return_value = self.get_mocked_global_tokens()
mocked_token.return_value = "random_token"
authentication = GlobalTokenAuthentication()
request = APIRequestFactory().post("/endpoint")
request = APIRequestFactory().post(
"/upload/service/owner::::repo/commits",
headers={"Authorization": "token GUT"},
)
res = authentication.authenticate(request)
assert res is None

@patch("codecov_auth.authentication.repo_auth.get_global_tokens")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_token")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_owner")
def test_authentication_for_enterprise_correct_token_repo_not_exists(
self, mocked_owner, mocked_token, mocked_get_global_tokens, db
self, mocked_get_global_tokens, db
):
mocked_get_global_tokens.return_value = self.get_mocked_global_tokens()
mocked_token.return_value = "githubuploadtoken"
mocked_owner.return_value = OwnerFactory.create()
authentication = GlobalTokenAuthentication()
request = APIRequestFactory().post("/endpoint")
request = APIRequestFactory().post(
"/upload/service/owner::::repo/commits",
headers={"Authorization": "token githubuploadtoken"},
)
with pytest.raises(exceptions.AuthenticationFailed) as exc:
authentication.authenticate(request)
assert exc.value.args == (
"Could not find a repository, try using repo upload token",
)

@pytest.mark.parametrize(
"owner_service, owner_name, token",
[
pytest.param("github", "username", "githubuploadtoken", id="github"),
pytest.param(
"gitlab", "username", "gitlabuploadtoken", id="gitlab_single_user"
),
pytest.param(
"gitlab",
"usergroup:username",
"gitlabuploadtoken",
id="gitlab_subgroup_user",
),
],
)
@patch("codecov_auth.authentication.repo_auth.get_global_tokens")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_token")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_owner")
@patch("codecov_auth.authentication.repo_auth.GlobalTokenAuthentication.get_repoid")
def test_authentication_for_enterprise_correct_token_repo_exists(
self, mocked_repoid, mocked_owner, mocked_token, mocked_get_global_tokens, db
self, mocked_get_global_tokens, owner_service, owner_name, token, db
):
mocked_get_global_tokens.return_value = self.get_mocked_global_tokens()
mocked_token.return_value = "githubuploadtoken"
owner = OwnerFactory.create(service="github")
repoid = 123
mocked_repoid.return_value = repoid
mocked_owner.return_value = owner

repository = RepositoryFactory.create(author=owner, repoid=repoid)
owner = OwnerFactory.create(service=owner_service, username=owner_name)
owner_name.replace(":", ":::") # encode name to test GL subgroups
repository = RepositoryFactory.create(author=owner)
authentication = GlobalTokenAuthentication()
request = APIRequestFactory().post("/endpoint")
request = APIRequestFactory().post(
f"/upload/{owner_service}/{owner_name}::::{repository.name}/commits",
headers={"Authorization": f"token {token}"},
)
res = authentication.authenticate(request)
assert res is not None
user, auth = res
Expand Down
Loading