Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@
("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
)

# permissions cleanup
ARG_DRY_RUN = Arg(
("--dry-run",), help="Show what would be cleaned up without making any changes.", action="store_true"
)
ARG_DAG_ID_OPTIONAL = Arg(
("-d", "--dag-id"), help="Optional: Clean up permissions for specific DAG ID only", type=str
)

################
# # COMMANDS # #
################
Expand Down Expand Up @@ -253,6 +261,30 @@
args=(ARG_INCLUDE_DAGS, ARG_VERBOSE),
)

PERMISSIONS_CLEANUP_COMMAND = ActionCommand(
name="permissions-cleanup",
help="Clean up DAG permissions in Flask-AppBuilder tables",
description=(
"Clean up DAG-specific permissions. By default, cleans up orphaned permissions "
"for deleted DAGs. Use --dag-id to clean up permissions for a specific DAG."
),
func=lazy_load_command(
"airflow.providers.fab.auth_manager.cli_commands.permissions_command.permissions_cleanup"
),
args=(ARG_DAG_ID_OPTIONAL, ARG_DRY_RUN, ARG_YES, ARG_VERBOSE),
epilog=(
"examples:\n"
"To see what orphaned permissions would be cleaned up:\n"
" $ airflow fab-auth-manager permissions-cleanup --dry-run\n"
"To clean up all orphaned permissions:\n"
" $ airflow fab-auth-manager permissions-cleanup\n"
"To clean up permissions for specific DAG:\n"
" $ airflow fab-auth-manager permissions-cleanup --dag-id my_dag\n"
"To clean up without confirmation:\n"
" $ airflow fab-auth-manager permissions-cleanup --yes"
),
)

DB_COMMANDS = (
ActionCommand(
name="migrate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Permissions cleanup command."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.strings import to_boolean

if TYPE_CHECKING:
from sqlalchemy.orm import Session

log = logging.getLogger(__name__)


@provide_session
def cleanup_dag_permissions(dag_id: str, session: Session = NEW_SESSION) -> None:
"""
Clean up DAG-specific permissions from Flask-AppBuilder tables.

When a DAG is deleted, we need to clean up the corresponding permissions
to prevent orphaned entries in the ab_view_menu table.

This addresses issue #50905: Deleted DAGs not removed from ab_view_menu table
and show up in permissions.

:param dag_id: Specific DAG ID to clean up.
:param session: Database session.
"""
from sqlalchemy import delete, select

from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
from airflow.security.permissions import RESOURCE_DAG_PREFIX, RESOURCE_DAG_RUN, RESOURCE_DETAILS_MAP

# Clean up specific DAG permissions
dag_resources = session.scalars(
select(Resource).filter(
Resource.name.in_(
[
f"{RESOURCE_DAG_PREFIX}{dag_id}", # DAG:dag_id
f"{RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]['prefix']}{dag_id}", # DAG_RUN:dag_id
]
)
)
).all()
log.info("Cleaning up DAG-specific permissions for dag_id: %s", dag_id)

if not dag_resources:
return

dag_resource_ids = [resource.id for resource in dag_resources]

# Find all permissions associated with these resources
dag_permissions = session.scalars(
select(Permission).filter(Permission.resource_id.in_(dag_resource_ids))
).all()

if not dag_permissions:
# Delete resources even if no permissions exist
session.execute(delete(Resource).where(Resource.id.in_(dag_resource_ids)))
return

dag_permission_ids = [permission.id for permission in dag_permissions]

# Delete permission-role associations first (foreign key constraint)
session.execute(
delete(assoc_permission_role).where(
assoc_permission_role.c.permission_view_id.in_(dag_permission_ids)
)
)

# Delete permissions
session.execute(delete(Permission).where(Permission.resource_id.in_(dag_resource_ids)))

# Delete resources (ab_view_menu entries)
session.execute(delete(Resource).where(Resource.id.in_(dag_resource_ids)))

log.info("Cleaned up %d DAG-specific permissions", len(dag_permissions))


@cli_utils.action_cli
@providers_configuration_loaded
def permissions_cleanup(args):
"""Clean up DAG permissions in Flask-AppBuilder tables."""
from sqlalchemy import select

from airflow.models import DagModel
from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder
from airflow.providers.fab.auth_manager.models import Resource
from airflow.security.permissions import (
RESOURCE_DAG_PREFIX,
RESOURCE_DAG_RUN,
RESOURCE_DETAILS_MAP,
)
from airflow.utils.session import create_session

with get_application_builder() as _:
with create_session() as session:
# Get all existing DAG IDs from DagModel
existing_dag_ids = {dag.dag_id for dag in session.scalars(select(DagModel)).all()}

# Get all DAG-related resources from FAB tables
dag_resources = session.scalars(
select(Resource).filter(
Resource.name.like(f"{RESOURCE_DAG_PREFIX}%")
| Resource.name.like(f"{RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]['prefix']}%")
)
).all()

orphaned_resources = []
orphaned_dag_ids = set()

for resource in dag_resources:
# Extract DAG ID from resource name
dag_id = None
if resource.name.startswith(RESOURCE_DAG_PREFIX):
dag_id = resource.name[len(RESOURCE_DAG_PREFIX) :]
elif resource.name.startswith(RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]["prefix"]):
dag_id = resource.name[len(RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]["prefix"]) :]

