-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add new permission for internal api service checks (#2472)
Make naming more consistent with "users-backend" and not "users-service"
- Loading branch information
1 parent
31a9fca
commit 9080f9e
Showing
6 changed files
with
162 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,52 @@ | ||
from typing import Any | ||
|
||
import jwt | ||
from pythonit_toolkit.headers import SERVICE_JWT_HEADER | ||
from pythonit_toolkit.pastaporto.entities import Credential | ||
from pythonit_toolkit.pastaporto.tokens import decode_service_to_service_token | ||
from strawberry.permission import BasePermission | ||
from strawberry.types import Info | ||
|
||
|
||
class IsAuthenticated(BasePermission): | ||
message = "Not authenticated" | ||
|
||
def has_permission(self, source, info, **kwargs): | ||
return Credential.AUTHENTICATED in info.context.request.auth.scopes | ||
|
||
|
||
def IsService(allowed_callers: list[str], secret: str, service: str): | ||
if not allowed_callers: | ||
raise ValueError("No callers allowed specified") | ||
|
||
if not secret: | ||
raise ValueError("JWT secret cannot be empty") | ||
|
||
if not service: | ||
raise ValueError("Current service name cannot be empty") | ||
|
||
secret = str(secret) | ||
|
||
class _IsService(BasePermission): | ||
message = "Forbidden" | ||
|
||
def has_permission(self, source: Any, info: Info, **kwargs) -> bool: | ||
token = info.context.request.headers.get(SERVICE_JWT_HEADER) | ||
|
||
for caller in allowed_callers: | ||
try: | ||
decode_service_to_service_token( | ||
token, secret, issuer=caller, audience=service | ||
) | ||
return True | ||
except ( | ||
jwt.DecodeError, | ||
jwt.InvalidIssuerError, | ||
jwt.ExpiredSignatureError, | ||
jwt.InvalidAudienceError, | ||
): | ||
pass | ||
|
||
return False | ||
|
||
return _IsService |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
from unittest.mock import MagicMock | ||
|
||
from pythonit_toolkit.api.permissions import IsService | ||
from pythonit_toolkit.headers import SERVICE_JWT_HEADER | ||
from pythonit_toolkit.pastaporto.tokens import generate_service_to_service_token | ||
from ward import fixture, raises, test | ||
|
||
|
||
@fixture | ||
def fake_info(): | ||
def _fake_info(token): | ||
headers = {SERVICE_JWT_HEADER: token} | ||
mock_info = MagicMock() | ||
mock_info.context.request.headers.get = headers.get | ||
return mock_info | ||
|
||
return _fake_info | ||
|
||
|
||
@test("Allowed callers pass the permission") | ||
async def _(fake_info=fake_info): | ||
test_token = generate_service_to_service_token("test", "gateway", "users-backend") | ||
mock_info = fake_info(test_token) | ||
|
||
PermissionClass = IsService(["gateway"], "test", "users-backend") | ||
|
||
assert PermissionClass().has_permission(None, mock_info) is True | ||
|
||
|
||
@test("Not allowed callers fail the permission") | ||
async def _(fake_info=fake_info): | ||
test_token = generate_service_to_service_token( | ||
"test", "pycon-backend", "users-backend" | ||
) | ||
mock_info = fake_info(test_token) | ||
|
||
PermissionClass = IsService(["gateway"], "test", "users-backend") | ||
|
||
assert PermissionClass().has_permission(None, mock_info) is False | ||
|
||
|
||
@test("Multiple allowed callers pass the permission") | ||
async def _(fake_info=fake_info): | ||
test_token = generate_service_to_service_token( | ||
"test", "association-backend", "users-backend" | ||
) | ||
mock_info = fake_info(test_token) | ||
|
||
PermissionClass = IsService( | ||
["gateway", "association-backend"], "test", "users-backend" | ||
) | ||
|
||
assert PermissionClass().has_permission(None, mock_info) is True | ||
|
||
|
||
@test("Wrong secret fail permission") | ||
async def _(fake_info=fake_info): | ||
test_token = generate_service_to_service_token( | ||
"wrong-secret", "pycon-backend", "users-backend" | ||
) | ||
mock_info = fake_info(test_token) | ||
|
||
PermissionClass = IsService(["gateway"], "test", "users-backend") | ||
|
||
assert PermissionClass().has_permission(None, mock_info) is False | ||
|
||
|
||
@test("Token for another service fails permission") | ||
async def _(fake_info=fake_info): | ||
test_token = generate_service_to_service_token("test", "pycon-backend", "gateway") | ||
mock_info = fake_info(test_token) | ||
|
||
PermissionClass = IsService(["pycon-backend"], "test", "users-backend") | ||
|
||
assert PermissionClass().has_permission(None, mock_info) is False | ||
|
||
|
||
@test("Cannot create a permission not allowing any service") | ||
async def _(): | ||
with raises(ValueError) as exc: | ||
IsService([], "test", "users-backend") | ||
|
||
assert str(exc.raised) == "No callers allowed specified" | ||
|
||
|
||
@test("Cannot create a permission without any secret") | ||
async def _(): | ||
with raises(ValueError) as exc: | ||
IsService(["service"], "", "users-backend") | ||
|
||
assert str(exc.raised) == "JWT secret cannot be empty" | ||
|
||
|
||
@test("Cannot create a permission without the current service name") | ||
async def _(): | ||
with raises(ValueError) as exc: | ||
IsService(["service"], "secret", "") | ||
|
||
assert str(exc.raised) == "Current service name cannot be empty" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters