Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API test for managing secrets privilege #9121

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions smoke-test/tests/privileges/test_manage_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import pytest
import tenacity

from tests.utils import (get_frontend_session, wait_for_writes_to_sync, wait_for_healthcheck_util,
get_frontend_url, get_admin_credentials,get_sleep_info)
from tests.privileges.utils import (base_privileges_set_status, sensitive_info_prifileges_set_status,
view_entity_prifileges_set_status, create_user, remove_user,login_as)

sleep_sec, sleep_times = get_sleep_info()

@pytest.fixture(scope="session")
def wait_for_healthchecks():
wait_for_healthcheck_util()
yield


@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
# Call to wait_for_healthchecks fixture will do the actual functionality.
pass


@pytest.fixture(scope="session")
def admin_session(wait_for_healthchecks):
yield get_frontend_session()


@pytest.mark.dependency(depends=["test_healthchecks"])
@pytest.fixture(scope="module", autouse=True)
def privileges_and_test_user_setup(admin_session):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should definitely re-use this across tests.

Ideally we could just run the outside logic ONCE across multiple tests. Is there a way to do that?

"""Fixture to execute setup before and tear down after all tests are run"""
# Disable 'All users' privileges
base_privileges_set_status("INACTIVE", admin_session)
sensitive_info_prifileges_set_status("INACTIVE", admin_session)
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
view_entity_prifileges_set_status("INACTIVE", admin_session)
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
# Sleep for eventual consistency
wait_for_writes_to_sync()

# Create a new user
admin_session = create_user(admin_session, "user", "user")

yield

# Remove test user
remove_user(admin_session, "urn:li:corpuser:user")

# Restore All users privileges
base_privileges_set_status("ACTIVE", admin_session)
sensitive_info_prifileges_set_status("ACTIVE", admin_session)
view_entity_prifileges_set_status("ACTIVE", admin_session)

# Sleep for eventual consistency
wait_for_writes_to_sync()


@tenacity.retry(
stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_can_create_secret(session, json):
create_secret_success = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=json)
create_secret_success.raise_for_status()
secret_data = create_secret_success.json()

assert secret_data
assert secret_data["data"]
assert secret_data["data"]["createSecret"]
assert secret_data["data"]["createSecret"] == "urn:li:dataHubSecret:TestSecretName"


@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_cant_create_secret(session, json):
create_secret_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=json)
create_secret_response.raise_for_status()
create_secret_data = create_secret_response.json()

assert create_secret_data["errors"][0]["extensions"]["code"] == 403
assert create_secret_data["errors"][0]["extensions"]["type"] == "UNAUTHORIZED"
assert create_secret_data["data"]["createSecret"] == None


@pytest.mark.dependency(depends=["test_healthchecks"])
def test_add_and_verify_privileges_to_manage_secrets():

(admin_user, admin_pass) = get_admin_credentials()
admin_session = login_as(admin_user, admin_pass)
user_session = login_as("user", "user")

# Verify new user can't create secrets
create_secret = {
"query": """mutation createSecret($input: CreateSecretInput!) {\n
createSecret(input: $input)\n}""",
"variables": {
"input":{
"name":"TestSecretName",
"value":"Test Secret Value",
"description":"Test Secret Description"
}
},
}
_ensure_cant_create_secret(user_session, create_secret)


# Assign privileges to the new user to manage secrets
manage_secrets = {
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
"query": """mutation createPolicy($input: PolicyUpdateInput!) {\n
createPolicy(input: $input) }""",
"variables": {
"input": {
"type": "PLATFORM",
"name": "Manage Secrets",
"description": "Manage Secrets Policy",
"state": "ACTIVE",
"resources": {"filter":{"criteria":[]}},
"privileges": ["MANAGE_SECRETS"],
"actors": {
"users": ["urn:li:corpuser:user"],
"resourceOwners": False,
"allUsers": False,
"allGroups": False,
},
}
},
}

response = admin_session.post(f"{get_frontend_url()}/api/v2/graphql", json=manage_secrets)
response.raise_for_status()
res_data = response.json()

assert res_data
assert res_data["data"]
assert res_data["data"]["createPolicy"]
policy_urn = res_data["data"]["createPolicy"]


# Verify new user can create and manage secrets
# Create a secret
_ensure_can_create_secret(user_session, create_secret)


# Remove a secret
remove_secret = {
"query": """mutation deleteSecret($urn: String!) {\n
deleteSecret(urn: $urn)\n}""",
"variables": {
"urn": "urn:li:dataHubSecret:TestSecretName"
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
},
}

remove_secret_response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=remove_secret)
remove_secret_response.raise_for_status()
secret_data = remove_secret_response.json()

assert secret_data
assert secret_data["data"]
assert secret_data["data"]["deleteSecret"]
assert secret_data["data"]["deleteSecret"] == "urn:li:dataHubSecret:TestSecretName"