# Check if this DAG ID still exists
if dag_id and dag_id not in existing_dag_ids:
orphaned_resources.append(resource)
orphaned_dag_ids.add(dag_id)

# Filter by specific DAG ID if provided
if args.dag_id:
if args.dag_id in orphaned_dag_ids:
orphaned_dag_ids = {args.dag_id}
print(f"Filtering to clean up permissions for DAG: {args.dag_id}")
else:
print(
f"DAG '{args.dag_id}' not found in orphaned permissions or still exists in database."
)
return

if not orphaned_dag_ids:
if args.dag_id:
print(f"No orphaned permissions found for DAG: {args.dag_id}")
else:
print("No orphaned DAG permissions found.")
return

print(f"Found orphaned permissions for {len(orphaned_dag_ids)} deleted DAG(s):")
for dag_id in sorted(orphaned_dag_ids):
print(f" - {dag_id}")

if args.dry_run:
print("\nDry run mode: No changes will be made.")
print(f"Would clean up permissions for {len(orphaned_dag_ids)} orphaned DAG(s).")
return

# Perform cleanup if not in dry run mode
if not args.yes:
action = (
f"clean up permissions for {len(orphaned_dag_ids)} DAG(s)"
if not args.dag_id
else f"clean up permissions for DAG '{args.dag_id}'"
)
confirm = input(f"\nDo you want to {action}? [y/N]: ")
if not to_boolean(confirm):
print("Cleanup cancelled.")
return

# Perform the actual cleanup
cleanup_count = 0
for dag_id in orphaned_dag_ids:
try:
cleanup_dag_permissions(dag_id, session)
cleanup_count += 1
if args.verbose:
print(f"Cleaned up permissions for DAG: {dag_id}")
except Exception as e:
print(f"Failed to clean up permissions for DAG {dag_id}: {e}")

print(f"\nSuccessfully cleaned up permissions for {cleanup_count} DAG(s).")
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from airflow.models import DagModel
from airflow.providers.fab.auth_manager.cli_commands.definition import (
DB_COMMANDS,
PERMISSIONS_CLEANUP_COMMAND,
ROLES_COMMANDS,
SYNC_PERM_COMMAND,
USERS_COMMANDS,
Expand Down Expand Up @@ -211,6 +212,7 @@ def get_cli_commands() -> list[CLICommand]:
subcommands=ROLES_COMMANDS,
),
SYNC_PERM_COMMAND, # not in a command group
PERMISSIONS_CLEANUP_COMMAND, # single command for permissions cleanup
]
# If Airflow version is 3.0.0 or higher, add the fab-db command group
if packaging.version.parse(
Expand Down
Loading