diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml
index 302d6f2afafd8..c111dd424e901 100644
--- a/providers/edge3/pyproject.toml
+++ b/providers/edge3/pyproject.toml
@@ -58,6 +58,7 @@ requires-python = "~=3.9"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.10.0",
+ "apache-airflow-providers-fab>=1.5.3",
"pydantic>=2.11.0",
"retryhttp>=1.2.0,!=1.3.0",
]
@@ -67,6 +68,7 @@ dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
+ "apache-airflow-providers-fab",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
]
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
index a8a4f5b374018..2d06f78e7b975 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
@@ -17,179 +17,184 @@
from __future__ import annotations
+import re
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from flask import Blueprint, redirect, request, url_for
+from flask_appbuilder import BaseView, expose
+from markupsafe import Markup
+from sqlalchemy import select
+
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
+from airflow.models.taskinstance import TaskInstanceState
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.utils.state import State
if AIRFLOW_V_3_0_PLUS:
+ from airflow.api_fastapi.auth.managers.models.resource_details import AccessView
+ from airflow.providers.fab.www.auth import has_access_view
+
+else:
+ from airflow.auth.managers.models.resource_details import AccessView # type: ignore[no-redef]
+ from airflow.www.auth import has_access_view # type: ignore[no-redef]
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.yaml import safe_load
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+
+def _get_airflow_2_api_endpoint() -> Blueprint:
+ from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
+ from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
+
+ folder = Path(__file__).parents[1].resolve() # this is airflow/providers/edge3/
+ with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f:
+ specification = safe_load(f)
+ from connexion import FlaskApi
+
+ bp = FlaskApi(
+ specification=specification,
+ resolver=_LazyResolver(),
+ base_path="/edge_worker/v1",
+ strict_validation=True,
+ options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
+ validate_responses=True,
+ validator_map={"body": _CustomErrorRequestBodyValidator},
+ ).blueprint
+ # Need to exempt CSRF to make API usable
+ from airflow.www.app import csrf
+
+ csrf.exempt(bp)
+ return bp
+
+
+def _get_api_endpoint() -> dict[str, Any]:
from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app
- def _get_api_endpoint() -> dict[str, Any]:
- return {
- "app": create_edge_worker_api_app(),
- "url_prefix": "/edge_worker/v1",
- "name": "Airflow Edge Worker API",
+ return {
+ "app": create_edge_worker_api_app(),
+ "url_prefix": "/edge_worker/v1",
+ "name": "Airflow Edge Worker API",
+ }
+
+
+def _state_token(state):
+ """Return a formatted string with HTML for a given State."""
+ color = State.color(state)
+ fg_color = State.color_fg(state)
+ return Markup(
+ """
+ {state}
+ """
+ ).format(color=color, state=state, fg_color=fg_color)
+
+
+def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str:
+ if maintenance_comment:
+ if re.search(
+ r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*", maintenance_comment
+ ):
+ return re.sub(
+ r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:",
+ f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
+ maintenance_comment,
+ )
+ if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*", maintenance_comment):
+ return re.sub(
+ r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:",
+ f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
+ maintenance_comment,
+ )
+ return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment: {maintenance_comment}"
+ return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:"
+
+
+# registers airflow/providers/edge3/plugins/templates as a Jinja template folder
+template_bp = Blueprint(
+ "template_blueprint",
+ __name__,
+ template_folder="templates",
+)
+
+
+class EdgeWorkerJobs(BaseView):
+ """Simple view to show Edge Worker jobs."""
+
+ default_view = "jobs"
+
+ @expose("/jobs")
+ @has_access_view(AccessView.JOBS)
+ @provide_session
+ def jobs(self, session: Session = NEW_SESSION):
+ from airflow.providers.edge3.models.edge_job import EdgeJobModel
+
+ jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all()
+ html_states = {
+ str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values()
}
+ return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states)
-else:
- # This is for back-compatability with Airflow 2.x and we only make this
- # to prevents dependencies and breaking imports in Airflow 3.x
- import re
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import TYPE_CHECKING, Any
-
- from flask import Blueprint, redirect, request, url_for
- from flask_appbuilder import BaseView, expose
- from markupsafe import Markup
- from sqlalchemy import select
-
- from airflow.auth.managers.models.resource_details import AccessView
- from airflow.models.taskinstance import TaskInstanceState
- from airflow.utils.session import NEW_SESSION, provide_session
- from airflow.utils.state import State
- from airflow.utils.yaml import safe_load
- from airflow.www.auth import has_access_view
-
- if TYPE_CHECKING:
- from sqlalchemy.orm import Session
-
- def _get_airflow_2_api_endpoint() -> Blueprint:
- from airflow.www.app import csrf
- from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
- from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
-
- folder = Path(__file__).parents[1].resolve() # this is airflow/providers/edge3/
- with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f:
- specification = safe_load(f)
- from connexion import FlaskApi
-
- bp = FlaskApi(
- specification=specification,
- resolver=_LazyResolver(),
- base_path="/edge_worker/v1",
- strict_validation=True,
- options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
- validate_responses=True,
- validator_map={"body": _CustomErrorRequestBodyValidator},
- ).blueprint
- # Need to exempt CSRF to make API usable
- csrf.exempt(bp)
- return bp
-
- def _state_token(state):
- """Return a formatted string with HTML for a given State."""
- color = State.color(state)
- fg_color = State.color_fg(state)
- return Markup(
- """
- {state}
- """
- ).format(color=color, state=state, fg_color=fg_color)
-
- def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str:
- if maintenance_comment:
- if re.search(
- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*", maintenance_comment
- ):
- return re.sub(
- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:",
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
- maintenance_comment,
- )
- if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*", maintenance_comment):
- return re.sub(
- r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:",
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
- maintenance_comment,
- )
- return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment: {maintenance_comment}"
- return (
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:"
- )
-
- # registers airflow/providers/edge3/plugins/templates as a Jinja template folder
- template_bp = Blueprint(
- "template_blueprint",
- __name__,
- template_folder="templates",
- )
-
- class EdgeWorkerJobs(BaseView):
- """Simple view to show Edge Worker jobs."""
-
- default_view = "jobs"
-
- @expose("/jobs")
- @has_access_view(AccessView.JOBS)
- @provide_session
- def jobs(self, session: Session = NEW_SESSION):
- from airflow.providers.edge3.models.edge_job import EdgeJobModel
-
- jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all()
- html_states = {
- str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values()
- }
- return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states)
-
- class EdgeWorkerHosts(BaseView):
- """Simple view to show Edge Worker status."""
-
- default_view = "status"
-
- @expose("/status")
- @has_access_view(AccessView.JOBS)
- @provide_session
- def status(self, session: Session = NEW_SESSION):
- from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
-
- hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all()
- five_min_ago = datetime.now() - timedelta(minutes=5)
- return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago)
-
- @expose("/status/maintenance//on", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def worker_to_maintenance(self, worker_name: str):
- from flask_login import current_user
-
- from airflow.providers.edge3.models.edge_worker import request_maintenance
-
- maintenance_comment = request.form.get("maintenance_comment")
- maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}"
- request_maintenance(worker_name, maintenance_comment)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//off", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def remove_worker_from_maintenance(self, worker_name: str):
- from airflow.providers.edge3.models.edge_worker import exit_maintenance
-
- exit_maintenance(worker_name)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//remove", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def remove_worker(self, worker_name: str):
- from airflow.providers.edge3.models.edge_worker import remove_worker
-
- remove_worker(worker_name)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//change_comment", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def change_maintenance_comment(self, worker_name: str):
- from flask_login import current_user
-
- from airflow.providers.edge3.models.edge_worker import change_maintenance_comment
-
- maintenance_comment = request.form.get("maintenance_comment")
- maintenance_comment = modify_maintenance_comment_on_update(
- maintenance_comment, current_user.username
- )
- change_maintenance_comment(worker_name, maintenance_comment)
- return redirect(url_for("EdgeWorkerHosts.status"))
+
+class EdgeWorkerHosts(BaseView):
+ """Simple view to show Edge Worker status."""
+
+ default_view = "status"
+
+ @expose("/status")
+ @has_access_view(AccessView.JOBS)
+ @provide_session
+ def status(self, session: Session = NEW_SESSION):
+ from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
+
+ hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all()
+ five_min_ago = datetime.now() - timedelta(minutes=5)
+ return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago)
+
+ @expose("/status/maintenance//on", methods=["POST"])
+ @has_access_view(AccessView.JOBS)
+ def worker_to_maintenance(self, worker_name: str):
+ from flask_login import current_user
+
+ from airflow.providers.edge3.models.edge_worker import request_maintenance
+
+ maintenance_comment = request.form.get("maintenance_comment")
+ maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}"
+ request_maintenance(worker_name, maintenance_comment)
+ return redirect(url_for("EdgeWorkerHosts.status"))
+
+ @expose("/status/maintenance//off", methods=["POST"])
+ @has_access_view(AccessView.JOBS)
+ def remove_worker_from_maintenance(self, worker_name: str):
+ from airflow.providers.edge3.models.edge_worker import exit_maintenance
+
+ exit_maintenance(worker_name)
+ return redirect(url_for("EdgeWorkerHosts.status"))
+
+ @expose("/status/maintenance//remove", methods=["POST"])
+ @has_access_view(AccessView.JOBS)
+ def remove_worker(self, worker_name: str):
+ from airflow.providers.edge3.models.edge_worker import remove_worker
+
+ remove_worker(worker_name)
+ return redirect(url_for("EdgeWorkerHosts.status"))
+
+ @expose("/status/maintenance//change_comment", methods=["POST"])
+ @has_access_view(AccessView.JOBS)
+ def change_maintenance_comment(self, worker_name: str):
+ from flask_login import current_user
+
+ from airflow.providers.edge3.models.edge_worker import change_maintenance_comment
+
+ maintenance_comment = request.form.get("maintenance_comment")
+ maintenance_comment = modify_maintenance_comment_on_update(maintenance_comment, current_user.username)
+ change_maintenance_comment(worker_name, maintenance_comment)
+ return redirect(url_for("EdgeWorkerHosts.status"))
# Check if EdgeExecutor is actually loaded
@@ -204,19 +209,21 @@ class EdgeExecutorPlugin(AirflowPlugin):
name = "edge_executor"
if EDGE_EXECUTOR_ACTIVE:
+ appbuilder_views = [
+ {
+ "name": "Edge Worker Jobs",
+ "category": "Admin",
+ "view": EdgeWorkerJobs(),
+ },
+ {
+ "name": "Edge Worker Hosts",
+ "category": "Admin",
+ "view": EdgeWorkerHosts(),
+ },
+ ]
+
if AIRFLOW_V_3_0_PLUS:
fastapi_apps = [_get_api_endpoint()]
+ flask_blueprints = [template_bp]
else:
- appbuilder_views = [
- {
- "name": "Edge Worker Jobs",
- "category": "Admin",
- "view": EdgeWorkerJobs(),
- },
- {
- "name": "Edge Worker Hosts",
- "category": "Admin",
- "view": EdgeWorkerHosts(),
- },
- ]
flask_blueprints = [_get_airflow_2_api_endpoint(), template_bp]
diff --git a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
index 178625c4cf42c..52e837387502e 100644
--- a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
+++ b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
@@ -54,12 +54,11 @@ def test_plugin_active():
rep = EdgeExecutorPlugin()
assert EDGE_EXECUTOR_ACTIVE
+ assert len(rep.appbuilder_views) == 2
if AIRFLOW_V_3_0_PLUS:
- assert len(rep.appbuilder_views) == 0
- assert len(rep.flask_blueprints) == 0
+ assert len(rep.flask_blueprints) == 1
assert len(rep.fastapi_apps) == 1
else:
- assert len(rep.appbuilder_views) == 2
assert len(rep.flask_blueprints) == 2
@@ -74,7 +73,6 @@ def test_plugin_is_airflow_plugin(plugin):
assert isinstance(plugin, AirflowPlugin)
-@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Plugin endpoint is not used in Airflow 3.0+")
@pytest.mark.parametrize(
"initial_comment, expected_comment",
[