Skip to content

Commit

Permalink
437 user search (ethyca#444)
Browse files Browse the repository at this point in the history
* Add user search

* Add user search tests and remove magic numbers
  • Loading branch information
TheAndrewJackson authored and Adam Sachs committed May 17, 2022
1 parent fedca99 commit 04497e0
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 36 deletions.
15 changes: 11 additions & 4 deletions src/fidesops/api/v1/endpoints/user_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Security
from fastapi_pagination.ext.sqlalchemy import paginate
Expand All @@ -9,6 +10,7 @@
from datetime import datetime

from sqlalchemy.orm import Session
from sqlalchemy_utils import escape_like
from starlette.status import (
HTTP_200_OK,
HTTP_201_CREATED,
Expand Down Expand Up @@ -89,13 +91,18 @@ def create_user(
response_model=Page[UserResponse],
)
def get_users(
*, db: Session = Depends(deps.get_db), params: Params = Depends()
*,
db: Session = Depends(deps.get_db),
params: Params = Depends(),
username: Optional[str] = None,
) -> AbstractPage[FidesopsUser]:
"""Returns a paginated list of all users"""
logger.info(f"Returned a paginated list of all users.")
return paginate(
FidesopsUser.query(db).order_by(FidesopsUser.created_at.desc()), params=params
)
query = FidesopsUser.query(db)
if username:
query = query.filter(FidesopsUser.username.ilike(f"%{escape_like(username)}%"))

return paginate(query.order_by(FidesopsUser.created_at.desc()), params=params)


@router.get(
Expand Down
140 changes: 108 additions & 32 deletions tests/api/v1/endpoints/test_user_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
from datetime import datetime

from typing import List
import json

import pytest

from fastapi_pagination import Params
from starlette.testclient import TestClient

from starlette.status import (
HTTP_200_OK,
HTTP_201_CREATED,
HTTP_204_NO_CONTENT,
HTTP_403_FORBIDDEN,
HTTP_401_UNAUTHORIZED,
HTTP_400_BAD_REQUEST,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_404_NOT_FOUND
)

from fidesops.api.v1.urn_registry import V1_URL_PREFIX, USERS, LOGIN, LOGOUT, USER_DETAIL
from fidesops.models.client import ClientDetail, ADMIN_UI_ROOT
Expand Down Expand Up @@ -35,12 +44,12 @@ def url(self, oauth_client: ClientDetail) -> str:

def test_create_user_not_authenticated(self, url, api_client):
response = api_client.post(url, headers={}, json={})
assert 401 == response.status_code
assert HTTP_401_UNAUTHORIZED == response.status_code

def test_create_user_wrong_scope(self, url, api_client, generate_auth_header):
auth_header = generate_auth_header([STORAGE_READ])
response = api_client.post(url, headers=auth_header, json={})
assert 403 == response.status_code
assert HTTP_403_FORBIDDEN == response.status_code

def test_create_user_bad_username(
self,
Expand All @@ -53,7 +62,7 @@ def test_create_user_bad_username(
body = {"username": "spaces in name", "password": "TestP@ssword9"}

response = api_client.post(url, headers=auth_header, json=body)
assert 422 == response.status_code
assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code

def test_username_exists(
self,
Expand All @@ -70,7 +79,7 @@ def test_username_exists(
response = api_client.post(url, headers=auth_header, json=body)
response_body = json.loads(response.text)
assert response_body["detail"] == "Username already exists."
assert 400 == response.status_code
assert HTTP_400_BAD_REQUEST == response.status_code

user.delete(db)

Expand All @@ -85,31 +94,31 @@ def test_create_user_bad_password(

body = {"username": "test_user", "password": "short"}
response = api_client.post(url, headers=auth_header, json=body)
assert 422 == response.status_code
assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code
assert (
json.loads(response.text)["detail"][0]["msg"]
== "Password must have at least eight characters."
)

body = {"username": "test_user", "password": "longerpassword"}
response = api_client.post(url, headers=auth_header, json=body)
assert 422 == response.status_code
assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code
assert (
json.loads(response.text)["detail"][0]["msg"]
== "Password must have at least one number."
)

body = {"username": "test_user", "password": "longer55password"}
response = api_client.post(url, headers=auth_header, json=body)
assert 422 == response.status_code
assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code
assert (
json.loads(response.text)["detail"][0]["msg"]
== "Password must have at least one capital letter."
)

body = {"username": "test_user", "password": "LoNgEr55paSSworD"}
response = api_client.post(url, headers=auth_header, json=body)
assert 422 == response.status_code
assert HTTP_422_UNPROCESSABLE_ENTITY == response.status_code
assert (
json.loads(response.text)["detail"][0]["msg"]
== "Password must have at least one symbol."
Expand All @@ -129,7 +138,7 @@ def test_create_user(
user = FidesopsUser.get_by(db, field="username", value=body["username"])
response_body = json.loads(response.text)
assert response_body == {"id": user.id}
assert 201 == response.status_code
assert HTTP_201_CREATED == response.status_code

user.delete(db)

Expand All @@ -141,25 +150,25 @@ def url(self, oauth_client: ClientDetail, user) -> str:

def test_delete_user_not_authenticated(self, url, api_client):
response = api_client.delete(url, headers={})
assert 401 == response.status_code
assert HTTP_401_UNAUTHORIZED == response.status_code

def test_create_user_wrong_scope(self, url, api_client, generate_auth_header, db):
auth_header = generate_auth_header([STORAGE_READ])
response = api_client.delete(url, headers=auth_header)
assert 403 == response.status_code
assert HTTP_403_FORBIDDEN == response.status_code

def test_delete_user_not_admin_root_or_self(
self, url, api_client, db, generate_auth_header, user
):
auth_header = generate_auth_header([USER_DELETE])
response = api_client.delete(url, headers=auth_header)
assert 403 == response.status_code
assert HTTP_403_FORBIDDEN == response.status_code

def test_delete_nonexistent_user(self, api_client, db, generate_auth_header, user):
auth_header = generate_auth_header([USER_DELETE])
url = f"{V1_URL_PREFIX}{USERS}/nonexistent_user"
response = api_client.delete(url, headers=auth_header)
assert 404 == response.status_code
assert HTTP_404_NOT_FOUND == response.status_code

def test_delete_self(self, api_client, db, generate_auth_header):
user = FidesopsUser.create(
Expand Down Expand Up @@ -187,7 +196,7 @@ def test_delete_self(self, api_client, db, generate_auth_header):
response = api_client.delete(
f"{V1_URL_PREFIX}{USERS}/{user.id}", headers=auth_header
)
assert 204 == response.status_code
assert HTTP_204_NO_CONTENT == response.status_code

db.expunge_all()

Expand Down Expand Up @@ -227,7 +236,7 @@ def test_delete_user_as_root(self, api_client, db, generate_auth_header, user):
response = api_client.delete(
f"{V1_URL_PREFIX}{USERS}/{other_user.id}", headers=auth_header
)
assert 204 == response.status_code
assert HTTP_204_NO_CONTENT == response.status_code

db.expunge_all()

Expand All @@ -251,26 +260,93 @@ def url(self, oauth_client: ClientDetail) -> str:

def test_get_users_not_authenticated(self, api_client: TestClient, url: str)-> None:
resp = api_client.get(url, headers={})
assert resp.status_code == 401
assert resp.status_code == HTTP_401_UNAUTHORIZED

def test_get_users_wrong_scope(self, api_client: TestClient, generate_auth_header, url):
auth_header = generate_auth_header(scopes=[USER_DELETE])
resp = api_client.get(url, headers=auth_header)
assert resp.status_code == 403
assert resp.status_code == HTTP_403_FORBIDDEN

def test_get_users_no_users(
self, api_client: TestClient, generate_auth_header, url
) -> None:
auth_header = generate_auth_header(scopes=[USER_READ])
resp = api_client.get(url, headers=auth_header)
assert resp.status_code == 200
assert resp.status_code == HTTP_200_OK
response_body = json.loads(resp.text)
assert len(response_body["items"]) == 0
assert response_body["total"] == 0
assert response_body["page"] == 1
assert response_body["size"] == page_size


def test_get_users(self, api_client:TestClient, generate_auth_header, url, db):
create_auth_header = generate_auth_header(scopes=[USER_CREATE])
saved_users: List[FidesopsUser] = []
total_users = 25
for i in range(total_users):
body = {"username": f"user{i}@example.com", "password": "Password123!"}
resp = api_client.post(url, headers=create_auth_header, json=body)
assert resp.status_code == HTTP_201_CREATED
user = FidesopsUser.get_by(db, field="username", value=body["username"])
saved_users.append(user)

get_auth_header = generate_auth_header(scopes=[USER_READ])
resp = api_client.get(url, headers=get_auth_header)
assert resp.status_code == HTTP_200_OK
response_body = json.loads(resp.text)
assert len(response_body["items"]) == total_users
assert response_body["total"] == total_users
assert response_body["page"] == 1
assert response_body["size"] == page_size

for i in range(total_users):
saved_users[i].delete(db)


def test_get_filtered_users(self, api_client:TestClient, generate_auth_header, url, db):
create_auth_header = generate_auth_header(scopes=[USER_CREATE])
saved_users: List[FidesopsUser] = []
total_users = 50
for i in range(total_users):
body = {"username": f"user{i}@example.com", "password": "Password123!"}
resp = api_client.post(url, headers=create_auth_header, json=body)
assert resp.status_code == HTTP_201_CREATED
user = FidesopsUser.get_by(db, field="username", value=body["username"])
saved_users.append(user)

get_auth_header = generate_auth_header(scopes=[USER_READ])

resp = api_client.get(f"{url}?username={15}", headers=get_auth_header)
assert resp.status_code == HTTP_200_OK
response_body = json.loads(resp.text)
assert len(response_body["items"]) == 1
assert response_body["total"] == 1
assert response_body["page"] == 1
assert response_body["size"] == page_size

resp = api_client.get(f"{url}?username={5}", headers=get_auth_header)
assert resp.status_code == HTTP_200_OK
response_body = json.loads(resp.text)
assert len(response_body["items"]) == 5
assert response_body["total"] == 5
assert response_body["page"] == 1
assert response_body["size"] == page_size

resp = api_client.get(f"{url}?username=not real user", headers=get_auth_header)
assert resp.status_code == HTTP_200_OK
response_body = json.loads(resp.text)
assert len(response_body["items"]) == 0
assert response_body["total"] == 0
assert response_body["page"] == 1
assert response_body["size"] == page_size


for i in range(total_users):
saved_users[i].delete(db)



class TestGetUser:
@pytest.fixture(scope="function")
def url(self, oauth_client: ClientDetail) -> str:
Expand All @@ -282,12 +358,12 @@ def url_no_id(self, oauth_client: ClientDetail) -> str:

def test_get_user_not_authenticated(self, api_client: TestClient, url: str) -> None:
resp = api_client.get(url, headers={})
assert resp.status_code == 401
assert resp.status_code == HTTP_401_UNAUTHORIZED

def test_get_user_wrong_scope(self, api_client: TestClient, generate_auth_header, url: str):
auth_header = generate_auth_header(scopes=[USER_DELETE])
resp = api_client.get(url, headers=auth_header)
assert resp.status_code == 403
assert resp.status_code == HTTP_403_FORBIDDEN

def test_get_user_does_not_exist(
self, api_client: TestClient, generate_auth_header, url_no_id: str
Expand All @@ -297,7 +373,7 @@ def test_get_user_does_not_exist(
f"{url_no_id}/this_is_a_nonexistent_key",
headers=auth_header,
)
assert resp.status_code == 404
assert resp.status_code == HTTP_404_NOT_FOUND

class TestUserLogin:
@pytest.fixture(scope="function")
Expand All @@ -307,12 +383,12 @@ def url(self, oauth_client: ClientDetail) -> str:
def test_user_does_not_exist(self, db, url, api_client):
body = {"username": "does not exist", "password": "idonotknowmypassword"}
response = api_client.post(url, headers={}, json=body)
assert response.status_code == 404
assert response.status_code == HTTP_404_NOT_FOUND

def test_bad_login(self, db, url, user, api_client):
body = {"username": user.username, "password": "idonotknowmypassword"}
response = api_client.post(url, headers={}, json=body)
assert response.status_code == 403
assert response.status_code == HTTP_403_FORBIDDEN

def test_login_creates_client(self, db, url, user, api_client):
# Delete existing client for test purposes
Expand All @@ -322,7 +398,7 @@ def test_login_creates_client(self, db, url, user, api_client):
assert user.client is None # client does not exist

response = api_client.post(url, headers={}, json=body)
assert response.status_code == 200
assert response.status_code == HTTP_200_OK

db.refresh(user)
assert user.client is not None
Expand All @@ -340,7 +416,7 @@ def test_login_updates_last_login_date(self, db, url, user, api_client):
body = {"username": user.username, "password": "TESTdcnG@wzJeu0&%3Qe2fGo7"}

response = api_client.post(url, headers={}, json=body)
assert response.status_code == 200
assert response.status_code == HTTP_200_OK

db.refresh(user)
assert user.last_login_at is not None
Expand All @@ -352,7 +428,7 @@ def test_login_uses_existing_client(self, db, url, user, api_client):
user.client.scopes = [PRIVACY_REQUEST_READ]
user.client.save(db)
response = api_client.post(url, headers={}, json=body)
assert response.status_code == 200
assert response.status_code == HTTP_200_OK

db.refresh(user)
assert user.client is not None
Expand Down Expand Up @@ -384,7 +460,7 @@ def test_user_not_deleted_on_logout(self, db, url, api_client, user):
}
auth_header = {"Authorization": "Bearer " + generate_jwe(json.dumps(payload))}
response = api_client.post(url, headers=auth_header, json={})
assert response.status_code == 204
assert response.status_code == HTTP_204_NO_CONTENT

# Verify client was deleted
client_search = ClientDetail.get_by(db, field="id", value=client_id)
Expand All @@ -407,19 +483,19 @@ def test_user_not_deleted_on_logout(self, db, url, api_client, user):
}
auth_header = {"Authorization": "Bearer " + generate_jwe(json.dumps(payload))}
response = api_client.post(url, headers=auth_header, json={})
assert 403 == response.status_code
assert HTTP_403_FORBIDDEN == response.status_code


def test_logout(self, db, url, api_client, generate_auth_header, oauth_client):
oauth_client_id = oauth_client.id
auth_header = generate_auth_header([STORAGE_READ])
response = api_client.post(url, headers=auth_header, json={})
assert 204 == response.status_code
assert HTTP_204_NO_CONTENT == response.status_code

# Verify client was deleted
client_search = ClientDetail.get_by(db, field="id", value=oauth_client_id)
assert client_search is None

# Gets AuthorizationError - client does not exist, this token can't be used anymore
response = api_client.post(url, headers=auth_header, json={})
assert response.status_code == 403
assert response.status_code == HTTP_403_FORBIDDEN

0 comments on commit 04497e0

Please sign in to comment.