Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 9 additions & 45 deletions airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

from abc import abstractmethod
from collections.abc import Container, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar

from flask_appbuilder.menu import MenuItem
from sqlalchemy import select

from airflow.auth.managers.models.base_user import BaseUser
Expand All @@ -31,13 +29,13 @@
)
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.security.permissions import ACTION_CAN_ACCESS_MENU
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from fastapi import FastAPI
from flask import Blueprint
from flask_appbuilder.menu import MenuItem
from sqlalchemy.orm import Session

from airflow.auth.managers.models.batch_apis import (
Expand All @@ -56,7 +54,6 @@
VariableDetails,
)
from airflow.cli.cli_config import CLICommand
from airflow.www.security_manager import AirflowSecurityManagerV2

ResourceMethod = Literal["GET", "POST", "PUT", "DELETE", "MENU"]

Expand Down Expand Up @@ -263,6 +260,14 @@ def is_authorized_custom_view(
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""

@abstractmethod
def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
"""
Filter menu items based on user permissions.

:param menu_items: list of all menu items
"""

def batch_is_authorized_connection(
self,
requests: Sequence[IsAuthorizedConnectionRequest],
Expand Down Expand Up @@ -395,47 +400,6 @@ def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMeth
if _is_permitted_dag_id("GET", methods, dag_id) or _is_permitted_dag_id("PUT", methods, dag_id)
}

def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
"""
Filter menu items based on user permissions.

:param menu_items: list of all menu items
"""
items = filter(
lambda item: self.security_manager.has_access(ACTION_CAN_ACCESS_MENU, item.name), menu_items
)
accessible_items = []
for menu_item in items:
menu_item_copy = MenuItem(
**{
**menu_item.__dict__,
"childs": [],
}
)
if menu_item.childs:
accessible_children = []
for child in menu_item.childs:
if self.security_manager.has_access(ACTION_CAN_ACCESS_MENU, child.name):
accessible_children.append(child)
menu_item_copy.childs = accessible_children
accessible_items.append(menu_item_copy)
return accessible_items

@cached_property
def security_manager(self) -> AirflowSecurityManagerV2:
"""
Return the security manager.

By default, Airflow comes with the default security manager
``airflow.www.security_manager.AirflowSecurityManagerV2``. The auth manager might need to extend this
default security manager for its own purposes.

By default, return the default AirflowSecurityManagerV2.
"""
from airflow.www.security_manager import AirflowSecurityManagerV2

return AirflowSecurityManagerV2(getattr(self, "appbuilder"))

@staticmethod
def get_cli_commands() -> list[CLICommand]:
"""
Expand Down
5 changes: 5 additions & 0 deletions airflow/auth/managers/simple/simple_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from airflow.configuration import AIRFLOW_HOME, conf

if TYPE_CHECKING:
from flask_appbuilder.menu import MenuItem

from airflow.auth.managers.models.resource_details import (
AccessView,
AssetDetails,
Expand Down Expand Up @@ -224,6 +226,9 @@ def is_authorized_custom_view(
):
return self._is_authorized(method="GET", allow_role=SimpleAuthManagerRole.VIEWER, user=user)

def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
return menu_items

def register_views(self) -> None:
if not self.appbuilder:
return
Expand Down
6 changes: 5 additions & 1 deletion airflow/www/extensions/init_appbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow import settings
from airflow.api_fastapi.app import create_auth_manager, get_auth_manager
from airflow.configuration import conf
from airflow.www.security_manager import AirflowSecurityManagerV2

if TYPE_CHECKING:
from flask import Flask
Expand Down Expand Up @@ -211,7 +212,10 @@ def init_app(self, app, session):
auth_manager = create_auth_manager()
auth_manager.appbuilder = self
auth_manager.init()
self.sm = auth_manager.security_manager
if hasattr(auth_manager, "security_manager"):
self.sm = auth_manager.security_manager
else:
self.sm = AirflowSecurityManagerV2(self)
self.bm = BabelManager(self)
self._add_global_static()
self._add_global_filters()
Expand Down
4 changes: 4 additions & 0 deletions newsfragments/aip-79.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ As part of this change the following breaking changes have occurred:
- A new abstract method ``deserialize_user`` needs to be implemented

- A new abstract method ``serialize_user`` needs to be implemented

- The property ``security_manager`` has been removed from the interface

- The method ``filter_permitted_menu_items`` is now abstract and must be implemented
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from flask_login import login_user

from airflow.api_fastapi.app import get_auth_manager
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager

if TYPE_CHECKING:
from airflow.providers.fab.auth_manager.models import User
Expand All @@ -46,7 +46,7 @@ def auth_current_user() -> User | None:
if auth is None or not auth.username or not auth.password:
return None

security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
user = None
if security_manager.auth_type == AUTH_LDAP:
user = security_manager.auth_user_ldap(auth.username, auth.password)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.api_fastapi.app import get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.utils.net import getfqdn

if TYPE_CHECKING:
Expand Down Expand Up @@ -115,7 +115,7 @@ def _gssapi_authenticate(token) -> _KerberosAuth | None:


def find_user(username=None, email=None):
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
return security_manager.find_user(username=username, email=email)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.security import requires_access_custom_view
from airflow.api_fastapi.app import get_auth_manager
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.fab.auth_manager.models import Action, Role
from airflow.providers.fab.auth_manager.schemas.role_and_permission_schema import (
ActionCollection,
Expand All @@ -36,11 +37,11 @@
role_collection_schema,
role_schema,
)
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.security import permissions

if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse, UpdateMask
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride


def _check_action_and_resource(sm: FabAirflowSecurityManagerOverride, perms: list[tuple[str, str]]) -> None:
Expand All @@ -59,7 +60,7 @@ def _check_action_and_resource(sm: FabAirflowSecurityManagerOverride, perms: lis
@requires_access_custom_view("GET", permissions.RESOURCE_ROLE)
def get_role(*, role_name: str) -> APIResponse:
"""Get role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
role = security_manager.find_role(name=role_name)
if not role:
raise NotFound(title="Role not found", detail=f"Role with name {role_name!r} was not found")
Expand All @@ -70,7 +71,7 @@ def get_role(*, role_name: str) -> APIResponse:
@format_parameters({"limit": check_limit})
def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None) -> APIResponse:
"""Get roles."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
session = security_manager.get_session
total_entries = session.scalars(select(func.count(Role.id))).one()
direction = desc if order_by.startswith("-") else asc
Expand Down Expand Up @@ -98,7 +99,7 @@ def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None)
@format_parameters({"limit": check_limit})
def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
"""Get permissions."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
session = security_manager.get_session
total_entries = session.scalars(select(func.count(Action.id))).one()
query = select(Action)
Expand All @@ -109,7 +110,7 @@ def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
@requires_access_custom_view("DELETE", permissions.RESOURCE_ROLE)
def delete_role(*, role_name: str) -> APIResponse:
"""Delete a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager

role = security_manager.find_role(name=role_name)
if not role:
Expand All @@ -121,7 +122,7 @@ def delete_role(*, role_name: str) -> APIResponse:
@requires_access_custom_view("PUT", permissions.RESOURCE_ROLE)
def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse:
"""Update a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
body = request.json
try:
data = role_schema.load(body)
Expand Down Expand Up @@ -154,7 +155,7 @@ def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse
@requires_access_custom_view("POST", permissions.RESOURCE_ROLE)
def post_role() -> APIResponse:
"""Create a new role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
body = request.json
try:
data = role_schema.load(body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.security import requires_access_custom_view
from airflow.api_fastapi.app import get_auth_manager
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.fab.auth_manager.models import User
from airflow.providers.fab.auth_manager.schemas.user_schema import (
UserCollection,
user_collection_item_schema,
user_collection_schema,
user_schema,
)
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.security import permissions

if TYPE_CHECKING:
Expand All @@ -47,7 +47,7 @@
@requires_access_custom_view("GET", permissions.RESOURCE_USER)
def get_user(*, username: str) -> APIResponse:
"""Get a user."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
user = security_manager.find_user(username=username)
if not user:
raise NotFound(title="User not found", detail=f"The User with username `{username}` was not found")
Expand All @@ -58,7 +58,7 @@ def get_user(*, username: str) -> APIResponse:
@format_parameters({"limit": check_limit})
def get_users(*, limit: int, order_by: str = "id", offset: str | None = None) -> APIResponse:
"""Get users."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
session = security_manager.get_session
total_entries = session.execute(select(func.count(User.id))).scalar()
direction = desc if order_by.startswith("-") else asc
Expand Down Expand Up @@ -94,7 +94,7 @@ def post_user() -> APIResponse:
except ValidationError as e:
raise BadRequest(detail=str(e.messages))

security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager
username = data["username"]
email = data["email"]

Expand Down Expand Up @@ -137,7 +137,7 @@ def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse:
except ValidationError as e:
raise BadRequest(detail=str(e.messages))

security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager

user = security_manager.find_user(username=username)
if user is None:
Expand Down Expand Up @@ -201,7 +201,7 @@ def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse:
@requires_access_custom_view("DELETE", permissions.RESOURCE_USER)
def delete_user(*, username: str) -> APIResponse:
"""Delete a user."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
security_manager = cast(FabAuthManager, get_auth_manager()).security_manager

