diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/definition.py b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/definition.py
index 7f8f1e84e2798..eab5e5eedb449 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/definition.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/definition.py
@@ -108,6 +108,14 @@
     ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
 )
 
+# permissions cleanup
+ARG_DRY_RUN = Arg(
+    ("--dry-run",), help="Show what would be cleaned up without making any changes.", action="store_true"
+)
+ARG_DAG_ID_OPTIONAL = Arg(
+    ("-d", "--dag-id"), help="Optional: only clean up orphaned permissions for this DAG ID.", type=str
+)
+
 ################
 # # COMMANDS # #
 ################
@@ -253,6 +261,30 @@
     args=(ARG_INCLUDE_DAGS, ARG_VERBOSE),
 )
 
+PERMISSIONS_CLEANUP_COMMAND = ActionCommand(
+    name="permissions-cleanup",
+    help="Clean up DAG permissions in Flask-AppBuilder tables",
+    description=(
+        "Clean up DAG-specific permissions. By default, cleans up orphaned permissions "
+        "for deleted DAGs. Use --dag-id to limit the cleanup to a single deleted DAG."
+    ),
+    func=lazy_load_command(
+        "airflow.providers.fab.auth_manager.cli_commands.permissions_command.permissions_cleanup"
+    ),
+    args=(ARG_DAG_ID_OPTIONAL, ARG_DRY_RUN, ARG_YES, ARG_VERBOSE),
+    epilog=(
+        "examples:\n"
+        "To see what orphaned permissions would be cleaned up:\n"
+        " $ airflow fab-auth-manager permissions-cleanup --dry-run\n"
+        "To clean up all orphaned permissions:\n"
+        " $ airflow fab-auth-manager permissions-cleanup\n"
+        "To clean up permissions for a specific deleted DAG:\n"
+        " $ airflow fab-auth-manager permissions-cleanup --dag-id my_dag\n"
+        "To clean up without confirmation:\n"
+        " $ airflow fab-auth-manager permissions-cleanup --yes"
+    ),
+)
+
 DB_COMMANDS = (
     ActionCommand(
         name="migrate",
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
new file mode 100644
index 0000000000000..a535050750c64
--- /dev/null
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Permissions cleanup command."""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.utils import cli as cli_utils
+from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.strings import to_boolean
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+log = logging.getLogger(__name__)
+
+
+@provide_session
+def cleanup_dag_permissions(dag_id: str, session: Session = NEW_SESSION) -> None:
+    """
+    Clean up DAG-specific permissions from Flask-AppBuilder tables.
+
+    When a DAG is deleted, we need to clean up the corresponding permissions
+    to prevent orphaned entries in the ab_view_menu table.
+
+    This addresses issue #50905: Deleted DAGs not removed from ab_view_menu table
+    and show up in permissions.
+
+    Nothing is committed here; the deletes are only executed on ``session``.
+
+    :param dag_id: Specific DAG ID to clean up.
+    :param session: Database session.
+    """
+    from sqlalchemy import delete, select
+
+    from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
+    from airflow.security.permissions import RESOURCE_DAG_PREFIX, RESOURCE_DAG_RUN, RESOURCE_DETAILS_MAP
+
+    log.info("Cleaning up DAG-specific permissions for dag_id: %s", dag_id)
+
+    # A DAG can own two resources: one for the DAG and one for its DAG runs
+    resource_names = [
+        f"{RESOURCE_DAG_PREFIX}{dag_id}",
+        f"{RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]['prefix']}{dag_id}",
+    ]
+    # Only the primary keys are needed, so do not load full ORM entities
+    dag_resource_ids = session.scalars(select(Resource.id).where(Resource.name.in_(resource_names))).all()
+    if not dag_resource_ids:
+        return
+
+    dag_permission_ids = session.scalars(
+        select(Permission.id).where(Permission.resource_id.in_(dag_resource_ids))
+    ).all()
+
+    if dag_permission_ids:
+        # Delete permission-role associations first (foreign key constraint)
+        session.execute(
+            delete(assoc_permission_role).where(
+                assoc_permission_role.c.permission_view_id.in_(dag_permission_ids)
+            )
+        )
+
+        # Delete permissions
+        session.execute(delete(Permission).where(Permission.resource_id.in_(dag_resource_ids)))
+
+    # Delete resources (ab_view_menu entries), including those that never had permissions
+    session.execute(delete(Resource).where(Resource.id.in_(dag_resource_ids)))
+
+    log.info("Cleaned up %d DAG-specific permissions", len(dag_permission_ids))
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+def permissions_cleanup(args):
+    """
+    Clean up DAG permissions in Flask-AppBuilder tables.
+
+    A DAG counts as orphaned when a DAG or DAG-run resource still carries its ID but
+    no ``DagModel`` row with that ID exists. With ``--dag-id`` only that DAG is
+    considered, and nothing is deleted unless it is orphaned.
+
+    Every DAG is cleaned up inside its own SAVEPOINT so that a failure rolls back
+    only that DAG's deletes and the remaining DAGs are still processed.
+
+    :param args: Parsed CLI arguments; ``dag_id``, ``dry_run``, ``yes`` and ``verbose`` are read.
+    """
+    from sqlalchemy import select
+
+    from airflow.models import DagModel
+    from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder
+    from airflow.providers.fab.auth_manager.models import Resource
+    from airflow.security.permissions import (
+        RESOURCE_DAG_PREFIX,
+        RESOURCE_DAG_RUN,
+        RESOURCE_DETAILS_MAP,
+    )
+    from airflow.utils.session import create_session
+
+    dag_run_prefix = RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]["prefix"]
+
+    with get_application_builder(), create_session() as session:
+        # Only IDs and names are needed, so do not load full ORM rows
+        existing_dag_ids = set(session.scalars(select(DagModel.dag_id)).all())
+        resource_names = session.scalars(
+            select(Resource.name).where(
+                Resource.name.like(f"{RESOURCE_DAG_PREFIX}%") | Resource.name.like(f"{dag_run_prefix}%")
+            )
+        ).all()
+
+        orphaned_dag_ids = set()
+        for name in resource_names:
+            # Extract DAG ID from resource name
+            if name.startswith(RESOURCE_DAG_PREFIX):
+                dag_id = name[len(RESOURCE_DAG_PREFIX) :]
+            elif name.startswith(dag_run_prefix):
+                dag_id = name[len(dag_run_prefix) :]
+            else:
+                continue
+
+            # Check if this DAG ID still exists
+            if dag_id and dag_id not in existing_dag_ids:
+                orphaned_dag_ids.add(dag_id)
+
+        # Filter by specific DAG ID if provided
+        if args.dag_id:
+            if args.dag_id not in orphaned_dag_ids:
+                print(f"DAG '{args.dag_id}' not found in orphaned permissions or still exists in database.")
+                return
+            orphaned_dag_ids = {args.dag_id}
+            print(f"Filtering to clean up permissions for DAG: {args.dag_id}")
+
+        if not orphaned_dag_ids:
+            print("No orphaned DAG permissions found.")
+            return
+
+        print(f"Found orphaned permissions for {len(orphaned_dag_ids)} deleted DAG(s):")
+        for dag_id in sorted(orphaned_dag_ids):
+            print(f" - {dag_id}")
+
+        if args.dry_run:
+            print("\nDry run mode: No changes will be made.")
+            print(f"Would clean up permissions for {len(orphaned_dag_ids)} orphaned DAG(s).")
+            return
+
+        # Perform cleanup if not in dry run mode
+        if not args.yes:
+            action = (
+                f"clean up permissions for {len(orphaned_dag_ids)} DAG(s)"
+                if not args.dag_id
+                else f"clean up permissions for DAG '{args.dag_id}'"
+            )
+            confirm = input(f"\nDo you want to {action}? [y/N]: ")
+            if not to_boolean(confirm):
+                print("Cleanup cancelled.")
+                return
+
+        # Perform the actual cleanup
+        cleanup_count = 0
+        failed_count = 0
+        for dag_id in sorted(orphaned_dag_ids):
+            try:
+                # SAVEPOINT per DAG: a failure must not leave this DAG half-deleted
+                # or break the transaction for the DAGs that follow
+                with session.begin_nested():
+                    cleanup_dag_permissions(dag_id, session)
+            except Exception as e:
+                failed_count += 1
+                print(f"Failed to clean up permissions for DAG {dag_id}: {e}")
+            else:
+                cleanup_count += 1
+                if args.verbose:
+                    print(f"Cleaned up permissions for DAG: {dag_id}")
+
+        print(f"\nSuccessfully cleaned up permissions for {cleanup_count} DAG(s).")
+        if failed_count:
+            print(f"Failed to clean up permissions for {failed_count} DAG(s); see errors above.")
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
index 1cd9590a508bf..a5436048e3630 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -60,6 +60,7 @@
 from airflow.models import DagModel
 from airflow.providers.fab.auth_manager.cli_commands.definition import (
     DB_COMMANDS,
+    PERMISSIONS_CLEANUP_COMMAND,
     ROLES_COMMANDS,
     SYNC_PERM_COMMAND,
     USERS_COMMANDS,
@@ -211,6 +212,7 @@ def get_cli_commands() -> list[CLICommand]:
                 subcommands=ROLES_COMMANDS,
             ),
             SYNC_PERM_COMMAND,  # not in a command group
+            PERMISSIONS_CLEANUP_COMMAND,  # single command for permissions cleanup
         ]
         # If Airflow version is 3.0.0 or higher, add the fab-db command group
         if packaging.version.parse(
diff --git a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
new file mode 100644
index 0000000000000..cf4a0355647a6
--- /dev/null
+++ b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_permissions_command.py
@@ -0,0 +1,298 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Test permissions command."""
+
+from __future__ import annotations
+
+import argparse
+from contextlib import redirect_stdout
+from importlib import reload
+from io import StringIO
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.cli import cli_parser
+
+from tests_common.test_utils.compat import ignore_provider_compatibility_error
+from tests_common.test_utils.config import conf_vars
+
+with ignore_provider_compatibility_error("2.9.0+", __file__):
+    from airflow.providers.fab.auth_manager.cli_commands import permissions_command
+    from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder
+
+pytestmark = pytest.mark.db_test
+
+
+class TestPermissionsCommand:
+    """Test permissions cleanup CLI commands."""
+
+    @pytest.fixture(autouse=True)
+    def _set_attrs(self):
+        with conf_vars(
+            {
+                (
+                    "core",
+                    "auth_manager",
+                ): "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager",
+            }
+        ):
+            # Reload the module to use FAB auth manager
+            reload(cli_parser)
+            # Clearing the cache before calling it
+            cli_parser.get_parser.cache_clear()
+            self.parser = cli_parser.get_parser()
+            with get_application_builder() as appbuilder:
+                self.appbuilder = appbuilder
+                yield
+
+    @staticmethod
+    def _run_cleanup(args, existing_dag_ids, resource_names):
+        """
+        Run ``permissions_cleanup`` against a mocked session.
+
+        :param args: Namespace handed to the command.
+        :param existing_dag_ids: DAG IDs returned by the first ``session.scalars`` call.
+        :param resource_names: Resource names returned by the second ``session.scalars`` call.
+        :return: The mocked session and everything the command printed.
+        """
+        with (
+            patch("airflow.providers.fab.auth_manager.cli_commands.utils.get_application_builder"),
+            patch("airflow.utils.session.create_session") as mock_session_ctx,
+            patch("sqlalchemy.select"),
+            redirect_stdout(StringIO()) as stdout,
+        ):
+            mock_session = MagicMock()
+            mock_session_ctx.return_value.__enter__.return_value = mock_session
+
+            mock_dag_result = MagicMock()
+            mock_dag_result.all.return_value = existing_dag_ids
+            mock_resource_result = MagicMock()
+            mock_resource_result.all.return_value = resource_names
+
+            # Setup session.scalars to return different results for different queries
+            mock_session.scalars.side_effect = [mock_dag_result, mock_resource_result]
+
+            permissions_command.permissions_cleanup(args)
+
+        return mock_session, stdout.getvalue()
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    def test_permissions_cleanup_success(self, mock_resource, mock_cleanup_dag_permissions):
+        """Test successful cleanup of DAG permissions."""
+        args = argparse.Namespace(dag_id=None, dry_run=False, yes=True, verbose=True)
+
+        mock_session, _ = self._run_cleanup(
+            args, ["existing_dag"], ["DAG:existing_dag", "DAG:orphaned_dag"]
+        )
+
+        # Only the orphaned DAG is cleaned up, exactly once
+        mock_cleanup_dag_permissions.assert_called_once_with("orphaned_dag", mock_session)
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    def test_permissions_cleanup_dry_run(self, mock_resource, mock_cleanup_dag_permissions):
+        """Test dry run mode for permissions cleanup."""
+        args = argparse.Namespace(dag_id=None, dry_run=True, verbose=True)
+
+        _, output = self._run_cleanup(args, ["existing_dag"], ["DAG:orphaned_dag"])
+
+        assert "Dry run mode" in output
+        # In dry run mode, cleanup_dag_permissions should NOT be called
+        mock_cleanup_dag_permissions.assert_not_called()
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    def test_permissions_cleanup_specific_dag(self, mock_resource, mock_cleanup_dag_permissions):
+        """Test cleanup for a specific DAG."""
+        args = argparse.Namespace(dag_id="test_dag", dry_run=False, yes=True, verbose=True)
+
+        # Neither orphaned DAG exists any more, but only the requested one may be cleaned up
+        mock_session, _ = self._run_cleanup(
+            args,
+            ["existing_dag", "another_existing_dag"],
+            ["DAG:test_dag", "DAG:other_orphaned_dag"],
+        )
+
+        mock_cleanup_dag_permissions.assert_called_once_with("test_dag", mock_session)
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    def test_permissions_cleanup_dag_id_not_orphaned(self, mock_resource, mock_cleanup_dag_permissions):
+        """Test that --dag-id for a DAG that still exists cleans up nothing."""
+        args = argparse.Namespace(dag_id="existing_dag", dry_run=False, yes=True, verbose=False)
+
+        _, output = self._run_cleanup(args, ["existing_dag"], ["DAG:existing_dag", "DAG:orphaned_dag"])
+
+        assert "not found in orphaned permissions" in output
+        mock_cleanup_dag_permissions.assert_not_called()
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    @patch("builtins.input", return_value="n")
+    def test_permissions_cleanup_no_confirmation(
+        self, mock_input, mock_resource, mock_cleanup_dag_permissions
+    ):
+        """Test cleanup cancellation when user doesn't confirm."""
+        args = argparse.Namespace(dag_id=None, dry_run=False, yes=False, verbose=False)
+
+        _, output = self._run_cleanup(args, ["existing_dag"], ["DAG:orphaned_dag"])
+
+        mock_input.assert_called_once()
+        assert "Cleanup cancelled" in output
+        # cleanup_dag_permissions should NOT be called when user cancels
+        mock_cleanup_dag_permissions.assert_not_called()
+
+    @patch("airflow.providers.fab.auth_manager.cli_commands.permissions_command.cleanup_dag_permissions")
+    @patch("airflow.providers.fab.auth_manager.models.Resource")
+    def test_permissions_cleanup_continues_after_failure(self, mock_resource, mock_cleanup_dag_permissions):
+        """Test that one failing DAG does not stop the cleanup of the others."""
+        args = argparse.Namespace(dag_id=None, dry_run=False, yes=True, verbose=False)
+        # DAGs are processed in sorted order, so dag_a fails and dag_b succeeds
+        mock_cleanup_dag_permissions.side_effect = [RuntimeError("boom"), None]
+
+        mock_session, output = self._run_cleanup(args, [], ["DAG:dag_a", "DAG:dag_b"])
+
+        assert mock_cleanup_dag_permissions.call_count == 2
+        # Each DAG runs in its own savepoint
+        assert mock_session.begin_nested.call_count == 2
+        assert "Failed to clean up permissions for DAG dag_a: boom" in output
+        assert "Successfully cleaned up permissions for 1 DAG(s)." in output
+
+
+class TestDagPermissions:
+    """Test cases for cleanup_dag_permissions function with real database operations."""
+
+    @pytest.fixture(autouse=True)
+    def _setup_fab_test(self):
+        """Setup FAB for testing."""
+        with conf_vars(
+            {
+                (
+                    "core",
+                    "auth_manager",
+                ): "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager",
+            }
+        ):
+            with get_application_builder():
+                yield
+
+    def test_cleanup_dag_permissions_removes_specific_dag_resources(self):
+        """Test that cleanup_dag_permissions removes only the specified DAG resources."""
+        from sqlalchemy import select
+
+        from airflow.providers.fab.auth_manager.cli_commands.permissions_command import (
+            cleanup_dag_permissions,
+        )
+        from airflow.providers.fab.auth_manager.models import Action, Permission, Resource
+        from airflow.security.permissions import RESOURCE_DAG_PREFIX
+        from airflow.utils.session import create_session
+
+        with create_session() as session:
+            # Create resources for two different DAGs
+            target_resource = Resource(name=f"{RESOURCE_DAG_PREFIX}target_dag")
+            keep_resource = Resource(name=f"{RESOURCE_DAG_PREFIX}keep_dag")
+            session.add_all([target_resource, keep_resource])
+            session.flush()
+
+            # Get or create action
+            read_action = session.scalars(select(Action).where(Action.name == "can_read")).first()
+            if not read_action:
+                read_action = Action(name="can_read")
+                session.add(read_action)
+                session.flush()
+
+            # Create permissions
+            target_perm = Permission(action=read_action, resource=target_resource)
+            keep_perm = Permission(action=read_action, resource=keep_resource)
+            session.add_all([target_perm, keep_perm])
+            session.commit()
+
+            # Execute cleanup
+            cleanup_dag_permissions("target_dag", session)
+
+            # Verify: target resource deleted, keep resource remains
+            assert not session.get(Resource, target_resource.id)
+            assert session.get(Resource, keep_resource.id)
+            assert not session.get(Permission, target_perm.id)
+            assert session.get(Permission, keep_perm.id)
+
+            # Remove the rows this test committed so they do not leak into other tests
+            cleanup_dag_permissions("keep_dag", session)
+
+    def test_cleanup_dag_permissions_handles_no_matching_resources(self):
+        """Test that cleanup_dag_permissions handles DAGs with no matching resources gracefully."""
+        from sqlalchemy import func, select
+
+        from airflow.providers.fab.auth_manager.cli_commands.permissions_command import (
+            cleanup_dag_permissions,
+        )
+        from airflow.providers.fab.auth_manager.models import Resource
+        from airflow.utils.session import create_session
+
+        with create_session() as session:
+            initial_count = session.scalar(select(func.count(Resource.id)))
+            cleanup_dag_permissions("non_existent_dag", session)
+            assert session.scalar(select(func.count(Resource.id))) == initial_count
+
+    def test_cleanup_dag_permissions_handles_resources_without_permissions(self):
+        """Test cleanup when resources exist but have no permissions."""
+        from airflow.providers.fab.auth_manager.cli_commands.permissions_command import (
+            cleanup_dag_permissions,
+        )
+        from airflow.providers.fab.auth_manager.models import Resource
+        from airflow.security.permissions import RESOURCE_DAG_PREFIX
+        from airflow.utils.session import create_session
+
+        with create_session() as session:
+            # Create resource without permissions
+            resource = Resource(name=f"{RESOURCE_DAG_PREFIX}test_dag")
+            session.add(resource)
+            session.commit()
+            resource_id = resource.id
+
+            cleanup_dag_permissions("test_dag", session)
+            assert not session.get(Resource, resource_id)
+
+    def test_cleanup_dag_permissions_with_default_session(self):
+        """Test cleanup_dag_permissions when no session is provided (uses default)."""
+        from sqlalchemy import func, select
+
+        from airflow.providers.fab.auth_manager.cli_commands.permissions_command import (
+            cleanup_dag_permissions,
+        )
+        from airflow.providers.fab.auth_manager.models import Resource
+        from airflow.security.permissions import RESOURCE_DAG_PREFIX
+        from airflow.utils.session import create_session
+
+        # Setup test data
+        with create_session() as session:
+            resource = Resource(name=f"{RESOURCE_DAG_PREFIX}test_dag")
+            session.add(resource)
+            session.commit()
+
+        # Call cleanup without session parameter
+        cleanup_dag_permissions("test_dag")
+
+        # Verify deletion
+        with create_session() as session:
+            count = session.scalar(
+                select(func.count(Resource.id)).where(Resource.name == f"{RESOURCE_DAG_PREFIX}test_dag")
+            )
+            assert count == 0