diff --git a/src/fidesops/api/v1/endpoints/user_endpoints.py b/src/fidesops/api/v1/endpoints/user_endpoints.py index f6142102c..8e9f7bd2e 100644 --- a/src/fidesops/api/v1/endpoints/user_endpoints.py +++ b/src/fidesops/api/v1/endpoints/user_endpoints.py @@ -1,4 +1,5 @@ import logging +from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Security from fastapi_pagination.ext.sqlalchemy import paginate @@ -9,6 +10,7 @@ from datetime import datetime from sqlalchemy.orm import Session +from sqlalchemy_utils import escape_like from starlette.status import ( HTTP_200_OK, HTTP_201_CREATED, @@ -89,13 +91,18 @@ def create_user( response_model=Page[UserResponse], ) def get_users( - *, db: Session = Depends(deps.get_db), params: Params = Depends() + *, + db: Session = Depends(deps.get_db), + params: Params = Depends(), + username: Optional[str] = None, ) -> AbstractPage[FidesopsUser]: """Returns a paginated list of all users""" logger.info(f"Returned a paginated list of all users.") - return paginate( - FidesopsUser.query(db).order_by(FidesopsUser.created_at.desc()), params=params - ) + query = FidesopsUser.query(db) + if username: + query = query.filter(FidesopsUser.username.ilike(f"%{escape_like(username)}%")) + + return paginate(query.order_by(FidesopsUser.created_at.desc()), params=params) @router.get( diff --git a/tests/api/v1/endpoints/test_user_endpoints.py b/tests/api/v1/endpoints/test_user_endpoints.py index 90905d6cb..aa1d36705 100644 --- a/tests/api/v1/endpoints/test_user_endpoints.py +++ b/tests/api/v1/endpoints/test_user_endpoints.py @@ -1,12 +1,21 @@ from datetime import datetime - +from typing import List import json import pytest from fastapi_pagination import Params from starlette.testclient import TestClient - +from starlette.status import ( + HTTP_200_OK, + HTTP_201_CREATED, + HTTP_204_NO_CONTENT, + HTTP_403_FORBIDDEN, + HTTP_401_UNAUTHORIZED, + HTTP_400_BAD_REQUEST, + HTTP_422_UNPROCESSABLE_ENTITY, + HTTP_404_NOT_FOUND +) from fidesops.api.v1.urn_registry import V1_URL_PREFIX, USERS, LOGIN, LOGOUT, USER_DETAIL from fidesops.models.client import ClientDetail, ADMIN_UI_ROOT @@ -35,12 +44,12 @@ def url(self, oauth_client: ClientDetail) -> str: def test_create_user_not_authenticated(self, url, api_client): response = api_client.post(url, headers={}, json={}) - assert 401 == response.status_code + assert HTTP_401_UNAUTHORIZED == response.status_code def test_create_user_wrong_scope(self, url, api_client, generate_auth_header): auth_header = generate_auth_header([STORAGE_READ]) response = api_client.post(url, headers=auth_header, json={}) - assert 403 == response.status_code + assert HTTP_403_FORBIDDEN == response.status_code def test_create_user_bad_username( self, @@ -53,7 +62,7 @@ def test_create_user_bad_username( body = {"username": "spaces in name", "password": "TestP@ssword9"} response = api_client.post(url, headers=auth_header, json=body) - assert 422 == response.status_code + assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code def test_username_exists( self, @@ -70,7 +79,7 @@ def test_username_exists( response = api_client.post(url, headers=auth_header, json=body) response_body = json.loads(response.text) assert response_body["detail"] == "Username already exists." - assert 400 == response.status_code + assert HTTP_400_BAD_REQUEST == response.status_code user.delete(db) @@ -85,7 +94,7 @@ def test_create_user_bad_password( body = {"username": "test_user", "password": "short"} response = api_client.post(url, headers=auth_header, json=body) - assert 422 == response.status_code + assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code assert ( json.loads(response.text)["detail"][0]["msg"] == "Password must have at least eight characters." @@ -93,7 +102,7 @@ def test_create_user_bad_password( body = {"username": "test_user", "password": "longerpassword"} response = api_client.post(url, headers=auth_header, json=body) - assert 422 == response.status_code + assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code assert ( json.loads(response.text)["detail"][0]["msg"] == "Password must have at least one number." @@ -101,7 +110,7 @@ def test_create_user_bad_password( body = {"username": "test_user", "password": "longer55password"} response = api_client.post(url, headers=auth_header, json=body) - assert 422 == response.status_code + assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code assert ( json.loads(response.text)["detail"][0]["msg"] == "Password must have at least one capital letter." @@ -109,7 +118,7 @@ def test_create_user_bad_password( body = {"username": "test_user", "password": "LoNgEr55paSSworD"} response = api_client.post(url, headers=auth_header, json=body) - assert 422 == response.status_code + assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code assert ( json.loads(response.text)["detail"][0]["msg"] == "Password must have at least one symbol." @@ -129,7 +138,7 @@ def test_create_user( user = FidesopsUser.get_by(db, field="username", value=body["username"]) response_body = json.loads(response.text) assert response_body == {"id": user.id} - assert 201 == response.status_code + assert HTTP_201_CREATED == response.status_code user.delete(db) @@ -141,25 +150,25 @@ def url(self, oauth_client: ClientDetail, user) -> str: def test_delete_user_not_authenticated(self, url, api_client): response = api_client.delete(url, headers={}) - assert 401 == response.status_code + assert HTTP_401_UNAUTHORIZED == response.status_code def test_create_user_wrong_scope(self, url, api_client, generate_auth_header, db): auth_header = generate_auth_header([STORAGE_READ]) response = api_client.delete(url, headers=auth_header) - assert 403 == response.status_code + assert HTTP_403_FORBIDDEN == response.status_code def test_delete_user_not_admin_root_or_self( self, url, api_client, db, generate_auth_header, user ): auth_header = generate_auth_header([USER_DELETE]) response = api_client.delete(url, headers=auth_header) - assert 403 == response.status_code + assert HTTP_403_FORBIDDEN == response.status_code def test_delete_nonexistent_user(self, api_client, db, generate_auth_header, user): auth_header = generate_auth_header([USER_DELETE]) url = f"{V1_URL_PREFIX}{USERS}/nonexistent_user" response = api_client.delete(url, headers=auth_header) - assert 404 == response.status_code + assert HTTP_404_NOT_FOUND == response.status_code def test_delete_self(self, api_client, db, generate_auth_header): user = FidesopsUser.create( @@ -187,7 +196,7 @@ def test_delete_self(self, api_client, db, generate_auth_header): response = api_client.delete( f"{V1_URL_PREFIX}{USERS}/{user.id}", headers=auth_header ) - assert 204 == response.status_code + assert HTTP_204_NO_CONTENT == response.status_code db.expunge_all() @@ -227,7 +236,7 @@ def test_delete_user_as_root(self, api_client, db, generate_auth_header, user): response = api_client.delete( f"{V1_URL_PREFIX}{USERS}/{other_user.id}", headers=auth_header ) - assert 204 == response.status_code + assert HTTP_204_NO_CONTENT == response.status_code db.expunge_all() @@ -251,19 +260,19 @@ def url(self, oauth_client: ClientDetail) -> str: def test_get_users_not_authenticated(self, api_client: TestClient, url: str)-> None: resp = api_client.get(url, headers={}) - assert resp.status_code == 401 + assert resp.status_code == HTTP_401_UNAUTHORIZED def test_get_users_wrong_scope(self, api_client: TestClient, generate_auth_header, url): auth_header = generate_auth_header(scopes=[USER_DELETE]) resp = api_client.get(url, headers=auth_header) - assert resp.status_code == 403 + assert resp.status_code == HTTP_403_FORBIDDEN def test_get_users_no_users( self, api_client: TestClient, generate_auth_header, url ) -> None: auth_header = generate_auth_header(scopes=[USER_READ]) resp = api_client.get(url, headers=auth_header) - assert resp.status_code == 200 + assert resp.status_code == HTTP_200_OK response_body = json.loads(resp.text) assert len(response_body["items"]) == 0 assert response_body["total"] == 0 @@ -271,6 +280,73 @@ def test_get_users_no_users( assert response_body["size"] == page_size + def test_get_users(self, api_client:TestClient, generate_auth_header, url, db): + create_auth_header = generate_auth_header(scopes=[USER_CREATE]) + saved_users: List[FidesopsUser] = [] + total_users = 25 + for i in range(total_users): + body = {"username": f"user{i}@example.com", "password": "Password123!"} + resp = api_client.post(url, headers=create_auth_header, json=body) + assert resp.status_code == HTTP_201_CREATED + user = FidesopsUser.get_by(db, field="username", value=body["username"]) + saved_users.append(user) + + get_auth_header = generate_auth_header(scopes=[USER_READ]) + resp = api_client.get(url, headers=get_auth_header) + assert resp.status_code == HTTP_200_OK + response_body = json.loads(resp.text) + assert len(response_body["items"]) == total_users + assert response_body["total"] == total_users + assert response_body["page"] == 1 + assert response_body["size"] == page_size + + for i in range(total_users): + saved_users[i].delete(db) + + + def test_get_filtered_users(self, api_client:TestClient, generate_auth_header, url, db): + create_auth_header = generate_auth_header(scopes=[USER_CREATE]) + saved_users: List[FidesopsUser] = [] + total_users = 50 + for i in range(total_users): + body = {"username": f"user{i}@example.com", "password": "Password123!"} + resp = api_client.post(url, headers=create_auth_header, json=body) + assert resp.status_code == HTTP_201_CREATED + user = FidesopsUser.get_by(db, field="username", value=body["username"]) + saved_users.append(user) + + get_auth_header = generate_auth_header(scopes=[USER_READ]) + + resp = api_client.get(f"{url}?username={15}", headers=get_auth_header) + assert resp.status_code == HTTP_200_OK + response_body = json.loads(resp.text) + assert len(response_body["items"]) == 1 + assert response_body["total"] == 1 + assert response_body["page"] == 1 + assert response_body["size"] == page_size + + resp = api_client.get(f"{url}?username={5}", headers=get_auth_header) + assert resp.status_code == HTTP_200_OK + response_body = json.loads(resp.text) + assert len(response_body["items"]) == 5 + assert response_body["total"] == 5 + assert response_body["page"] == 1 + assert response_body["size"] == page_size + + resp = api_client.get(f"{url}?username=not real user", headers=get_auth_header) + assert resp.status_code == HTTP_200_OK + response_body = json.loads(resp.text) + assert len(response_body["items"]) == 0 + assert response_body["total"] == 0 + assert response_body["page"] == 1 + assert response_body["size"] == page_size + + + for i in range(total_users): + saved_users[i].delete(db) + + + class TestGetUser: @pytest.fixture(scope="function") def url(self, oauth_client: ClientDetail) -> str: @@ -282,12 +358,12 @@ def url_no_id(self, oauth_client: ClientDetail) -> str: def test_get_user_not_authenticated(self, api_client: TestClient, url: str) -> None: resp = api_client.get(url, headers={}) - assert resp.status_code == 401 + assert resp.status_code == HTTP_401_UNAUTHORIZED def test_get_user_wrong_scope(self, api_client: TestClient, generate_auth_header, url: str): auth_header = generate_auth_header(scopes=[USER_DELETE]) resp = api_client.get(url, headers=auth_header) - assert resp.status_code == 403 + assert resp.status_code == HTTP_403_FORBIDDEN def test_get_user_does_not_exist( self, api_client: TestClient, generate_auth_header, url_no_id: str @@ -297,7 +373,7 @@ def test_get_user_does_not_exist( f"{url_no_id}/this_is_a_nonexistent_key", headers=auth_header, ) - assert resp.status_code == 404 + assert resp.status_code == HTTP_404_NOT_FOUND class TestUserLogin: @pytest.fixture(scope="function") @@ -307,12 +383,12 @@ def url(self, oauth_client: ClientDetail) -> str: def test_user_does_not_exist(self, db, url, api_client): body = {"username": "does not exist", "password": "idonotknowmypassword"} response = api_client.post(url, headers={}, json=body) - assert response.status_code == 404 + assert response.status_code == HTTP_404_NOT_FOUND def test_bad_login(self, db, url, user, api_client): body = {"username": user.username, "password": "idonotknowmypassword"} response = api_client.post(url, headers={}, json=body) - assert response.status_code == 403 + assert response.status_code == HTTP_403_FORBIDDEN def test_login_creates_client(self, db, url, user, api_client): # Delete existing client for test purposes @@ -322,7 +398,7 @@ def test_login_creates_client(self, db, url, user, api_client): assert user.client is None # client does not exist response = api_client.post(url, headers={}, json=body) - assert response.status_code == 200 + assert response.status_code == HTTP_200_OK db.refresh(user) assert user.client is not None @@ -340,7 +416,7 @@ def test_login_updates_last_login_date(self, db, url, user, api_client): body = {"username": user.username, "password": "TESTdcnG@wzJeu0&%3Qe2fGo7"} response = api_client.post(url, headers={}, json=body) - assert response.status_code == 200 + assert response.status_code == HTTP_200_OK db.refresh(user) assert user.last_login_at is not None @@ -352,7 +428,7 @@ def test_login_uses_existing_client(self, db, url, user, api_client): user.client.scopes = [PRIVACY_REQUEST_READ] user.client.save(db) response = api_client.post(url, headers={}, json=body) - assert response.status_code == 200 + assert response.status_code == HTTP_200_OK db.refresh(user) assert user.client is not None @@ -384,7 +460,7 @@ def test_user_not_deleted_on_logout(self, db, url, api_client, user): } auth_header = {"Authorization": "Bearer " + generate_jwe(json.dumps(payload))} response = api_client.post(url, headers=auth_header, json={}) - assert response.status_code == 204 + assert response.status_code == HTTP_204_NO_CONTENT # Verify client was deleted client_search = ClientDetail.get_by(db, field="id", value=client_id) @@ -407,14 +483,14 @@ def test_user_not_deleted_on_logout(self, db, url, api_client, user): } auth_header = {"Authorization": "Bearer " + generate_jwe(json.dumps(payload))} response = api_client.post(url, headers=auth_header, json={}) - assert 403 == response.status_code + assert HTTP_403_FORBIDDEN == response.status_code def test_logout(self, db, url, api_client, generate_auth_header, oauth_client): oauth_client_id = oauth_client.id auth_header = generate_auth_header([STORAGE_READ]) response = api_client.post(url, headers=auth_header, json={}) - assert 204 == response.status_code + assert HTTP_204_NO_CONTENT == response.status_code # Verify client was deleted client_search = ClientDetail.get_by(db, field="id", value=oauth_client_id) @@ -422,4 +498,4 @@ def test_logout(self, db, url, api_client, generate_auth_header, oauth_client): # Gets AuthorizationError - client does not exist, this token can't be used anymore response = api_client.post(url, headers=auth_header, json={}) - assert response.status_code == 403 + assert response.status_code == HTTP_403_FORBIDDEN