# Remove the policy
remove_policy = {
"query": """mutation deletePolicy($urn: String!) {\n
deletePolicy(urn: $urn) }""",
"variables": {"urn": policy_urn},
}

response = admin_session.post(f"{get_frontend_url()}/api/v2/graphql", json=remove_policy)
response.raise_for_status()
res_data = response.json()

assert res_data
assert res_data["data"]
assert res_data["data"]["deletePolicy"]
assert res_data["data"]["deletePolicy"] == policy_urn


# Ensure user can't create secret after policy is removed
_ensure_cant_create_secret(user_session, create_secret)
171 changes: 171 additions & 0 deletions smoke-test/tests/privileges/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import requests_wrapper as requests
from tests.consistency_utils import wait_for_writes_to_sync
from tests.utils import (get_frontend_url, wait_for_writes_to_sync, get_admin_credentials)


def base_privileges_set_status(status, session):
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
base_platform_privileges = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:7",
"input": {
"type": "PLATFORM",
"state": status,
"name": "All Users - Base Platform Privileges",
"description": "Grants base platform privileges to ALL users of DataHub. Change this policy to alter that behavior.",
"privileges": ["MANAGE_INGESTION",
"MANAGE_SECRETS",
"MANAGE_USERS_AND_GROUPS",
"VIEW_ANALYTICS",
"GENERATE_PERSONAL_ACCESS_TOKENS",
"MANAGE_DOMAINS",
"MANAGE_GLOBAL_ANNOUNCEMENTS",
"MANAGE_TESTS",
"MANAGE_GLOSSARIES",
"MANAGE_TAGS",
"MANAGE_GLOBAL_VIEWS",
"MANAGE_GLOBAL_OWNERSHIP_TYPES"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
base_privileges_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=base_platform_privileges)
base_privileges_response.raise_for_status()
base_res_data = base_privileges_response.json()
assert base_res_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:7"

def sensitive_info_prifileges_set_status(status, session):
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
dataset_sensitive_information = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:view-dataset-sensitive",
"input": {
"type": "METADATA",
"state": status,
"name": "All Users - View Dataset Sensitive Information",
"description": "Grants viewing privileges of usage and profile information of all datasets for all users",
"privileges": ["VIEW_DATASET_USAGE","VIEW_DATASET_PROFILE"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
sensitive_info_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=dataset_sensitive_information)
sensitive_info_response.raise_for_status()
sens_info_data = sensitive_info_response.json()
assert sens_info_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:view-dataset-sensitive"

def view_entity_prifileges_set_status(status, session):
kkorchak marked this conversation as resolved.
Show resolved Hide resolved
view_entity_page = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:view-entity-page-all",
"input": {
"type": "METADATA",
"state": status,
"name": "All Users - View Entity Page",
"description": "Grants entity view to all users",
"privileges": ["VIEW_ENTITY_PAGE",
"SEARCH_PRIVILEGE",
"GET_COUNTS_PRIVILEGE",
"GET_TIMESERIES_ASPECT_PRIVILEGE",
"GET_ENTITY_PRIVILEGE",
"GET_TIMELINE_PRIVILEGE"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
view_entity_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=view_entity_page)
view_entity_response.raise_for_status()
view_entity_data = view_entity_response.json()
assert view_entity_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:view-entity-page-all"

def create_user(session, email, password):
# Remove user if exists
res_data = remove_user(session, f"urn:li:corpuser:{email}")
assert res_data
assert "error" not in res_data
# Get the invite token
get_invite_token_json = {
"query": """query getInviteToken($input: GetInviteTokenInput!) {\n
getInviteToken(input: $input){\n
inviteToken\n
}\n
}""",
"variables": {"input": {}},
}
get_invite_token_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
get_invite_token_response.raise_for_status()
get_invite_token_res_data = get_invite_token_response.json()
invite_token = get_invite_token_res_data["data"]["getInviteToken"]["inviteToken"]
assert invite_token is not None
assert "error" not in invite_token
# Create a new user using the invite token
sign_up_json = {
"fullName": "Test User",
"email": email,
"password": password,
"title": "Data Engineer",
"inviteToken": invite_token,
}
sign_up_response = session.post(
f"{get_frontend_url()}/signUp", json=sign_up_json
)
sign_up_response.raise_for_status()
assert sign_up_response
assert "error" not in sign_up_response
wait_for_writes_to_sync()
session.cookies.clear()
(admin_user, admin_pass) = get_admin_credentials()
admin_session = login_as(admin_user, admin_pass)
return admin_session


def login_as(username, password):
session = requests.Session()
headers = {
"Content-Type": "application/json",
}
data = '{"username":"' + username + '", "password":"' + password + '"}'
response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data)
response.raise_for_status()
return session

def remove_user(session, urn):
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn)
}""",
"variables": {"urn": urn},
}
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
return response.json()
Loading