Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use of oidc configuration in the server unit test as default #3249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/pbench/server/api/resources/endpoint_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,13 @@ def get(self):
try:
client = self.server_config.get("openid-connect", "client")
realm = self.server_config.get("openid-connect", "realm")
secret = self.server_config.get("openid-connect", "secret")
server = self.server_config.get("openid-connect", "server_url")
except (NoOptionError, NoSectionError):
pass
else:
endpoints["openid"] = {
"client": client,
"realm": realm,
"secret": secret,
"server": server,
}

Expand Down
26 changes: 13 additions & 13 deletions lib/pbench/server/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,24 +305,23 @@ def construct_oidc_client(cls, server_config: PbenchServerConfig) -> "OpenIDClie
server_url = server_config.get("openid-connect", "server_url")
client = server_config.get("openid-connect", "client")
realm = server_config.get("openid-connect", "realm")
secret = server_config.get("openid-connect", "secret")
except (NoOptionError, NoSectionError) as exc:
raise OpenIDClient.NotConfigured() from exc

return cls(
oidc_client = cls(
server_url=server_url,
client_id=client,
realm_name=realm,
client_secret_key=secret,
verify=False,
)
oidc_client.set_oidc_public_key()
return oidc_client

def __init__(
self,
server_url: str,
client_id: str,
realm_name: str,
client_secret_key: str,
verify: bool = True,
headers: Optional[Dict[str, str]] = None,
):
Expand All @@ -341,16 +340,24 @@ def __init__(
server_url : OpenID Connect server auth url
client_id : client id
realm_name : realm name
client_secret_key : client secret key
verify : True if require valid SSL
headers : dict of custom header to pass to each HTML request
"""
self.client_id = client_id
self._client_secret_key = client_secret_key
self._realm_name = realm_name

self._connection = Connection(server_url, headers, verify)

self._pem_public_key = None

def __repr__(self):
return (
f"OpenIDClient(server_url={self._connection.server_url}, "
f"client_id={self.client_id}, realm_name={self._realm_name}, "
f"headers={self._connection.headers})"
)

def set_oidc_public_key(self):
realm_public_key_uri = f"realms/{self._realm_name}"
response_json = self._connection.get(realm_public_key_uri).json()
public_key = response_json["public_key"]
Expand All @@ -362,13 +369,6 @@ def __init__(
pem_public_key += "-----END PUBLIC KEY-----\n"
self._pem_public_key = pem_public_key

def __repr__(self):
return (
f"OpenIDClient(server_url={self._connection.server_url}, "
f"client_id={self.client_id}, realm_name={self._realm_name}, "
f"headers={self._connection.headers})"
)

def token_introspect(self, token: str) -> JSON:
"""Utility method to decode access/Id tokens using the public key
provided by the identity provider.
Expand Down
47 changes: 0 additions & 47 deletions lib/pbench/test/unit/server/auth/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from http import HTTPStatus
from typing import Dict, Optional, Tuple, Union

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from flask import current_app, Flask
import jwt
import pytest
Expand Down Expand Up @@ -252,33 +250,6 @@ def test_create_w_roles(self):
assert user.roles == ["roleA", "roleB"]


@pytest.fixture(scope="session")
def rsa_keys() -> Dict[str, Union[rsa.RSAPrivateKey, str]]:
"""Fixture for generating an RSA public / private key pair.

Returns:
A dictionary containing the RSAPrivateKey object, the PEM encoded public
key string without the BEGIN/END bookends (mimicing what is returned by
an OpenID Connect broker), and the PEM encoded public key string with
the BEGIN/END bookends
"""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pem_public_key = (
private_key.public_key()
.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
.decode()
)
# Strip "-----BEGIN..." and "-----END...", and the empty element resulting
# from the trailing newline character.
public_key_l = pem_public_key.split("\n")[1:-2]
public_key = "".join(public_key_l)
return {
"private_key": private_key,
"public_key": public_key,
"pem_public_key": pem_public_key,
}


def gen_rsa_token(
audience: str, private_key: str, exp: str = "99999999999"
) -> Tuple[str, Dict[str, str]]:
Expand Down Expand Up @@ -397,7 +368,6 @@ def test_construct_oidc_client_succ(self, monkeypatch):
config = mock_connection(monkeypatch, client_id, public_key)
server_url = config["openid-connect"]["server_url"]
realm_name = config["openid-connect"]["realm"]
secret = config["openid-connect"]["secret"]

oidc_client = OpenIDClient.construct_oidc_client(config)

Expand All @@ -406,7 +376,6 @@ def test_construct_oidc_client_succ(self, monkeypatch):
f"client_id={client_id}, realm_name={realm_name}, "
"headers={})"
)
assert oidc_client._client_secret_key == secret
assert (
oidc_client._pem_public_key
== f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----\n"
Expand Down Expand Up @@ -625,22 +594,6 @@ def delete(*args, **kwargs):
user = Auth.verify_auth(pbench_drb_token_invalid)
assert user is None

def test_verify_auth_internal_at_valid_fail(
self, monkeypatch, make_logger, pbench_drb_token
):
"""Verify behavior when a token is not in the database"""

def valid(*args, **kwargs):
return False

monkeypatch.setattr(Auth.ActiveTokens, "valid", valid)
app = Flask("test-verify-auth-internal-at-valid-fail")
app.logger = make_logger
with app.app_context():
current_app.secret_key = jwt_secret
user = Auth.verify_auth(pbench_drb_token)
assert user is None

def test_verify_auth_oidc_offline(self, monkeypatch, rsa_keys, make_logger):
"""Verify OIDC token offline verification success path"""
client_id = "us"
Expand Down
154 changes: 133 additions & 21 deletions lib/pbench/test/unit/server/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@
from stat import ST_MTIME
import tarfile
from typing import Dict, Optional
from urllib.parse import urljoin
import uuid

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from email_validator import EmailNotValidError, ValidatedEmail
from freezegun import freeze_time
import jwt
import pytest
from requests import Response
import responses

from pbench.common import MetadataLog
from pbench.common.logger import get_pbench_logger
Expand All @@ -25,7 +29,6 @@
import pbench.server.auth.auth as Auth
from pbench.server.database import init_db
from pbench.server.database.database import Database
from pbench.server.database.models.active_tokens import ActiveTokens
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.template import Template
from pbench.server.database.models.users import User
Expand All @@ -49,6 +52,9 @@
host = elasticsearch.example.com
port = 7080

[openid-connect]
server_url = http://openid.example.com

[logging]
logger_type = null
# We run with DEBUG level logging during the server unit tests to help
Expand Down Expand Up @@ -124,8 +130,57 @@ def server_config(on_disk_server_config) -> PbenchServerConfig:
return server_config


@pytest.fixture(scope="session")
def rsa_keys():
"""Fixture for generating an RSA public / private key pair.

Returns:
A dictionary containing the RSAPrivateKey object, the PEM encoded public
key string without the BEGIN/END bookends (mimicing what is returned by
an OpenID Connect broker), and the PEM encoded public key string with
the BEGIN/END bookends
"""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pem_public_key = (
private_key.public_key()
.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
.decode()
)
# Strip "-----BEGIN..." and "-----END...", and the empty element resulting
# from the trailing newline character.
public_key_l = pem_public_key.split("\n")[1:-2]
public_key = "".join(public_key_l)
return {
"private_key": private_key,
"public_key": public_key,
"pem_public_key": pem_public_key,
}


@pytest.fixture(scope="session")
def add_auth_connection_mock(server_config, rsa_keys):
"""
Mocks the OIDC public key GET Requests call on the realm uri.
Args:
server_config: Server_config fixture
rsa_keys: rsa_keys fixture to get te public key
"""
with responses.RequestsMock() as mock:
oidc_server = server_config.get("openid-connect", "server_url")
oidc_realm = server_config.get("openid-connect", "realm")
url = urljoin(oidc_server, f"realms/{oidc_realm}")

mock.add(
responses.GET,
url,
status=HTTPStatus.OK,
json={"public_key": rsa_keys["public_key"]},
)
yield mock


@pytest.fixture()
def client(monkeypatch, server_config, fake_email_validator):
def client(monkeypatch, server_config, fake_email_validator, add_auth_connection_mock):
"""A test client for the app.

Fixtures:
Expand Down Expand Up @@ -760,25 +815,42 @@ def fake_find(name: str) -> Optional[Template]:


@pytest.fixture()
def pbench_admin_token(client, create_admin_user):
"""Internal valid token for the 'ADMIN' user"""
return generate_token(user=create_admin_user, username=admin_username)
def pbench_admin_token(client, server_config, create_admin_user, rsa_keys):
"""OIDC valid token for the 'ADMIN' user"""
return generate_token(
user=create_admin_user,
private_key=rsa_keys["private_key"],
client_id=server_config.get("openid-connect", "client"),
username=admin_username,
pbench_client_roles=["ADMIN"],
)


@pytest.fixture()
def pbench_drb_token(client, create_drb_user):
"""Internal valid token for the 'drb' user"""
return generate_token(username="drb", user=create_drb_user)
def pbench_drb_token(client, server_config, create_drb_user, rsa_keys):
"""OIDC valid token for the 'drb' user"""
return generate_token(
username="drb",
client_id=server_config.get("openid-connect", "client"),
private_key=rsa_keys["private_key"],
user=create_drb_user,
)


@pytest.fixture()
def pbench_drb_token_invalid(client, create_drb_user):
"""Internal invalid token for the 'drb' user"""
return generate_token(username="drb", user=create_drb_user, valid=False)
def pbench_drb_token_invalid(client, server_config, create_drb_user, rsa_keys):
"""OIDC invalid token for the 'drb' user"""
return generate_token(
username="drb",
private_key=rsa_keys["private_key"],
client_id=server_config.get("openid-connect", "client"),
user=create_drb_user,
valid=False,
)


@pytest.fixture()
def get_token_func(pbench_admin_token):
def get_token_func(pbench_admin_token, server_config, rsa_keys):
"""Get the token function for fetching the token for a user

This fixture yields a function value which can be called to get the internal
Expand All @@ -789,23 +861,40 @@ def get_token_func(pbench_admin_token):
generated.
"""
return lambda user: (
pbench_admin_token if user == admin_username else generate_token(username=user)
pbench_admin_token
if user == admin_username
else generate_token(
username=user,
private_key=rsa_keys["private_key"],
client_id=server_config.get("openid-connect", "client"),
)
)


def generate_token(
username: str,
private_key: str,
client_id: str,
user: Optional[User] = None,
pbench_client_roles: Optional[list[str]] = None,
valid: bool = True,
) -> str:
"""Generates an internal JWT token that mimics a real internal token
obtained from an internal user login.
"""Generates an OIDC JWT token that mimics a real OIDC token
obtained from the user login.

Note: The OIDC client id passed as an argument has to match with the
oidc client id from the default config file. Otherwise the token
validation will fail in the server code.

Args:
username : username to include in the token payload
user : user attributes will be extracted from the user object to include
username: username to include in the token payload
private_key: RS256 private key to encode the jwt token
client_id: OIDC client id to include in the encoded string.
user: user attributes will be extracted from the user object to include
in the token payload.
valid : If True, the generated token will be valid for 10 mins.
pbench_client_roles: Any OIDC client specifc roles we want to include
in the token.
valid: If True, the generated token will be valid for 10 mins.
If False, generated token would be invalid and expired

Returns:
Expand All @@ -823,10 +912,33 @@ def generate_token(
"iat": current_utc,
"exp": exp,
"sub": user.id,
"aud": client_id,
"azp": client_id,
"realm_access": {
"roles": [
"default-roles-pbench-server",
"offline_access",
"uma_authorization",
]
},
"resource_access": {
"broker": {"roles": ["read-token"]},
"account": {
"roles": ["manage-account", "manage-account-links", "view-profile"]
},
},
"scope": "openid profile email",
"sid": "1988612e-774d-43b8-8d4a-bbc05ee55edb",
"email_verified": True,
"name": user.first_name + " " + user.last_name,
"preferred_username": username,
"given_name": user.first_name,
"family_name": user.last_name,
"email": user.email,
webbnh marked this conversation as resolved.
Show resolved Hide resolved
}
token_str = jwt.encode(payload, jwt_secret, algorithm="HS256")
token = ActiveTokens(auth_token=token_str)
user.update(auth_tokens=token)
if pbench_client_roles:
payload["resource_access"].update({client_id: {"roles": pbench_client_roles}})
token_str = jwt.encode(payload, private_key, algorithm="RS256")
return token_str


Expand Down
Loading