From 818e2223b055386202e504ca93019ee06c46a1b9 Mon Sep 17 00:00:00 2001 From: Nikhil Palaskar Date: Tue, 14 Feb 2023 16:06:55 -0500 Subject: [PATCH] Use of oidc configuration in the server unit test as default (#3249) Use of OIDC configuration in the server unit test as default. We have decided to keep the internal Users table for the purpose of quick user lookup and logging. However, unlike the current Users table, there won't be any user credentials stored in the internal database. To facilitate this transition we have to update the User table and active tokens table. This will break all the unit tests because of the way they are set up right now. Currently, we update the active tokens table when we create a new JWT token in the unit test and update the respective user table relationship. And validation of the token depends on it being available in the active tokens table. This change is to use OIDC configuration in the unit tests and start using RS256 to encode and decode the JWT tokens (using OIDC token introspection). And remove the active tokens dependency from the unit tests. PBENCH-1079 --- .../api/resources/endpoint_configure.py | 2 - lib/pbench/server/auth/__init__.py | 26 +- lib/pbench/test/unit/server/auth/test_auth.py | 47 -- lib/pbench/test/unit/server/conftest.py | 153 ++++- .../server/query_apis/test_datasets_detail.py | 7 +- lib/pbench/test/unit/server/test_api_base.py | 4 +- .../unit/server/test_datasets_daterange.py | 8 +- .../unit/server/test_endpoint_configure.py | 4 +- lib/pbench/test/unit/server/test_shell_cli.py | 19 + lib/pbench/test/unit/server/test_user_auth.py | 539 ------------------ server/lib/config/pbench-server-default.cfg | 2 +- 11 files changed, 184 insertions(+), 627 deletions(-) diff --git a/lib/pbench/server/api/resources/endpoint_configure.py b/lib/pbench/server/api/resources/endpoint_configure.py index bb60f582d6..355c5a25d6 100644 --- a/lib/pbench/server/api/resources/endpoint_configure.py +++ b/lib/pbench/server/api/resources/endpoint_configure.py @@ -180,7 +180,6 @@ def get(self): try: client = self.server_config.get("openid-connect", "client") realm = self.server_config.get("openid-connect", "realm") - secret = self.server_config.get("openid-connect", "secret") server = self.server_config.get("openid-connect", "server_url") except (NoOptionError, NoSectionError): pass @@ -188,7 +187,6 @@ def get(self): endpoints["openid"] = { "client": client, "realm": realm, - "secret": secret, "server": server, } diff --git a/lib/pbench/server/auth/__init__.py b/lib/pbench/server/auth/__init__.py index 9e833ea961..878c9bd234 100644 --- a/lib/pbench/server/auth/__init__.py +++ b/lib/pbench/server/auth/__init__.py @@ -305,24 +305,23 @@ def construct_oidc_client(cls, server_config: PbenchServerConfig) -> "OpenIDClie server_url = server_config.get("openid-connect", "server_url") client = server_config.get("openid-connect", "client") realm = server_config.get("openid-connect", "realm") - secret = server_config.get("openid-connect", "secret") except (NoOptionError, NoSectionError) as exc: raise OpenIDClient.NotConfigured() from exc - return cls( + oidc_client = cls( server_url=server_url, client_id=client, realm_name=realm, - client_secret_key=secret, verify=False, ) + oidc_client.set_oidc_public_key() + return oidc_client def __init__( self, server_url: str, client_id: str, realm_name: str, - client_secret_key: str, verify: bool = True, headers: Optional[Dict[str, str]] = None, ): @@ -341,16 +340,24 @@ def __init__( server_url : OpenID Connect server auth url client_id : client id realm_name : realm name - client_secret_key : client secret key verify : True if require valid SSL headers : dict of custom header to pass to each HTML request """ self.client_id = client_id - self._client_secret_key = client_secret_key self._realm_name = realm_name self._connection = Connection(server_url, headers, verify) + self._pem_public_key = None + + def __repr__(self): + return ( + f"OpenIDClient(server_url={self._connection.server_url}, " + f"client_id={self.client_id}, realm_name={self._realm_name}, " + f"headers={self._connection.headers})" + ) + + def set_oidc_public_key(self): realm_public_key_uri = f"realms/{self._realm_name}" response_json = self._connection.get(realm_public_key_uri).json() public_key = response_json["public_key"] @@ -362,13 +369,6 @@ def __init__( pem_public_key += "-----END PUBLIC KEY-----\n" self._pem_public_key = pem_public_key - def __repr__(self): - return ( - f"OpenIDClient(server_url={self._connection.server_url}, " - f"client_id={self.client_id}, realm_name={self._realm_name}, " - f"headers={self._connection.headers})" - ) - def token_introspect(self, token: str) -> JSON: """Utility method to decode access/Id tokens using the public key provided by the identity provider. diff --git a/lib/pbench/test/unit/server/auth/test_auth.py b/lib/pbench/test/unit/server/auth/test_auth.py index 27558c88e5..2aadca49d3 100644 --- a/lib/pbench/test/unit/server/auth/test_auth.py +++ b/lib/pbench/test/unit/server/auth/test_auth.py @@ -3,8 +3,6 @@ from http import HTTPStatus from typing import Dict, Optional, Tuple, Union -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from flask import current_app, Flask import jwt import pytest @@ -252,33 +250,6 @@ def test_create_w_roles(self): assert user.roles == ["roleA", "roleB"] -@pytest.fixture(scope="session") -def rsa_keys() -> Dict[str, Union[rsa.RSAPrivateKey, str]]: - """Fixture for generating an RSA public / private key pair. - - Returns: - A dictionary containing the RSAPrivateKey object, the PEM encoded public - key string without the BEGIN/END bookends (mimicing what is returned by - an OpenID Connect broker), and the PEM encoded public key string with - the BEGIN/END bookends - """ - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - pem_public_key = ( - private_key.public_key() - .public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) - .decode() - ) - # Strip "-----BEGIN..." and "-----END...", and the empty element resulting - # from the trailing newline character. - public_key_l = pem_public_key.split("\n")[1:-2] - public_key = "".join(public_key_l) - return { - "private_key": private_key, - "public_key": public_key, - "pem_public_key": pem_public_key, - } - - def gen_rsa_token( audience: str, private_key: str, exp: str = "99999999999" ) -> Tuple[str, Dict[str, str]]: @@ -397,7 +368,6 @@ def test_construct_oidc_client_succ(self, monkeypatch): config = mock_connection(monkeypatch, client_id, public_key) server_url = config["openid-connect"]["server_url"] realm_name = config["openid-connect"]["realm"] - secret = config["openid-connect"]["secret"] oidc_client = OpenIDClient.construct_oidc_client(config) @@ -406,7 +376,6 @@ def test_construct_oidc_client_succ(self, monkeypatch): f"client_id={client_id}, realm_name={realm_name}, " "headers={})" ) - assert oidc_client._client_secret_key == secret assert ( oidc_client._pem_public_key == f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----\n" @@ -625,22 +594,6 @@ def delete(*args, **kwargs): user = Auth.verify_auth(pbench_drb_token_invalid) assert user is None - def test_verify_auth_internal_at_valid_fail( - self, monkeypatch, make_logger, pbench_drb_token - ): - """Verify behavior when a token is not in the database""" - - def query(*args, **kwargs): - return None - - monkeypatch.setattr(Auth.AuthToken, "query", query) - app = Flask("test-verify-auth-internal-at-valid-fail") - app.logger = make_logger - with app.app_context(): - current_app.secret_key = jwt_secret - user = Auth.verify_auth(pbench_drb_token) - assert user is None - def test_verify_auth_oidc_offline(self, monkeypatch, rsa_keys, make_logger): """Verify OIDC token offline verification success path""" client_id = "us" diff --git a/lib/pbench/test/unit/server/conftest.py b/lib/pbench/test/unit/server/conftest.py index da03cf88cc..8f132f1531 100644 --- a/lib/pbench/test/unit/server/conftest.py +++ b/lib/pbench/test/unit/server/conftest.py @@ -10,12 +10,16 @@ from stat import ST_MTIME import tarfile from typing import Dict, Optional +from urllib.parse import urljoin import uuid +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from freezegun import freeze_time import jwt import pytest from requests import Response +import responses from pbench.common import MetadataLog from pbench.common.logger import get_pbench_logger @@ -44,6 +48,9 @@ [flask-app] secret-key = my_precious +[openid-connect] +server_url = http://openid.example.com + [logging] logger_type = null # We run with DEBUG level logging during the server unit tests to help @@ -120,12 +127,62 @@ def server_config(on_disk_server_config) -> PbenchServerConfig: return server_config +@pytest.fixture(scope="session") +def rsa_keys(): + """Fixture for generating an RSA public / private key pair. + + Returns: + A dictionary containing the RSAPrivateKey object, the PEM encoded public + key string without the BEGIN/END bookends (mimicing what is returned by + an OpenID Connect broker), and the PEM encoded public key string with + the BEGIN/END bookends + """ + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem_public_key = ( + private_key.public_key() + .public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) + .decode() + ) + # Strip "-----BEGIN..." and "-----END...", and the empty element resulting + # from the trailing newline character. + public_key_l = pem_public_key.split("\n")[1:-2] + public_key = "".join(public_key_l) + return { + "private_key": private_key, + "public_key": public_key, + "pem_public_key": pem_public_key, + } + + +@pytest.fixture(scope="session") +def add_auth_connection_mock(server_config, rsa_keys): + """ + Mocks the OIDC public key GET Requests call on the realm uri. + Args: + server_config: Server_config fixture + rsa_keys: rsa_keys fixture to get te public key + """ + with responses.RequestsMock() as mock: + oidc_server = server_config.get("openid-connect", "server_url") + oidc_realm = server_config.get("openid-connect", "realm") + url = urljoin(oidc_server, f"realms/{oidc_realm}") + + mock.add( + responses.GET, + url, + status=HTTPStatus.OK, + json={"public_key": rsa_keys["public_key"]}, + ) + yield mock + + @pytest.fixture() -def client(monkeypatch, server_config): +def client(monkeypatch, server_config, add_auth_connection_mock): """A test client for the app. Fixtures: server_config: Set up a pbench-server.cfg configuration + add_auth_connection_mock: set up a mock OIDC connection NOTE: The Flask app initialization includes setting up the SQLAlchemy DB. For test cases that require the DB but not a full Flask app context, use @@ -725,25 +782,42 @@ def fake_find(name: str) -> Optional[Template]: @pytest.fixture() -def pbench_admin_token(client, create_admin_user): - """Internal valid token for the 'ADMIN' user""" - return generate_token(user=create_admin_user, username=admin_username) +def pbench_admin_token(client, server_config, create_admin_user, rsa_keys): + """OIDC valid token for the 'ADMIN' user""" + return generate_token( + user=create_admin_user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + username=admin_username, + pbench_client_roles=["ADMIN"], + ) @pytest.fixture() -def pbench_drb_token(client, create_drb_user): - """Internal valid token for the 'drb' user""" - return generate_token(username="drb", user=create_drb_user) +def pbench_drb_token(client, server_config, create_drb_user, rsa_keys): + """OIDC valid token for the 'drb' user""" + return generate_token( + username="drb", + client_id=server_config.get("openid-connect", "client"), + private_key=rsa_keys["private_key"], + user=create_drb_user, + ) @pytest.fixture() -def pbench_drb_token_invalid(client, create_drb_user): - """Internal invalid token for the 'drb' user""" - return generate_token(username="drb", user=create_drb_user, valid=False) +def pbench_drb_token_invalid(client, server_config, create_drb_user, rsa_keys): + """OIDC invalid token for the 'drb' user""" + return generate_token( + username="drb", + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + user=create_drb_user, + valid=False, + ) @pytest.fixture() -def get_token_func(pbench_admin_token): +def get_token_func(pbench_admin_token, server_config, rsa_keys): """Get the token function for fetching the token for a user This fixture yields a function value which can be called to get the internal @@ -754,23 +828,40 @@ def get_token_func(pbench_admin_token): generated. """ return lambda user: ( - pbench_admin_token if user == admin_username else generate_token(username=user) + pbench_admin_token + if user == admin_username + else generate_token( + username=user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + ) ) def generate_token( username: str, + private_key: str, + client_id: str, user: Optional[User] = None, + pbench_client_roles: Optional[list[str]] = None, valid: bool = True, ) -> str: - """Generates an internal JWT token that mimics a real internal token - obtained from an internal user login. + """Generates an OIDC JWT token that mimics a real OIDC token + obtained from the user login. + + Note: The OIDC client id passed as an argument has to match with the + oidc client id from the default config file. Otherwise the token + validation will fail in the server code. Args: - username : username to include in the token payload - user : user attributes will be extracted from the user object to include + username: username to include in the token payload + private_key: RS256 private key to encode the jwt token + client_id: OIDC client id to include in the encoded string. + user: user attributes will be extracted from the user object to include in the token payload. - valid : If True, the generated token will be valid for 10 mins. + pbench_client_roles: Any OIDC client specifc roles we want to include + in the token. + valid: If True, the generated token will be valid for 10 mins. If False, generated token would be invalid and expired Returns: @@ -788,9 +879,33 @@ def generate_token( "iat": current_utc, "exp": exp, "sub": user.id, + "aud": client_id, + "azp": client_id, + "realm_access": { + "roles": [ + "default-roles-pbench-server", + "offline_access", + "uma_authorization", + ] + }, + "resource_access": { + "broker": {"roles": ["read-token"]}, + "account": { + "roles": ["manage-account", "manage-account-links", "view-profile"] + }, + }, + "scope": "openid profile email", + "sid": "1988612e-774d-43b8-8d4a-bbc05ee55edb", + "email_verified": True, + "name": user.first_name + " " + user.last_name, + "preferred_username": username, + "given_name": user.first_name, + "family_name": user.last_name, + "email": user.email, } - token_str = jwt.encode(payload, jwt_secret, algorithm="HS256") - user.add_token(AuthToken(token=token_str, expiration=exp)) + if pbench_client_roles: + payload["resource_access"].update({client_id: {"roles": pbench_client_roles}}) + token_str = jwt.encode(payload, private_key, algorithm="RS256") return token_str diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py b/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py index a5b03c5be8..c2ffee9ff0 100644 --- a/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py +++ b/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py @@ -47,6 +47,7 @@ def test_query( query_api, find_template, pbench_admin_token, + rsa_keys, user, expected_status, ): @@ -61,7 +62,11 @@ def test_query( if user == "test_admin": token = pbench_admin_token else: - token = generate_token(username=user) + token = generate_token( + username=user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + ) assert token headers = {"authorization": f"bearer {token}"} diff --git a/lib/pbench/test/unit/server/test_api_base.py b/lib/pbench/test/unit/server/test_api_base.py index 71515eaa2f..ba1c891771 100644 --- a/lib/pbench/test/unit/server/test_api_base.py +++ b/lib/pbench/test/unit/server/test_api_base.py @@ -79,7 +79,9 @@ def options(self, **kwargs) -> Response: class TestApiBase: """Verify internal methods of the API base class.""" - def test_method_validation(self, server_config, make_logger, monkeypatch): + def test_method_validation( + self, server_config, make_logger, monkeypatch, add_auth_connection_mock + ): # Create the temporary flask application. app = Flask("test-api-server") app.debug = True diff --git a/lib/pbench/test/unit/server/test_datasets_daterange.py b/lib/pbench/test/unit/server/test_datasets_daterange.py index fb20a11aba..af9ae7faa6 100644 --- a/lib/pbench/test/unit/server/test_datasets_daterange.py +++ b/lib/pbench/test/unit/server/test_datasets_daterange.py @@ -18,7 +18,13 @@ class TestDatasetsDateRange: @pytest.fixture() def query_as( - self, client, server_config, more_datasets, provide_metadata, get_token_func + self, + client, + server_config, + more_datasets, + provide_metadata, + get_token_func, + add_auth_connection_mock, ): """ Helper fixture to perform the API query and validate an expected diff --git a/lib/pbench/test/unit/server/test_endpoint_configure.py b/lib/pbench/test/unit/server/test_endpoint_configure.py index 6235742217..fac0228ec6 100644 --- a/lib/pbench/test/unit/server/test_endpoint_configure.py +++ b/lib/pbench/test/unit/server/test_endpoint_configure.py @@ -136,15 +136,13 @@ def check_config(self, client, server_config, host, my_headers={}): try: oidc_client = server_config.get("openid-connect", "client") oidc_realm = server_config.get("openid-connect", "realm") - oidc_secret = server_config.get("openid-connect", "secret") oidc_server = server_config.get("openid-connect", "server_url") except (NoOptionError, NoSectionError): pass else: - expected_results["openid-connect"] = { + expected_results["openid"] = { "client": oidc_client, "realm": oidc_realm, - "secret": oidc_secret, "server": oidc_server, } diff --git a/lib/pbench/test/unit/server/test_shell_cli.py b/lib/pbench/test/unit/server/test_shell_cli.py index 12f56ec45d..9ac5d67f00 100644 --- a/lib/pbench/test/unit/server/test_shell_cli.py +++ b/lib/pbench/test/unit/server/test_shell_cli.py @@ -268,9 +268,17 @@ def test_main_crontab_failed(monkeypatch, make_logger, mock_get_server_config): def immediate_success(*args, **kwargs): pass + def wait_for_oidc_server( + server_config: PbenchServerConfig, logger: logging.Logger + ) -> str: + return "https://oidc.example.com" + def generate_crontab_if_necessary(*args, **kwargs) -> int: return 43 + monkeypatch.setattr( + shell.OpenIDClient, "wait_for_oidc_server", wait_for_oidc_server + ) monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_indexing", immediate_success) @@ -299,6 +307,11 @@ def init_indexing(*args, **kwargs) -> int: called[0] = True raise init_indexing_exc + monkeypatch.setattr( + shell.OpenIDClient, + "wait_for_oidc_server", + lambda config, logger: "https://oidc.example.com", + ) monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_indexing", init_indexing) @@ -325,6 +338,12 @@ def init_db(*args, **kwargs) -> int: called[0] = True raise init_db_exc + monkeypatch.setattr( + shell.OpenIDClient, + "wait_for_oidc_server", + lambda config, logger: "https://oidc.example.com", + ) + monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_db", init_db) diff --git a/lib/pbench/test/unit/server/test_user_auth.py b/lib/pbench/test/unit/server/test_user_auth.py index 18309c3e15..e69de29bb2 100644 --- a/lib/pbench/test/unit/server/test_user_auth.py +++ b/lib/pbench/test/unit/server/test_user_auth.py @@ -1,539 +0,0 @@ -from datetime import datetime, timedelta, timezone -from http import HTTPStatus -import time - -from pbench.server.database.database import Database -from pbench.server.database.models.auth_tokens import AuthToken -from pbench.server.database.models.users import User -from pbench.test.unit.server.conftest import admin_username - - -def register_user( - client, server_config, email, username, password, firstname, lastname -): - """ - Helper function to register a user using register API - """ - return client.post( - f"{server_config.rest_uri}/register", - json={ - "email": email, - "password": password, - "username": username, - "first_name": firstname, - "last_name": lastname, - }, - ) - - -def login_user(client, server_config, username, password, token_expiry=None): - """ - Helper function to generate a user authentication token - """ - return client.post( - f"{server_config.rest_uri}/login", - json={"username": username, "password": password, "token_expiry": token_expiry}, - ) - - -class TestUserAuthentication: - @staticmethod - def test_registration(client, server_config, tmp_path): - client.config["SESSION_FILE_DIR"] = tmp_path - """ Test for user registration """ - with client: - response = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert response.content_type == "application/json" - assert response.status_code, HTTPStatus.CREATED - - @staticmethod - def test_registration_missing_fields(client, server_config): - """Test for user registration missing fields""" - with client: - response = client.post( - f"{server_config.rest_uri}/register", - json={ - "email": "user@domain.com", - "password": "12345", - "username": "user", - }, - ) - data = response.json - assert data["message"] == "Missing first_name field" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.BAD_REQUEST - - @staticmethod - def test_registration_with_registered_user(client, server_config): - """Test registration with already registered email""" - user = User( - email="user@domain.com", - password="12345", - username="user", - first_name="firstname", - last_name="lastname", - ) - Database.db_session.add(user) - Database.db_session.commit() - with client: - response = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - data = response.json - assert data["message"] == "Provided username is already in use." - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.FORBIDDEN - - @staticmethod - def test_user_login(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - # registered user login - response = login_user(client, server_config, "user", "12345") - data = response.json - assert data["auth_token"] - assert data["username"] == "user" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_user_relogin(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # registered user login - response = login_user(client, server_config, "user", "12345") - data = response.json - assert data["auth_token"] - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.OK - first_auth_token = data["auth_token"] - - # Re-login with auth header - time.sleep(1) - response = client.post( - f"{server_config.rest_uri}/login", - headers=dict(Authorization=f"Bearer {first_auth_token}"), - json={"username": "user", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - second_auth_token = response.json["auth_token"] - assert ( - first_auth_token != second_auth_token - ), "Second login returned first token" - - # Re-login without auth header - time.sleep(1) - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "user", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - third_auth_token = response.json["auth_token"] - assert ( - first_auth_token != third_auth_token - ), "Third login returned first token" - assert ( - second_auth_token != third_auth_token - ), "Third login returned second token" - - @staticmethod - def test_login_removes_expired_tokens(client, server_config): - """Test to ensure on a new login previously expired tokens are - removed.""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="me", - firstname="first name", - lastname="last name", - email="me@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - user = User.query(username="me") - # Add 3 expired tokens, values are in "days" old - for age in (1, 2, 4): - token = AuthToken( - token=f"token-that-is-{age:d}-days-old", - expiration=datetime.now(timezone.utc) - timedelta(days=age), - ) - user.add_token(token) - - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "me", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - - for age in (1, 2, 4): - token = AuthToken.query(f"token-that-is-{age:d}-days-old") - assert token is None - - @staticmethod - def test_user_login_with_wrong_password(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # registered user login - response = login_user(client, server_config, "user", "123456") - data = response.json - assert data["message"] == "Bad login" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.UNAUTHORIZED - - @staticmethod - def test_login_without_password(client, server_config): - """Test for login of non-registered user""" - with client: - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "username"}, - ) - data = response.json - assert data["message"] == "Please provide a valid password" - assert response.status_code == HTTPStatus.BAD_REQUEST - - @staticmethod - def test_non_registered_user_login(client, server_config): - """Test for login of non-registered user""" - with client: - response = login_user(client, server_config, "username", "12345") - data = response.json - assert data["message"] == "Bad login" - assert response.status_code == HTTPStatus.UNAUTHORIZED - - @staticmethod - def test_get_user(client, server_config): - """Test for get user api""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - response = client.get( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - data = response.json - assert response.status_code == HTTPStatus.OK - assert data is not None - assert data["email"] == "user@domain.com" - assert data["username"] == "username" - assert data["first_name"] == "firstname" - - @staticmethod - def test_update_user(client, server_config): - """Test for get user api""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - - new_registration_time = datetime.now() - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"registered_on": new_registration_time, "first_name": "newname"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - data = response.json - assert data["message"] == "Invalid update request payload" - - # Test password update - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"password": "newpass"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - data = response.json - assert response.status_code == HTTPStatus.OK - assert data["first_name"] == "firstname" - assert data["email"] == "user@domain.com" - time.sleep(1) - response = login_user(client, server_config, "username", "newpass") - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_external_token_update(client, server_config): - """Test for external attempt at updating auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"auth_tokens": "external_auth_token"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.BAD_REQUEST - data = response.json - assert data["message"] == "Invalid fields in update request payload" - - @staticmethod - def test_malformed_auth_token(client, server_config): - """Test for user status for malformed auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = client.get( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer" + "malformed"), - ) - data = response.json - assert data is None - - @staticmethod - def test_valid_logout(client, server_config): - """Test for logout before token expires""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "user", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - # valid token logout - response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - # Check if the token has been successfully removed from the database - assert ( - not Database.db_session.query(AuthToken) - .filter_by(token=data_login["auth_token"]) - .first() - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_invalid_logout(client, server_config): - """Testing logout after the token expires""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert resp_login.status_code == HTTPStatus.OK - assert data_login["auth_token"] - - # log out with the current token - logout_response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert logout_response.status_code == HTTPStatus.OK - - # Logout using invalid token - # Expect 200 on response, since the invalid token can not be used anymore - response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_delete_user(client, server_config): - """Test for user status for malformed auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - - response = client.delete( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_non_existent_user_delete(client, server_config): - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - - response = client.delete( - f"{server_config.rest_uri}/user/username1", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - assert response.json["message"] == "Not authorized to access user username1" - - @staticmethod - def test_admin_access(client, server_config, pbench_admin_token): - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # Update user with admin credentials - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"first_name": "newname"}, - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.OK - - # Delete user with admin credentials - response = client.delete( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_admin_delete(client, server_config, pbench_admin_token): - # Delete admin user with admin credentials - # We should not be able to delete an admin user - response = client.delete( - f"{server_config.rest_uri}/user/{admin_username}", - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - assert response.json["message"] == "Not authorized to delete user" diff --git a/server/lib/config/pbench-server-default.cfg b/server/lib/config/pbench-server-default.cfg index 7bed42c4cd..c612a8c013 100644 --- a/server/lib/config/pbench-server-default.cfg +++ b/server/lib/config/pbench-server-default.cfg @@ -112,7 +112,7 @@ wait_timeout = 120 realm = pbench-server # Client entity name requesting OIDC to authenticate a user. -client = pbench-server-client +client = pbench-dashboard [logging] logger_type = devlog