Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix permission issue for dag that has dot in name #23510

Merged
merged 15 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ def _sync_perm_for_dag(self, dag, session: Session = None):
from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
from airflow.www.fab_security.sqla.models import Action, Permission, Resource

root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

def needs_perms(dag_id: str) -> bool:
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_ACTIONS:
Expand All @@ -654,9 +656,9 @@ def needs_perms(dag_id: str) -> bool:
return True
return False

if dag.access_control or needs_perms(dag.dag_id):
self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id)
if dag.access_control or needs_perms(root_dag_id):
self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
19 changes: 10 additions & 9 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@
DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}


def resource_name_for_dag(dag_id):
"""Returns the resource name for a DAG id."""
if dag_id == RESOURCE_DAG:
return dag_id
def resource_name_for_dag(root_dag_id: str) -> str:
"""Returns the resource name for a DAG id.

if dag_id.startswith(RESOURCE_DAG_PREFIX):
return dag_id

# To account for SubDags
root_dag_id = dag_id.split(".")[0]
Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
"""
if root_dag_id == RESOURCE_DAG:
return root_dag_id
if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
return root_dag_id
return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
29 changes: 23 additions & 6 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ def __init__(self, appbuilder):
view.datamodel = CustomSQLAInterface(view.datamodel.obj)
self.perms = None

def _get_root_dag_id(self, dag_id):
if '.' in dag_id:
dm = (
self.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
.filter(DagModel.dag_id == dag_id)
.first()
)
return dm.root_dag_id or dm.dag_id
return dag_id

def init_role(self, role_name, perms):
"""
Initialize the role with actions and related resources.
Expand Down Expand Up @@ -340,7 +350,8 @@ def get_accessible_dag_ids(self, user, user_actions=None, session=None) -> Set[s
def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
"""Checks if user has read or write access to some dags."""
if dag_id and dag_id != '~':
return self.has_access(action, permissions.resource_name_for_dag(dag_id))
root_dag_id = self._get_root_dag_id(dag_id)
return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))

user = g.user
if action == permissions.ACTION_CAN_READ:
Expand All @@ -349,17 +360,20 @@ def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> boo

def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)

def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)

def can_delete_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG delete access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user)

def prefixed_dag_id(self, dag_id):
Expand All @@ -370,7 +384,8 @@ def prefixed_dag_id(self, dag_id):
DeprecationWarning,
stacklevel=2,
)
return permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
return permissions.resource_name_for_dag(root_dag_id)

def is_dag_resource(self, resource_name):
"""Determines if a resource belongs to a DAG or all DAGs."""
Expand Down Expand Up @@ -530,7 +545,8 @@ def create_dag_specific_permissions(self) -> None:
dags = dagbag.dags.values()

for dag in dags:
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
for action_name in self.DAG_ACTIONS:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)
Expand Down Expand Up @@ -615,6 +631,7 @@ def _sync_dag_view_permissions(self, dag_id, access_control):
:param access_control: a dict where each key is a rolename and
each value is a set() of action names (e.g. {'can_read'})
"""

dag_resource_name = permissions.resource_name_for_dag(dag_id)

def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]:
Expand Down
66 changes: 57 additions & 9 deletions tests/www/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ def sample_dags(security_manager):
@pytest.fixture(scope="module")
def has_dag_perm(security_manager):
def _has_dag_perm(perm, dag_id, user):
return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
root_dag_id = security_manager._get_root_dag_id(dag_id)
return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user)

return _has_dag_perm

Expand Down Expand Up @@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
user.roles = security_manager.get_user_roles(user)
assert user.roles == {security_manager.get_public_role()}

test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"]

for dag_id in test_dag_ids:
with _create_dag_model_context(dag_id, session, security_manager):
Expand Down Expand Up @@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager):
for action in invalid_actions:
with pytest.raises(AirflowException) as ctx:
security_manager._sync_dag_view_permissions(
'access_control_test', access_control={rolename: {action}}
'access_control_test',
access_control={rolename: {action}},
)
assert "invalid permissions" in str(ctx.value)

Expand Down Expand Up @@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch,
assert ('can_edit', dag_resource_name) in all_perms

security_manager._sync_dag_view_permissions.assert_called_once_with(
permissions.resource_name_for_dag('has_access_control'), access_control
permissions.resource_name_for_dag('has_access_control'),
access_control,
)

del dagbag_mock.dags["has_access_control"]
with assert_queries_count(1): # one query to get all perms; dagbag is mocked
with assert_queries_count(2): # two query to get all perms; dagbag is mocked
# The extra query happens at permission check
security_manager.create_dag_specific_permissions()


Expand Down Expand Up @@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager):
security_manager.prefixed_dag_id("hello")


def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session):
username = 'dag_permission_user'
role_name = 'dag_permission_role'
parent_dag_name = "parent_dag"
subdag_name = parent_dag_name + ".subdag"
subsubdag_name = parent_dag_name + ".subdag.subsubdag"
with app.app_context():
mock_roles = [
{
Expand All @@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_
username=username,
role_name=role_name,
) as user:
dag1 = DagModel(dag_id=parent_dag_name)
dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name)
dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name)
session.add_all([dag1, dag2, dag3])
session.commit()
security_manager.bulk_sync_roles(mock_roles)
security_manager._sync_dag_view_permissions(
parent_dag_name, access_control={role_name: READ_WRITE}
)
for dag in [dag1, dag2, dag3]:
security_manager._sync_dag_view_permissions(
parent_dag_name, access_control={role_name: READ_WRITE}
)

assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
assert_user_has_dag_perms(
perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
)
session.query(DagModel).delete()


def test_permissions_work_for_dags_with_dot_in_dagname(
app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session
):
username = 'dag_permission_user'
role_name = 'dag_permission_role'
dag_id = "dag_id_1"
dag_id_2 = "dag_id_1.with_dot"
with app.app_context():
mock_roles = [
{
'role': role_name,
'perms': [
(permissions.ACTION_CAN_READ, f"DAG:{dag_id}"),
(permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"),
],
}
]
with create_user_scope(
app,
username=username,
role_name=role_name,
) as user:
dag1 = DagModel(dag_id=dag_id)
dag2 = DagModel(dag_id=dag_id_2)
session.add_all([dag1, dag2])
session.commit()
security_manager.bulk_sync_roles(mock_roles)
security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE})
security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE})
assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user)
session.query(DagModel).delete()


def test_fab_models_use_airflow_base_meta():
Expand Down