-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: dataset update permission out of sync #25043
Changes from all commits
525a7d5
2591ce1
9073cf5
49ad621
96d0861
85a22b6
01ada73
12e7743
7984bb7
ed94a03
6de1ef4
e364f5f
3680835
48d085c
4943320
7827767
225be2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,12 +74,10 @@ | |
from superset.common.db_query_status import QueryStatus | ||
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric | ||
from superset.connectors.sqla.utils import ( | ||
find_cached_objects_in_session, | ||
get_columns_description, | ||
get_physical_table_metadata, | ||
get_virtual_table_metadata, | ||
) | ||
from superset.datasets.models import Dataset as NewDataset | ||
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression | ||
from superset.exceptions import ( | ||
ColumnNotFoundException, | ||
|
@@ -1430,46 +1428,20 @@ def quote_identifier(self) -> Callable[[str], str]: | |
|
||
@staticmethod | ||
def before_update( | ||
mapper: Mapper, # pylint: disable=unused-argument | ||
connection: Connection, # pylint: disable=unused-argument | ||
mapper: Mapper, | ||
connection: Connection, | ||
target: SqlaTable, | ||
) -> None: | ||
""" | ||
Check before update if the target table already exists. | ||
|
||
Note this listener is called when any fields are being updated and thus it is | ||
necessary to first check whether the reference table is being updated. | ||
|
||
Note this logic is temporary, given uniqueness is handled via the dataset DAO, | ||
but is necessary until both the legacy datasource editor and datasource/save | ||
endpoints are deprecated. | ||
Note this listener is called when any fields are being updated | ||
|
||
:param mapper: The table mapper | ||
:param connection: The DB-API connection | ||
:param target: The mapped instance being persisted | ||
:raises Exception: If the target table is not unique | ||
""" | ||
|
||
# pylint: disable=import-outside-toplevel | ||
from superset.daos.dataset import DatasetDAO | ||
from superset.datasets.commands.exceptions import get_dataset_exist_error_msg | ||
|
||
# Check whether the relevant attributes have changed. | ||
state = db.inspect(target) # pylint: disable=no-member | ||
|
||
for attr in ["database_id", "schema", "table_name"]: | ||
history = state.get_history(attr, True) | ||
if history.has_changes(): | ||
break | ||
else: | ||
return None | ||
|
||
if not DatasetDAO.validate_uniqueness( | ||
target.database_id, target.schema, target.table_name, target.id | ||
): | ||
raise Exception( # pylint: disable=broad-exception-raised | ||
get_dataset_exist_error_msg(target.full_name) | ||
) | ||
target.load_database() | ||
security_manager.dataset_before_update(mapper, connection, target) | ||
|
||
@staticmethod | ||
def update_column( # pylint: disable=unused-argument | ||
|
@@ -1487,34 +1459,17 @@ def update_column( # pylint: disable=unused-argument | |
# table is updated. This busts the cache key for all charts that use the table. | ||
session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id)) | ||
|
||
# TODO: This shadow writing is deprecated | ||
# if table itself has changed, shadow-writing will happen in `after_update` anyway | ||
if target.table not in session.dirty: | ||
dataset: NewDataset = ( | ||
session.query(NewDataset) | ||
.filter_by(uuid=target.table.uuid) | ||
.one_or_none() | ||
) | ||
# Update shadow dataset and columns | ||
# did we find the dataset? | ||
if not dataset: | ||
# if dataset is not found create a new copy | ||
target.table.write_shadow_dataset() | ||
return | ||
|
||
@staticmethod | ||
def after_insert( | ||
mapper: Mapper, | ||
connection: Connection, | ||
sqla_table: SqlaTable, | ||
target: SqlaTable, | ||
) -> None: | ||
""" | ||
Update dataset permissions after insert | ||
""" | ||
security_manager.dataset_after_insert(mapper, connection, sqla_table) | ||
|
||
# TODO: deprecated | ||
sqla_table.write_shadow_dataset() | ||
target.load_database() | ||
security_manager.dataset_after_insert(mapper, connection, target) | ||
|
||
@staticmethod | ||
def after_delete( | ||
|
@@ -1527,63 +1482,16 @@ def after_delete( | |
""" | ||
security_manager.dataset_after_delete(mapper, connection, sqla_table) | ||
|
||
@staticmethod | ||
def after_update( | ||
mapper: Mapper, | ||
connection: Connection, | ||
sqla_table: SqlaTable, | ||
) -> None: | ||
""" | ||
Update dataset permissions | ||
""" | ||
# set permissions | ||
security_manager.dataset_after_update(mapper, connection, sqla_table) | ||
|
||
# TODO: the shadow writing is deprecated | ||
inspector = inspect(sqla_table) | ||
session = inspector.session | ||
|
||
# double-check that ``UPDATE``s are actually pending (this method is called even | ||
# for instances that have no net changes to their column-based attributes) | ||
if not session.is_modified(sqla_table, include_collections=True): | ||
return | ||
|
||
# find the dataset from the known instance list first | ||
# (it could be either from a previous query or newly created) | ||
dataset = next( | ||
find_cached_objects_in_session( | ||
session, NewDataset, uuids=[sqla_table.uuid] | ||
), | ||
None, | ||
) | ||
# if not found, pull from database | ||
if not dataset: | ||
dataset = ( | ||
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none() | ||
) | ||
if not dataset: | ||
sqla_table.write_shadow_dataset() | ||
return | ||
|
||
def write_shadow_dataset( | ||
self: SqlaTable, | ||
) -> None: | ||
""" | ||
This method is deprecated | ||
""" | ||
session = inspect(self).session | ||
# most of the write_shadow_dataset functionality has been removed | ||
# but leaving this portion in | ||
# to remove later because it is adding a Database relationship to the session | ||
# and there is some functionality that depends on this | ||
def load_database(self: SqlaTable) -> None: | ||
# somehow the database attribute is not loaded on access | ||
if self.database_id and ( | ||
not self.database or self.database.id != self.database_id | ||
): | ||
session = inspect(self).session | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for ensuring we're using the same session. |
||
self.database = session.query(Database).filter_by(id=self.database_id).one() | ||
|
||
|
||
sa.event.listen(SqlaTable, "before_update", SqlaTable.before_update) | ||
sa.event.listen(SqlaTable, "after_update", SqlaTable.after_update) | ||
sa.event.listen(SqlaTable, "after_insert", SqlaTable.after_insert) | ||
sa.event.listen(SqlaTable, "after_delete", SqlaTable.after_delete) | ||
sa.event.listen(SqlMetric, "after_update", SqlaTable.update_column) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1319,7 +1319,7 @@ def dataset_after_delete( | |
mapper, connection, "datasource_access", dataset_vm_name | ||
) | ||
|
||
def dataset_after_update( | ||
def dataset_before_update( | ||
self, | ||
mapper: Mapper, | ||
connection: Connection, | ||
|
@@ -1339,14 +1339,20 @@ def dataset_after_update( | |
:param target: The changed dataset object | ||
:return: | ||
""" | ||
# pylint: disable=import-outside-toplevel | ||
from superset.connectors.sqla.models import SqlaTable | ||
|
||
# Check if watched fields have changed | ||
state = inspect(target) | ||
history_database = state.get_history("database_id", True) | ||
history_table_name = state.get_history("table_name", True) | ||
history_schema = state.get_history("schema", True) | ||
table = SqlaTable.__table__ | ||
current_dataset = connection.execute( | ||
table.select().where(table.c.id == target.id) | ||
).one() | ||
current_db_id = current_dataset.database_id | ||
current_schema = current_dataset.schema | ||
current_table_name = current_dataset.table_name | ||
|
||
# When database name changes | ||
if history_database.has_changes() and history_database.deleted: | ||
if current_db_id != target.database_id: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the dataset permission is a string that includes the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Forget it, those cases are covered by the next if statements. |
||
new_dataset_vm_name = self.get_dataset_perm( | ||
target.id, target.table_name, target.database.database_name | ||
) | ||
|
@@ -1366,20 +1372,19 @@ def dataset_after_update( | |
) | ||
|
||
# When table name changes | ||
if history_table_name.has_changes() and history_table_name.deleted: | ||
old_dataset_name = history_table_name.deleted[0] | ||
if current_table_name != target.table_name: | ||
new_dataset_vm_name = self.get_dataset_perm( | ||
target.id, target.table_name, target.database.database_name | ||
) | ||
old_dataset_vm_name = self.get_dataset_perm( | ||
target.id, old_dataset_name, target.database.database_name | ||
target.id, current_table_name, target.database.database_name | ||
) | ||
self._update_dataset_perm( | ||
mapper, connection, old_dataset_vm_name, new_dataset_vm_name, target | ||
) | ||
|
||
# When schema changes | ||
if history_schema.has_changes() and history_schema.deleted: | ||
if current_schema != target.schema: | ||
new_dataset_schema_name = self.get_schema_perm( | ||
target.database.database_name, target.schema | ||
) | ||
|
@@ -1410,6 +1415,7 @@ def _update_dataset_schema_perm( | |
:param target: Dataset that was updated | ||
:return: | ||
""" | ||
logger.info("Updating schema perm, new: %s", new_schema_permission_name) | ||
from superset.connectors.sqla.models import ( # pylint: disable=import-outside-toplevel | ||
SqlaTable, | ||
) | ||
|
@@ -1463,6 +1469,11 @@ def _update_dataset_perm( # pylint: disable=too-many-arguments | |
:param target: | ||
:return: | ||
""" | ||
logger.info( | ||
"Updating dataset perm, old: %s, new: %s", | ||
old_permission_name, | ||
new_permission_name, | ||
) | ||
from superset.connectors.sqla.models import ( # pylint: disable=import-outside-toplevel | ||
SqlaTable, | ||
) | ||
|
@@ -1477,6 +1488,15 @@ def _update_dataset_perm( # pylint: disable=too-many-arguments | |
new_dataset_view_menu = self.find_view_menu(new_permission_name) | ||
if new_dataset_view_menu: | ||
return | ||
old_dataset_view_menu = self.find_view_menu(old_permission_name) | ||
if not old_dataset_view_menu: | ||
logger.warning( | ||
"Could not find previous dataset permission %s", old_permission_name | ||
) | ||
self._insert_pvm_on_sqla_event( | ||
mapper, connection, "datasource_access", new_permission_name | ||
) | ||
return | ||
# Update VM | ||
connection.execute( | ||
view_menu_table.update() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,14 +56,12 @@ | |
app as superset_app, | ||
appbuilder, | ||
conf, | ||
db, | ||
get_feature_flags, | ||
is_feature_enabled, | ||
security_manager, | ||
) | ||
from superset.commands.exceptions import CommandException, CommandInvalidError | ||
from superset.connectors.sqla import models | ||
from superset.datasets.commands.exceptions import get_dataset_exist_error_msg | ||
from superset.db_engine_specs import get_available_engine_specs | ||
from superset.db_engine_specs.gsheets import GSheetsEngineSpec | ||
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType | ||
|
@@ -286,34 +284,6 @@ def wraps(self: "BaseSupersetView", *args: Any, **kwargs: Any) -> FlaskResponse: | |
return functools.update_wrapper(wraps, f) | ||
|
||
|
||
def validate_sqlatable(table: models.SqlaTable) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method is not used anywhere, so it is being removed. |
||
"""Checks the table existence in the database.""" | ||
with db.session.no_autoflush: | ||
table_query = db.session.query(models.SqlaTable).filter( | ||
models.SqlaTable.table_name == table.table_name, | ||
models.SqlaTable.schema == table.schema, | ||
models.SqlaTable.database_id == table.database.id, | ||
) | ||
if db.session.query(table_query.exists()).scalar(): | ||
raise Exception( # pylint: disable=broad-exception-raised | ||
get_dataset_exist_error_msg(table.full_name) | ||
) | ||
|
||
# Fail before adding if the table can't be found | ||
try: | ||
table.get_sqla_table_object() | ||
except Exception as ex: | ||
logger.exception("Got an error in pre_add for %s", table.name) | ||
raise Exception( # pylint: disable=broad-exception-raised | ||
_( | ||
"Table [%{table}s] could not be found, " | ||
"please double check your " | ||
"database connection, schema, and " | ||
"table name, error: {}" | ||
).format(table.name, str(ex)) | ||
) from ex | ||
|
||
|
||
class BaseSupersetView(BaseView): | ||
@staticmethod | ||
def json_response(obj: Any, status: int = 200) -> FlaskResponse: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is already made in the dataset update command, which raises the proper domain exception, so this extra check is removed here.
We also have a unique constraint at the DB layer on the (db, schema, table) tuple.