Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove deprecated api /superset/override_role_permissions #24334

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,60 +226,6 @@ def datasources(self) -> FlaskResponse:
)
)

@has_access_api
@event_logger.log_this
@expose("/override_role_permissions/", methods=("POST",))
@deprecated()
def override_role_permissions(self) -> FlaskResponse:
"""Updates the role with the give datasource permissions.

Permissions not in the request will be revoked. This endpoint should
be available to admins only. Expects JSON in the format:
{
'role_name': '{role_name}',
'database': [{
'datasource_type': '{table|druid}',
'name': '{database_name}',
'schema': [{
'name': '{schema_name}',
'datasources': ['{datasource name}, {datasource name}']
}]
}]
}
"""
data = request.get_json(force=True)
role_name = data["role_name"]
databases = data["database"]

db_ds_names = set()
for dbs in databases:
for schema in dbs["schema"]:
for ds_name in schema["datasources"]:
fullname = utils.get_datasource_full_name(
dbs["name"], ds_name, schema=schema["name"]
)
db_ds_names.add(fullname)

existing_datasources = SqlaTable.get_all_datasources(db.session)
datasources = [d for d in existing_datasources if d.full_name in db_ds_names]
role = security_manager.find_role(role_name)
# remove all permissions
role.permissions = []
# grant permissions to the list of datasources
granted_perms = []
for datasource in datasources:
view_menu_perm = security_manager.find_permission_view_menu(
view_menu_name=datasource.perm, permission_name="datasource_access"
)
# prevent creating empty permissions
if view_menu_perm and view_menu_perm.view_menu:
role.permissions.append(view_menu_perm)
granted_perms.append(view_menu_perm.view_menu.name)
db.session.commit()
return self.json_response(
{"granted": granted_perms, "requested": list(db_ds_names)}, status=201
)

@has_access
@event_logger.log_this
@expose("/request_access/", methods=("POST",))
Expand Down
74 changes: 0 additions & 74 deletions tests/integration_tests/access_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
# isort:skip_file
"""Unit tests for Superset"""
import json
import unittest
from typing import Optional
from unittest import mock
Expand Down Expand Up @@ -144,79 +143,6 @@ def tearDown(self):
db.session.commit()
db.session.close()

def test_override_role_permissions_is_admin_only(self):
self.logout()
self.login("alpha")
response = self.client.post(
"/superset/override_role_permissions/",
data=json.dumps(ROLE_TABLES_PERM_DATA),
content_type="application/json",
follow_redirects=True,
)
self.assertNotEqual(405, response.status_code)

@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_override_role_permissions_1_table(self):
database = get_example_database()
with database.get_sqla_engine_with_context() as engine:
schema = inspect(engine).default_schema_name

perm_data = ROLE_TABLES_PERM_DATA.copy()
perm_data["database"][0]["schema"][0]["name"] = schema

response = self.client.post(
"/superset/override_role_permissions/",
data=json.dumps(perm_data),
content_type="application/json",
)
self.assertEqual(201, response.status_code)

updated_override_me = security_manager.find_role("override_me")
self.assertEqual(1, len(updated_override_me.permissions))
birth_names = self.get_table(name="birth_names")
self.assertEqual(
birth_names.perm, updated_override_me.permissions[0].view_menu.name
)
self.assertEqual(
"datasource_access", updated_override_me.permissions[0].permission.name
)

@pytest.mark.usefixtures(
"load_energy_table_with_slice", "load_birth_names_dashboard_with_slices"
)
def test_override_role_permissions_drops_absent_perms(self):
database = get_example_database()
with database.get_sqla_engine_with_context() as engine:
schema = inspect(engine).default_schema_name

override_me = security_manager.find_role("override_me")
override_me.permissions.append(
security_manager.find_permission_view_menu(
view_menu_name=self.get_table(name="energy_usage").perm,
permission_name="datasource_access",
)
)
db.session.flush()

perm_data = ROLE_TABLES_PERM_DATA.copy()
perm_data["database"][0]["schema"][0]["name"] = schema

response = self.client.post(
"/superset/override_role_permissions/",
data=json.dumps(perm_data),
content_type="application/json",
)
self.assertEqual(201, response.status_code)
updated_override_me = security_manager.find_role("override_me")
self.assertEqual(1, len(updated_override_me.permissions))
birth_names = self.get_table(name="birth_names")
self.assertEqual(
birth_names.perm, updated_override_me.permissions[0].view_menu.name
)
self.assertEqual(
"datasource_access", updated_override_me.permissions[0].permission.name
)

def test_clean_requests_after_role_extend(self):
session = db.session

Expand Down