Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions airflow-core/src/airflow/api_fastapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from functools import cache
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

from fastapi import FastAPI
Expand All @@ -31,7 +31,6 @@
init_error_handlers,
init_flask_plugins,
init_middlewares,
init_ui_plugins,
init_views,
)
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
Expand Down Expand Up @@ -99,7 +98,6 @@ def create_app(apps: str = "all") -> FastAPI:
init_plugins(app)
init_auth_manager(app)
init_flask_plugins(app)
init_ui_plugins(app)
init_views(app) # Core views need to be the last routes added - it has a catch all route
init_error_handlers(app)
init_middlewares(app)
Expand Down Expand Up @@ -171,10 +169,9 @@ def init_plugins(app: FastAPI) -> None:
"""Integrate FastAPI app, middlewares and UI plugins."""
from airflow import plugins_manager

plugins_manager.initialize_fastapi_plugins()
apps, root_middlewares = plugins_manager.get_fastapi_plugins()

# After calling initialize_fastapi_plugins, fastapi_apps cannot be None anymore.
for subapp_dict in cast("list", plugins_manager.fastapi_apps):
for subapp_dict in apps:
name = subapp_dict.get("name")
subapp = subapp_dict.get("app")
if subapp is None:
Expand All @@ -194,8 +191,7 @@ def init_plugins(app: FastAPI) -> None:
log.debug("Adding subapplication %s under prefix %s", name, url_prefix)
app.mount(url_prefix, subapp)

# After calling initialize_fastapi_plugins, fastapi_root_middlewares cannot be None anymore.
for middleware_dict in cast("list", plugins_manager.fastapi_root_middlewares):
for middleware_dict in root_middlewares:
name = middleware_dict.get("name")
middleware = middleware_dict.get("middleware")
args = middleware_dict.get("args", [])
Expand Down
15 changes: 2 additions & 13 deletions airflow-core/src/airflow/api_fastapi/core_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,10 @@ def init_flask_plugins(app: FastAPI) -> None:
"""Integrate Flask plugins (plugins from Airflow 2)."""
from airflow import plugins_manager

plugins_manager.initialize_flask_plugins()
blueprints, appbuilder_views, appbuilder_menu_links = plugins_manager.get_flask_plugins()

# If no Airflow 2.x plugin is in the environment, no need to go further
if (
not plugins_manager.flask_blueprints
and not plugins_manager.flask_appbuilder_views
and not plugins_manager.flask_appbuilder_menu_links
):
if not blueprints and not appbuilder_views and not appbuilder_menu_links:
return

from fastapi.middleware.wsgi import WSGIMiddleware
Expand Down Expand Up @@ -190,10 +186,3 @@ def init_middlewares(app: FastAPI) -> None:
from airflow.api_fastapi.auth.managers.simple.middleware import SimpleAllAdminMiddleware

app.add_middleware(SimpleAllAdminMiddleware)


def init_ui_plugins(app: FastAPI) -> None:
"""Initialize UI plugins."""
from airflow import plugins_manager

plugins_manager.initialize_ui_plugins()
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,10 @@ def get_plugins(
dependencies=[Depends(requires_access_view(AccessView.PLUGINS))],
)
def import_errors() -> PluginImportErrorCollectionResponse:
plugins_manager.ensure_plugins_loaded() # make sure import_errors are loaded

import_errors = plugins_manager.get_import_errors()
return PluginImportErrorCollectionResponse.model_validate(
{
"import_errors": [
{"source": source, "error": error} for source, error in plugins_manager.import_errors.items()
],
"total_entries": len(plugins_manager.import_errors),
"import_errors": [{"source": source, "error": error} for source, error in import_errors.items()],
"total_entries": len(import_errors),
}
)
21 changes: 2 additions & 19 deletions airflow-core/src/airflow/cli/commands/plugins_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,18 @@
# under the License.
from __future__ import annotations

import inspect
from typing import Any

from airflow import plugins_manager
from airflow.cli.simple_table import AirflowConsole
from airflow.plugins_manager import PluginsDirectorySource, get_plugin_info
from airflow.plugins_manager import get_plugin_info
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import providers_configuration_loaded


def _get_name(class_like_object) -> str:
if isinstance(class_like_object, (str, PluginsDirectorySource)):
return str(class_like_object)
if inspect.isclass(class_like_object):
return class_like_object.__name__
return class_like_object.__class__.__name__


def _join_plugins_names(value: list[Any] | Any) -> str:
value = value if isinstance(value, list) else [value]
return ",".join(_get_name(v) for v in value)


@suppress_logs_and_warning
@providers_configuration_loaded
def dump_plugins(args):
"""Dump plugins information."""
plugins_info: list[dict[str, str]] = get_plugin_info()
if not plugins_manager.plugins:
if not plugins_info:
print("No plugins loaded")
return

Expand Down
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/lineage/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ def get_hook_lineage_collector() -> HookLineageCollector:
"""Get singleton lineage collector."""
from airflow import plugins_manager

plugins_manager.initialize_hook_lineage_readers_plugins()
if plugins_manager.hook_lineage_reader_classes:
if plugins_manager.get_hook_lineage_readers_plugins():
return HookLineageCollector()
return NoOpCollector()
Loading
Loading