diff --git a/.gitignore b/.gitignore index 5ae51ec915..505e771b3e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,5 @@ # Source for the following rules: https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -# Files to keep that would otherwise get ignored - # frontend ui-build/ diff --git a/CHANGELOG.md b/CHANGELOG.md index b65ac58089..d36f5fbfe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The types of changes are: ### Added +* Added the `user` command group to the CLI. [#2153](https://github.com/ethyca/fides/pull/2153) * Added the connection key to the execution log [#2100](https://github.com/ethyca/fides/pull/2100) * Added endpoints to retrieve DSR `Rule`s and `Rule Target`s [#2116](https://github.com/ethyca/fides/pull/2116) * Dataset classification UI now polls for results [#2123](https://github.com/ethyca/fides/pull/2123) @@ -138,7 +139,6 @@ The types of changes are: * Remove duplicate fastapi-caching and pin version. [#1765](https://github.com/ethyca/fides/pull/1765) - ## [2.2.0](https://github.com/ethyca/fides/compare/2.1.0...2.2.0) ### Added @@ -221,7 +221,6 @@ The types of changes are: * Bumped versions of packages that use OpenSSL [#1683](https://github.com/ethyca/fides/pull/1683) - ## [2.0.0](https://github.com/ethyca/fides/compare/1.9.6...2.0.0) ### Added diff --git a/pyproject.toml b/pyproject.toml index 624e5f7492..9b12cae301 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -155,10 +155,8 @@ log_cli=false filterwarnings = "ignore::DeprecationWarning:aiofiles.*:" testpaths="tests" log_level = "INFO" -addopts = ["--cov=fides.core", - "--cov=fides.cli", - "--cov=fides.api", - "--cov-report=term-missing", +addopts = ["--cov=fides", + "-ra", "-vv", "--no-cov-on-fail", "--disable-pytest-warnings"] diff --git a/src/fides/cli/__init__.py b/src/fides/cli/__init__.py index 255e2ada7f..4a383e8af8 100644 --- a/src/fides/cli/__init__.py +++ b/src/fides/cli/__init__.py @@ -17,6 +17,7 @@ from .commands.export import export from .commands.generate import generate from .commands.scan import scan +from .commands.user import user from .commands.util import deploy, init, status, webserver, worker from .commands.view import view @@ -36,6 +37,7 @@ pull, push, worker, + user, ] API_COMMAND_DICT = {command.name or str(command): command for command in API_COMMANDS} ALL_COMMANDS = API_COMMANDS + LOCAL_COMMANDS diff --git a/src/fides/cli/commands/user.py b/src/fides/cli/commands/user.py new file mode 100644 index 0000000000..cae4f809e4 --- /dev/null +++ b/src/fides/cli/commands/user.py @@ -0,0 +1,65 @@ +"""Contains the user command group for the fides CLI.""" +import click + +from fides.cli.options import ( + first_name_option, + last_name_option, + password_option, + username_option, +) +from fides.core.user import create_command, get_permissions_command, login_command + + +@click.group(name="user") +@click.pass_context +def user(ctx: click.Context) -> None: + """ + Click command group for interacting with user-related functionality. + """ + + +@user.command() +@click.pass_context +@username_option +@password_option +@first_name_option +@last_name_option +def create( + ctx: click.Context, username: str, password: str, first_name: str, last_name: str +) -> None: + """ + Use credentials from the credentials file to create a new user. + + Gives full permissions to the new user. + """ + config = ctx.obj["CONFIG"] + server_url = config.cli.server_url + create_command( + username=username, + password=password, + first_name=first_name, + last_name=last_name, + server_url=server_url, + ) + + +@user.command() +@click.pass_context +@username_option +@password_option +def login(ctx: click.Context, username: str, password: str) -> None: + """ + Use credentials to get a user access token and write it to a credentials file. + """ + config = ctx.obj["CONFIG"] + server_url = config.cli.server_url + login_command(username=username, password=password, server_url=server_url) + + +@user.command(name="permissions") +@click.pass_context +def get_permissions(ctx: click.Context) -> None: + """List the scopes avaible to the current user.""" + config = ctx.obj["CONFIG"] + server_url = config.cli.server_url + get_permissions_command(server_url=server_url) diff --git a/src/fides/cli/options.py b/src/fides/cli/options.py index 7c28c4ff85..5700f3ec83 100644 --- a/src/fides/cli/options.py +++ b/src/fides/cli/options.py @@ -185,3 +185,53 @@ def aws_region_option(command: Callable) -> Callable: help="Use region option to connect to aws. Requires options --access_key_id, --secret_access_key and --region", )(command) return command + + +def prompt_username(ctx: click.Context, param: str, value: str) -> str: + if not value: + value = click.prompt(text="Username") + return value + + +def prompt_password(ctx: click.Context, param: str, value: str) -> str: + if not value: + value = click.prompt(text="Password", hide_input=True) + return value + + +def username_option(command: Callable) -> Callable: + command = click.option( + "-u", + "--username", + default="", + callback=prompt_username, + )(command) + return command + + +def password_option(command: Callable) -> Callable: + command = click.option( + "-p", + "--password", + default="", + callback=prompt_password, + )(command) + return command + + +def first_name_option(command: Callable) -> Callable: + command = click.option( + "-f", + "--first-name", + default="", + )(command) + return command + + +def last_name_option(command: Callable) -> Callable: + command = click.option( + "-l", + "--last-name", + default="", + )(command) + return command diff --git a/src/fides/core/user.py b/src/fides/core/user.py new file mode 100644 index 0000000000..df7b11ac0a --- /dev/null +++ b/src/fides/core/user.py @@ -0,0 +1,202 @@ +"""Module for interaction with User endpoints/commands.""" +import json +from os import getenv +from pathlib import Path +from typing import Dict, List, Tuple + +import requests +import toml +from pydantic import BaseModel + +from fides.cli.utils import handle_cli_response +from fides.core.config import get_config +from fides.core.utils import echo_green, echo_red +from fides.lib.cryptography.cryptographic_util import str_to_b64_str +from fides.lib.oauth.scopes import SCOPES + +config = get_config() +CREATE_USER_PATH = "/api/v1/user" +LOGIN_PATH = "/api/v1/login" +USER_PERMISSIONS_PATH = "/api/v1/user/{}/permission" + + +class Credentials(BaseModel): + """ + User credentials for the CLI. + """ + + username: str + password: str + user_id: str + access_token: str + + +def get_credentials_path() -> str: + """Returns the default credentials path or the path set as an environment variable.""" + default_credentials_file_path = f"{str(Path.home())}/.fides_credentials" + credentials_path = getenv("FIDES_CREDENTIALS_PATH", default_credentials_file_path) + return credentials_path + + +def get_access_token(username: str, password: str, server_url: str) -> Tuple[str, str]: + """ + Get a user access token from the webserver. + """ + payload = { + "username": username, + "password": str_to_b64_str(password), + } + + response = requests.post(server_url + LOGIN_PATH, json=payload) + handle_cli_response(response, verbose=False) + user_id: str = response.json()["user_data"]["id"] + access_token: str = response.json()["token_data"]["access_token"] + return (user_id, access_token) + + +def write_credentials_file( + credentials: Credentials, credentials_path: str = get_credentials_path() +) -> str: + """ + Write the user credentials file. + """ + with open(credentials_path, "w", encoding="utf-8") as credentials_file: + credentials_file.write(toml.dumps(credentials.dict())) + return credentials_path + + +def read_credentials_file( + credentials_path: str = get_credentials_path(), +) -> Credentials: + """Read and return the credentials file.""" + with open(credentials_path, "r", encoding="utf-8") as credentials_file: + credentials = Credentials.parse_obj(toml.load(credentials_file)) + return credentials + + +def create_auth_header(access_token: str) -> Dict[str, str]: + """Given an access token, create an auth header.""" + auth_header = { + "Authorization": f"Bearer {access_token}", + } + return auth_header + + +def create_user( + username: str, + password: str, + first_name: str, + last_name: str, + auth_header: Dict[str, str], + server_url: str, +) -> requests.Response: + """Create a user.""" + request_data = { + "username": username, + "password": str_to_b64_str(password), + "first_name": first_name, + "last_name": last_name, + } + response = requests.post( + server_url + CREATE_USER_PATH, + headers=auth_header, + data=json.dumps(request_data), + ) + handle_cli_response(response, verbose=False) + return response + + +def get_user_permissions( + user_id: str, auth_header: Dict[str, str], server_url: str +) -> List[str]: + """ + List all of the user permissions for the provided user. + """ + get_permissions_path = USER_PERMISSIONS_PATH.format(user_id) + response = requests.get( + server_url + get_permissions_path, + headers=auth_header, + ) + + handle_cli_response(response, verbose=False) + return response.json()["scopes"] + + +def update_user_permissions( + user_id: str, scopes: List[str], auth_header: Dict[str, str], server_url: str +) -> requests.Response: + """ + Update user permissions for a given user. + """ + request_data = {"scopes": scopes, "id": user_id} + set_permissions_path = USER_PERMISSIONS_PATH.format(user_id) + response = requests.put( + server_url + set_permissions_path, + headers=auth_header, + json=request_data, + ) + handle_cli_response(response, verbose=False) + return response + + +def create_command( + username: str, password: str, first_name: str, last_name: str, server_url: str +) -> None: + """ + Given new user information, create a new user via the API using + the local credentials file. + """ + + try: + credentials = read_credentials_file() + except FileNotFoundError: + echo_red("No credentials file found.") + raise SystemExit(1) + + access_token = credentials.access_token + auth_header = create_auth_header(access_token) + user_response = create_user( + username=username, + password=password, + first_name=first_name, + last_name=last_name, + auth_header=auth_header, + server_url=server_url, + ) + user_id = user_response.json()["id"] + update_user_permissions( + user_id=user_id, scopes=SCOPES, auth_header=auth_header, server_url=server_url + ) + echo_green(f"User: '{username}' created and assigned permissions.") + + +def login_command(username: str, password: str, server_url: str) -> None: + """ + Given a username and password, request an access_token from the API and + store all user information in a local credentials file. + """ + user_id, access_token = get_access_token( + username=username, password=password, server_url=server_url + ) + echo_green(f"Logged in as user: {username}") + credentials = Credentials( + username=username, password=password, user_id=user_id, access_token=access_token + ) + credentials_path = write_credentials_file(credentials) + echo_green(f"Credentials file written to: {credentials_path}") + + +def get_permissions_command(server_url: str) -> None: + """ + Get user permissions from the API. + """ + credentials = read_credentials_file() + user_id = credentials.user_id + access_token = credentials.access_token + + auth_header = create_auth_header(access_token) + permissions: List[str] = get_user_permissions(user_id, auth_header, server_url) + + print("Permissions:") + for permission in permissions: + print(f"\t{permission}") diff --git a/tests/ctl/cli/test_cli.py b/tests/ctl/cli/test_cli.py index 26a4eb1464..c6673d5315 100644 --- a/tests/ctl/cli/test_cli.py +++ b/tests/ctl/cli/test_cli.py @@ -2,6 +2,7 @@ import os from base64 import b64decode from json import dump, loads +from pathlib import Path from typing import Generator import pytest @@ -65,54 +66,56 @@ def test_parse(test_config_path: str, test_cli_runner: CliRunner) -> None: assert result.exit_code == 0 -@pytest.mark.integration -def test_reset_db(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke(cli, ["-f", test_config_path, "db", "reset", "-y"]) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_init_db(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke(cli, ["-f", test_config_path, "db", "init"]) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_push(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke( - cli, ["-f", test_config_path, "push", "demo_resources/"] - ) - print(result.output) - assert result.exit_code == 0 +class TestDB: + @pytest.mark.integration + def test_reset_db(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke( + cli, ["-f", test_config_path, "db", "reset", "-y"] + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.integration + def test_init_db(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke(cli, ["-f", test_config_path, "db", "init"]) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_dry_push(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke( - cli, ["-f", test_config_path, "push", "--dry", "demo_resources/"] - ) - print(result.output) - assert result.exit_code == 0 +class TestPush: + @pytest.mark.integration + def test_push(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke( + cli, ["-f", test_config_path, "push", "demo_resources/"] + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_diff_push(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke( - cli, ["-f", test_config_path, "push", "--diff", "demo_resources/"] - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_dry_push(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke( + cli, ["-f", test_config_path, "push", "--dry", "demo_resources/"] + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.integration + def test_diff_push(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke( + cli, ["-f", test_config_path, "push", "--diff", "demo_resources/"] + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_dry_diff_push(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke( - cli, ["-f", test_config_path, "push", "--dry", "--diff", "demo_resources/"] - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_dry_diff_push( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, ["-f", test_config_path, "push", "--dry", "--diff", "demo_resources/"] + ) + print(result.output) + assert result.exit_code == 0 @pytest.mark.integration @@ -169,678 +172,747 @@ def test_audit(test_config_path: str, test_cli_runner: CliRunner) -> None: assert result.exit_code == 0 -@pytest.mark.integration -def test_get(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke( - cli, - ["-f", test_config_path, "get", "data_category", "user"], - ) - print(result.output) - assert result.exit_code == 0 +class TestCRUD: + @pytest.mark.integration + def test_get(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke( + cli, + ["-f", test_config_path, "get", "data_category", "user"], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.integration + def test_ls(self, test_config_path: str, test_cli_runner: CliRunner) -> None: + result = test_cli_runner.invoke(cli, ["-f", test_config_path, "ls", "system"]) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_ls(test_config_path: str, test_cli_runner: CliRunner) -> None: - result = test_cli_runner.invoke(cli, ["-f", test_config_path, "ls", "system"]) - print(result.output) - assert result.exit_code == 0 +class TestEvaluate: + @pytest.mark.integration + def test_evaluate_with_declaration_pass( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/passing_declaration_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_evaluate_with_declaration_pass( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "tests/ctl/data/passing_declaration_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_evaluate_demo_resources_pass( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + ["-f", test_config_path, "evaluate", "demo_resources/"], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.integration + def test_local_evaluate( + self, test_invalid_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "--local", + "-f", + test_invalid_config_path, + "evaluate", + "tests/ctl/data/passing_declaration_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_evaluate_demo_resources_pass( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - ["-f", test_config_path, "evaluate", "demo_resources/"], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_local_evaluate_demo_resources( + self, test_invalid_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "--local", + "-f", + test_invalid_config_path, + "evaluate", + "demo_resources/", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.integration + def test_evaluate_with_key_pass( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "-k", + "primary_privacy_policy", + "tests/ctl/data/passing_declaration_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_local_evaluate( - test_invalid_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "--local", - "-f", - test_invalid_config_path, - "evaluate", - "tests/ctl/data/passing_declaration_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_evaluate_with_declaration_failed( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/failing_declaration_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 1 + @pytest.mark.integration + def test_evaluate_with_dataset_failed( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/failing_dataset_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 1 -@pytest.mark.integration -def test_local_evaluate_demo_resources( - test_invalid_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "--local", - "-f", - test_invalid_config_path, - "evaluate", - "demo_resources/", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_evaluate_with_dataset_field_failed( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/failing_dataset_collection_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 1 + @pytest.mark.integration + def test_evaluate_with_dataset_collection_failed( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/failing_dataset_field_taxonomy.yml", + ], + ) + print(result.output) + assert result.exit_code == 1 -@pytest.mark.integration -def test_evaluate_with_key_pass( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "-k", - "primary_privacy_policy", - "tests/ctl/data/passing_declaration_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_evaluate_nested_field_fails( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + """ + Tests a taxonomy that is rigged to fail only due to + one of the nested fields violating the policy. Test + will fail if the nested field is not discovered. + """ + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "evaluate", + "tests/ctl/data/failing_nested_dataset.yml", + ], + ) + print(result.output) + assert result.exit_code == 1 @pytest.mark.integration -def test_evaluate_with_declaration_failed( - test_config_path: str, test_cli_runner: CliRunner +@pytest.mark.parametrize( + "export_resource", ["system", "dataset", "organization", "datamap"] +) +def test_export_resources( + test_config_path: str, + test_cli_runner: CliRunner, + export_resource: str, ) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "tests/ctl/data/failing_declaration_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 1 - + """ + Tests that each resource is successfully exported + """ -@pytest.mark.integration -def test_evaluate_with_dataset_failed( - test_config_path: str, test_cli_runner: CliRunner -) -> None: result = test_cli_runner.invoke( cli, [ "-f", test_config_path, - "evaluate", - "tests/ctl/data/failing_dataset_taxonomy.yml", + "export", + export_resource, + "--dry", ], ) - print(result.output) - assert result.exit_code == 1 - + assert result.exit_code == 0 -@pytest.mark.integration -def test_evaluate_with_dataset_field_failed( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "tests/ctl/data/failing_dataset_collection_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 1 +class TestScan: + @pytest.mark.integration + def test_scan_dataset_db_input_connection_string( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "dataset", + "db", + "--connection-string", + "postgresql+psycopg2://postgres:fides@fides-db:5432/fides_test", + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.integration -def test_evaluate_with_dataset_collection_failed( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "tests/ctl/data/failing_dataset_field_taxonomy.yml", - ], - ) - print(result.output) - assert result.exit_code == 1 - - -@pytest.mark.integration -@pytest.mark.parametrize( - "export_resource", ["system", "dataset", "organization", "datamap"] -) -def test_export_resources( - test_config_path: str, - test_cli_runner: CliRunner, - export_resource: str, -) -> None: - """ - Tests that each resource is successfully exported - """ - - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "export", - export_resource, - "--dry", - ], - ) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_nested_field_fails_evaluation( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - """ - Tests a taxonomy that is rigged to fail only due to - one of the nested fields violating the policy. Test - will fail if the nested field is not discovered. - """ - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "evaluate", - "tests/ctl/data/failing_nested_dataset.yml", - ], - ) - print(result.output) - assert result.exit_code == 1 - - -@pytest.mark.integration -def test_generate_dataset_db_with_connection_string( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("dataset.yml") - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "dataset", - "db", - f"{tmp_file}", - "--connection-string", - "postgresql+psycopg2://postgres:fides@fides-db:5432/fides_test", - ], - ) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_generate_dataset_db_with_credentials_id( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("dataset.yml") - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "dataset", - "db", - f"{tmp_file}", - "--credentials-id", - "postgres_1", - ], - ) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_scan_dataset_db_input_connection_string( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "dataset", - "db", - "--connection-string", - "postgresql+psycopg2://postgres:fides@fides-db:5432/fides_test", - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.integration -def test_scan_dataset_db_input_credentials_id( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "dataset", - "db", - "--credentials-id", - "postgres_1", - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 - - -@pytest.mark.external -def test_generate_system_aws_environment_credentials( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("system.yml") - result = test_cli_runner.invoke( - cli, - ["-f", test_config_path, "generate", "system", "aws", f"{tmp_file}"], - ) - print(result.output) - assert result.exit_code == 0 - + @pytest.mark.integration + def test_scan_dataset_db_input_credentials_id( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "dataset", + "db", + "--credentials-id", + "postgres_1", + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_scan_system_aws_environment_credentials( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "aws", - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_aws_environment_credentials( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "aws", + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_aws_input_credential_options( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "aws", + "--coverage-threshold", + "0", + "--access_key_id", + os.environ["AWS_ACCESS_KEY_ID"], + "--secret_access_key", + os.environ["AWS_SECRET_ACCESS_KEY"], + "--region", + os.environ["AWS_DEFAULT_REGION"], + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_generate_system_aws_input_credential_options( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("system.yml") - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "system", - "aws", - f"{tmp_file}", - "--access_key_id", - os.environ["AWS_ACCESS_KEY_ID"], - "--secret_access_key", - os.environ["AWS_SECRET_ACCESS_KEY"], - "--region", - os.environ["AWS_DEFAULT_REGION"], - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_aws_input_credentials_id( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + os.environ["FIDES__CREDENTIALS__AWS_1__AWS_ACCESS_KEY_ID"] = os.environ[ + "AWS_ACCESS_KEY_ID" + ] + os.environ["FIDES__CREDENTIALS__AWS_1__AWS_SECRET_ACCESS_KEY"] = os.environ[ + "AWS_SECRET_ACCESS_KEY" + ] + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "aws", + "--coverage-threshold", + "0", + "--credentials-id", + "aws_1", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_scan_system_aws_input_credential_options( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "aws", - "--coverage-threshold", - "0", - "--access_key_id", - os.environ["AWS_ACCESS_KEY_ID"], - "--secret_access_key", - os.environ["AWS_SECRET_ACCESS_KEY"], - "--region", - os.environ["AWS_DEFAULT_REGION"], - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_okta_input_credential_options( + self, test_config_path: str, test_cli_runner: CliRunner + ) -> None: + token = os.environ["OKTA_CLIENT_TOKEN"] + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "okta", + "--org-url", + OKTA_URL, + "--token", + token, + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_okta_input_credentials_id( + self, + test_config_path: str, + test_cli_runner: CliRunner, + ) -> None: + os.environ["FIDES__CREDENTIALS__OKTA_1__TOKEN"] = os.environ[ + "OKTA_CLIENT_TOKEN" + ] + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "okta", + "--credentials-id", + "okta_1", + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_generate_system_aws_input_credentials_id( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - os.environ["FIDES__CREDENTIALS__AWS_1__AWS_ACCESS_KEY_ID"] = os.environ[ - "AWS_ACCESS_KEY_ID" - ] - os.environ["FIDES__CREDENTIALS__AWS_1__AWS_SECRET_ACCESS_KEY"] = os.environ[ - "AWS_SECRET_ACCESS_KEY" - ] - tmp_file = tmpdir.join("system.yml") - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "system", - "aws", - f"{tmp_file}", - "--credentials-id", - "aws_1", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_scan_system_okta_environment_credentials( + self, + test_config_path: str, + test_cli_runner: CliRunner, + ) -> None: + os.environ["OKTA_CLIENT_ORGURL"] = OKTA_URL + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "scan", + "system", + "okta", + "--coverage-threshold", + "0", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_scan_system_aws_input_credentials_id( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - os.environ["FIDES__CREDENTIALS__AWS_1__AWS_ACCESS_KEY_ID"] = os.environ[ - "AWS_ACCESS_KEY_ID" - ] - os.environ["FIDES__CREDENTIALS__AWS_1__AWS_SECRET_ACCESS_KEY"] = os.environ[ - "AWS_SECRET_ACCESS_KEY" - ] +class TestGenerate: + @pytest.mark.integration + def test_generate_dataset_db_with_connection_string( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("dataset.yml") + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "dataset", + "db", + f"{tmp_file}", + "--connection-string", + "postgresql+psycopg2://postgres:fides@fides-db:5432/fides_test", + ], + ) + print(result.output) + assert result.exit_code == 0 - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "aws", - "--coverage-threshold", - "0", - "--credentials-id", - "aws_1", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.integration + def test_generate_dataset_db_with_credentials_id( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("dataset.yml") + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "dataset", + "db", + f"{tmp_file}", + "--credentials-id", + "postgres_1", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_aws_input_credential_options( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("system.yml") + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "system", + "aws", + f"{tmp_file}", + "--access_key_id", + os.environ["AWS_ACCESS_KEY_ID"], + "--secret_access_key", + os.environ["AWS_SECRET_ACCESS_KEY"], + "--region", + os.environ["AWS_DEFAULT_REGION"], + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_generate_system_okta_input_credential_options( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("system.yml") - token = os.environ["OKTA_CLIENT_TOKEN"] - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "system", - "okta", - f"{tmp_file}", - "--org-url", - OKTA_URL, - "--token", - token, - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_aws_environment_credentials( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("system.yml") + result = test_cli_runner.invoke( + cli, + ["-f", test_config_path, "generate", "system", "aws", f"{tmp_file}"], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_aws_input_credentials_id( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + os.environ["FIDES__CREDENTIALS__AWS_1__AWS_ACCESS_KEY_ID"] = os.environ[ + "AWS_ACCESS_KEY_ID" + ] + os.environ["FIDES__CREDENTIALS__AWS_1__AWS_SECRET_ACCESS_KEY"] = os.environ[ + "AWS_SECRET_ACCESS_KEY" + ] + tmp_file = tmpdir.join("system.yml") + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "system", + "aws", + f"{tmp_file}", + "--credentials-id", + "aws_1", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_scan_system_okta_input_credential_options( - test_config_path: str, test_cli_runner: CliRunner -) -> None: - token = os.environ["OKTA_CLIENT_TOKEN"] - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "okta", - "--org-url", - OKTA_URL, - "--token", - token, - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_okta_input_credential_options( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("system.yml") + token = os.environ["OKTA_CLIENT_TOKEN"] + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "system", + "okta", + f"{tmp_file}", + "--org-url", + OKTA_URL, + "--token", + token, + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_okta_environment_credentials( + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("system.yml") + os.environ["OKTA_CLIENT_ORGURL"] = OKTA_URL + result = test_cli_runner.invoke( + cli, + ["-f", test_config_path, "generate", "system", "okta", f"{tmp_file}"], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_generate_system_okta_environment_credentials( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("system.yml") - os.environ["OKTA_CLIENT_ORGURL"] = OKTA_URL - result = test_cli_runner.invoke( - cli, - ["-f", test_config_path, "generate", "system", "okta", f"{tmp_file}"], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.external + def test_generate_system_okta_input_credentials_id( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: + tmp_file = tmpdir.join("system.yml") + os.environ["FIDES__CREDENTIALS__OKTA_1__TOKEN"] = os.environ[ + "OKTA_CLIENT_TOKEN" + ] + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "system", + "okta", + f"{tmp_file}", + "--credentials-id", + "okta_1", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_generate_dataset_bigquery_credentials_id( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: -@pytest.mark.external -def test_scan_system_okta_environment_credentials( - test_config_path: str, - test_cli_runner: CliRunner, -) -> None: - os.environ["OKTA_CLIENT_ORGURL"] = OKTA_URL - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "okta", - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 + tmp_output_file = tmpdir.join("dataset.yml") + config_data = os.getenv("BIGQUERY_CONFIG", "e30=") + config_data_decoded = loads( + b64decode(config_data.encode("utf-8")).decode("utf-8") + ) + os.environ["FIDES__CREDENTIALS__BIGQUERY_1__PROJECT_ID"] = config_data_decoded[ + "project_id" + ] + os.environ[ + "FIDES__CREDENTIALS__BIGQUERY_1__PRIVATE_KEY_ID" + ] = config_data_decoded["private_key_id"] + os.environ["FIDES__CREDENTIALS__BIGQUERY_1__PRIVATE_KEY"] = config_data_decoded[ + "private_key" + ] + os.environ[ + "FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_EMAIL" + ] = config_data_decoded["client_email"] + os.environ["FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_ID"] = config_data_decoded[ + "client_id" + ] + os.environ[ + "FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_X509_CERT_URL" + ] = config_data_decoded["client_x509_cert_url"] + dataset_name = "fidesopstest" + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "dataset", + "gcp", + "bigquery", + dataset_name, + f"{tmp_output_file}", + "--credentials-id", + "bigquery_1", + ], + ) + print(result.output) + assert result.exit_code == 0 + @pytest.mark.external + def test_generate_dataset_bigquery_keyfile_path( + self, + test_config_path: str, + test_cli_runner: CliRunner, + tmpdir: LocalPath, + ) -> None: -@pytest.mark.external -def test_generate_system_okta_input_credentials_id( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: - tmp_file = tmpdir.join("system.yml") - os.environ["FIDES__CREDENTIALS__OKTA_1__TOKEN"] = os.environ["OKTA_CLIENT_TOKEN"] - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "system", - "okta", - f"{tmp_file}", - "--credentials-id", - "okta_1", - ], - ) - print(result.output) - assert result.exit_code == 0 + tmp_output_file = tmpdir.join("dataset.yml") + tmp_keyfile = tmpdir.join("bigquery.json") + config_data = os.getenv("BIGQUERY_CONFIG", "e30=") + config_data_decoded = loads( + b64decode(config_data.encode("utf-8")).decode("utf-8") + ) + with open(tmp_keyfile, "w", encoding="utf-8") as keyfile: + dump(config_data_decoded, keyfile) + dataset_name = "fidesopstest" + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "generate", + "dataset", + "gcp", + "bigquery", + dataset_name, + f"{tmp_output_file}", + "--keyfile-path", + f"{tmp_keyfile}", + ], + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_scan_system_okta_input_credentials_id( - test_config_path: str, - test_cli_runner: CliRunner, -) -> None: - os.environ["FIDES__CREDENTIALS__OKTA_1__TOKEN"] = os.environ["OKTA_CLIENT_TOKEN"] - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "scan", - "system", - "okta", - "--credentials-id", - "okta_1", - "--coverage-threshold", - "0", - ], - ) - print(result.output) - assert result.exit_code == 0 +@pytest.fixture(scope="class") +def credentials_path(tmp_path_factory) -> str: + credentials_dir = tmp_path_factory.mktemp("credentials") + credentials_path = credentials_dir / ".fides_credentials" + return str(credentials_path) -@pytest.mark.external -def test_generate_dataset_bigquery_credentials_id( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: +class TestUser: + """ + Test the "user" command group. - tmp_output_file = tmpdir.join("dataset.yml") - config_data = os.getenv("BIGQUERY_CONFIG", "e30=") - config_data_decoded = loads(b64decode(config_data.encode("utf-8")).decode("utf-8")) - os.environ["FIDES__CREDENTIALS__BIGQUERY_1__PROJECT_ID"] = config_data_decoded[ - "project_id" - ] - os.environ["FIDES__CREDENTIALS__BIGQUERY_1__PRIVATE_KEY_ID"] = config_data_decoded[ - "private_key_id" - ] - os.environ["FIDES__CREDENTIALS__BIGQUERY_1__PRIVATE_KEY"] = config_data_decoded[ - "private_key" - ] - os.environ["FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_EMAIL"] = config_data_decoded[ - "client_email" - ] - os.environ["FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_ID"] = config_data_decoded[ - "client_id" - ] - os.environ[ - "FIDES__CREDENTIALS__BIGQUERY_1__CLIENT_X509_CERT_URL" - ] = config_data_decoded["client_x509_cert_url"] - dataset_name = "fidesopstest" - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "dataset", - "gcp", - "bigquery", - dataset_name, - f"{tmp_output_file}", - "--credentials-id", - "bigquery_1", - ], - ) - print(result.output) - assert result.exit_code == 0 + Most tests rely on previous tests. + """ + @pytest.mark.unit + def test_user_login_provide_credentials( + self, test_config_path: str, test_cli_runner: CliRunner, credentials_path: str + ) -> None: + """Test logging in as a user with a provided username and password.""" + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "user", + "login", + "-u", + "root_user", + "-p", + "Testpassword1!", + ], + env={"FIDES_CREDENTIALS_PATH": credentials_path}, + ) + print(result.output) + assert result.exit_code == 0 -@pytest.mark.external -def test_generate_dataset_bigquery_keyfile_path( - test_config_path: str, - test_cli_runner: CliRunner, - tmpdir: LocalPath, -) -> None: + @pytest.mark.unit + def test_user_create( + self, test_config_path: str, test_cli_runner: CliRunner, credentials_path: str + ) -> None: + """Test creating a user with the current credentials.""" + result = test_cli_runner.invoke( + cli, + [ + "-f", + test_config_path, + "user", + "create", + "-u", + "newuser", + "-p", + "Newpassword1!", + ], + env={"FIDES_CREDENTIALS_PATH": credentials_path}, + ) + print(result.output) + assert result.exit_code == 0 - tmp_output_file = tmpdir.join("dataset.yml") - tmp_keyfile = tmpdir.join("bigquery.json") - config_data = os.getenv("BIGQUERY_CONFIG", "e30=") - config_data_decoded = loads(b64decode(config_data.encode("utf-8")).decode("utf-8")) - with open(tmp_keyfile, "w", encoding="utf-8") as keyfile: - dump(config_data_decoded, keyfile) - dataset_name = "fidesopstest" - result = test_cli_runner.invoke( - cli, - [ - "-f", - test_config_path, - "generate", - "dataset", - "gcp", - "bigquery", - dataset_name, - f"{tmp_output_file}", - "--keyfile-path", - f"{tmp_keyfile}", - ], - ) - print(result.output) - assert result.exit_code == 0 + @pytest.mark.unit + def test_user_permissions( + self, test_config_path: str, test_cli_runner: CliRunner, credentials_path: str + ) -> None: + """Test getting user permissions for the current user.""" + result = test_cli_runner.invoke( + cli, + ["-f", test_config_path, "user", "permissions"], + env={"FIDES_CREDENTIALS_PATH": credentials_path}, + ) + print(result.output) + assert result.exit_code == 0 diff --git a/tests/ctl/core/test_user.py b/tests/ctl/core/test_user.py new file mode 100644 index 0000000000..968cab82ee --- /dev/null +++ b/tests/ctl/core/test_user.py @@ -0,0 +1,32 @@ +from os import environ + +import pytest + +from fides.core.user import Credentials, get_credentials_path + + +@pytest.mark.unit +class TestCredentials: + """ + Test the Credentials object. + """ + + def test_valid_credentials(self): + credentials = Credentials( + username="test", + password="password", + user_id="some_id", + access_token="some_token", + ) + assert credentials.username == "test" + assert credentials.password == "password" + assert credentials.user_id == "some_id" + assert credentials.access_token == "some_token" + + +def test_get_credentials_path() -> None: + """Test that a custom path for the credentials file works as expected.""" + expected_path = "test_credentials" + environ["FIDES_CREDENTIALS_PATH"] = "test_credentials" + actual_path = get_credentials_path() + assert expected_path == actual_path diff --git a/tests/ctl/test_config.toml b/tests/ctl/test_config.toml index 8525aad788..65c74eca28 100644 --- a/tests/ctl/test_config.toml +++ b/tests/ctl/test_config.toml @@ -44,6 +44,8 @@ client_x509_cert_url = "redacted_url_override_in_tests" [security] app_encryption_key = "OLMkv91j8DHiDAULnK5Lxx3kSCov30b3" +root_username = "root_user" +root_password = "Testpassword1!" oauth_root_client_id = "fidesadmin" oauth_root_client_secret = "fidesadminsecret" drp_jwt_secret = "secret"