user = security_manager.find_user(username=username)
if user is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
from typing import TYPE_CHECKING

from flask import Flask
from sqlalchemy.engine import make_url

import airflow
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.www.app import make_url
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_session import init_airflow_session_interface
from airflow.www.extensions.init_views import init_plugins
from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
from airflow.providers.fab.www.extensions.init_session import init_airflow_session_interface
from airflow.providers.fab.www.extensions.init_views import init_plugins

if TYPE_CHECKING:
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
from airflow.providers.fab.www.extensions.init_appbuilder import AirflowAppBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincbeck We don't have add_permissions() method in from airflow.providers.fab.www.extensions.init_appbuilder import AirflowAppBuilder class. Causing airflow sync-perm command to fail on the main branch.

Updating actions and resources for all existing roles
webserver  | Traceback (most recent call last):
webserver  |   File "/usr/local/bin/airflow", line 8, in <module>
webserver  |     sys.exit(main())
webserver  |              ^^^^^^
webserver  |   File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 58, in main
webserver  |     args.func(args)
webserver  |   File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
webserver  |     return func(*args, **kwargs)
webserver  |            ^^^^^^^^^^^^^^^^^^^^^
webserver  |   File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 111, in wrapper
webserver  |     return f(*args, **kwargs)
webserver  |            ^^^^^^^^^^^^^^^^^^
webserver  |   File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
webserver  |     return func(*args, **kwargs)
webserver  |            ^^^^^^^^^^^^^^^^^^^^^
webserver  |   File "/usr/local/lib/python3.12/site-packages/airflow/providers/fab/auth_manager/cli_commands/sync_perm_command.py", line 35, in sync_perm
webserver  |     appbuilder.add_permissions(update_perms=True)
webserver  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
webserver  | AttributeError: 'AirflowAppBuilder' object has no attribute 'add_permissions'. Did you mean: '_add_permission'?

Is there an alternative method we should use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still happening? The solution would be to copy over add_permissions from core Airflow to FAB provider

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincbeck, I have created a PR for the same. PTAL - #45611



@cache
Expand Down
Loading