diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml index c111dd424e901..302d6f2afafd8 100644 --- a/providers/edge3/pyproject.toml +++ b/providers/edge3/pyproject.toml @@ -58,7 +58,6 @@ requires-python = "~=3.9" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.10.0", - "apache-airflow-providers-fab>=1.5.3", "pydantic>=2.11.0", "retryhttp>=1.2.0,!=1.3.0", ] @@ -68,7 +67,6 @@ dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", - "apache-airflow-providers-fab", # Additional devel dependencies (do not remove this line and add extra development dependencies) ] diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py index 2d06f78e7b975..a8a4f5b374018 100644 --- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py +++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py @@ -17,184 +17,179 @@ from __future__ import annotations -import re -from datetime import datetime, timedelta -from pathlib import Path -from typing import TYPE_CHECKING, Any - -from flask import Blueprint, redirect, request, url_for -from flask_appbuilder import BaseView, expose -from markupsafe import Markup -from sqlalchemy import select - from airflow.configuration import conf from airflow.exceptions import AirflowConfigException -from airflow.models.taskinstance import TaskInstanceState from airflow.plugins_manager import AirflowPlugin from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.utils.state import State if AIRFLOW_V_3_0_PLUS: - from airflow.api_fastapi.auth.managers.models.resource_details import AccessView - from airflow.providers.fab.www.auth import has_access_view - -else: - from airflow.auth.managers.models.resource_details import AccessView # type: ignore[no-redef] - from airflow.www.auth import has_access_view # type: ignore[no-redef] -from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.yaml import safe_load - -if TYPE_CHECKING: - from sqlalchemy.orm import Session - - -def _get_airflow_2_api_endpoint() -> Blueprint: - from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED - from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver - - folder = Path(__file__).parents[1].resolve() # this is airflow/providers/edge3/ - with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f: - specification = safe_load(f) - from connexion import FlaskApi - - bp = FlaskApi( - specification=specification, - resolver=_LazyResolver(), - base_path="/edge_worker/v1", - strict_validation=True, - options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()}, - validate_responses=True, - validator_map={"body": _CustomErrorRequestBodyValidator}, - ).blueprint - # Need to exempt CSRF to make API usable - from airflow.www.app import csrf - - csrf.exempt(bp) - return bp - - -def _get_api_endpoint() -> dict[str, Any]: from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app - return { - "app": create_edge_worker_api_app(), - "url_prefix": "/edge_worker/v1", - "name": "Airflow Edge Worker API", - } - - -def _state_token(state): - """Return a formatted string with HTML for a given State.""" - color = State.color(state) - fg_color = State.color_fg(state) - return Markup( - """ - {state} - """ - ).format(color=color, state=state, fg_color=fg_color) - - -def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str: - if maintenance_comment: - if re.search( - r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*", maintenance_comment - ): - return re.sub( - r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:", - f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:", - maintenance_comment, - ) - if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*", maintenance_comment): - return re.sub( - r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:", - f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:", - maintenance_comment, - ) - return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment: {maintenance_comment}" - return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:" - - -# registers airflow/providers/edge3/plugins/templates as a Jinja template folder -template_bp = Blueprint( - "template_blueprint", - __name__, - template_folder="templates", -) - - -class EdgeWorkerJobs(BaseView): - """Simple view to show Edge Worker jobs.""" - - default_view = "jobs" - - @expose("/jobs") - @has_access_view(AccessView.JOBS) - @provide_session - def jobs(self, session: Session = NEW_SESSION): - from airflow.providers.edge3.models.edge_job import EdgeJobModel - - jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all() - html_states = { - str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values() + def _get_api_endpoint() -> dict[str, Any]: + return { + "app": create_edge_worker_api_app(), + "url_prefix": "/edge_worker/v1", + "name": "Airflow Edge Worker API", } - return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states) - - -class EdgeWorkerHosts(BaseView): - """Simple view to show Edge Worker status.""" - - default_view = "status" - - @expose("/status") - @has_access_view(AccessView.JOBS) - @provide_session - def status(self, session: Session = NEW_SESSION): - from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel - - hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all() - five_min_ago = datetime.now() - timedelta(minutes=5) - return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago) - - @expose("/status/maintenance//on", methods=["POST"]) - @has_access_view(AccessView.JOBS) - def worker_to_maintenance(self, worker_name: str): - from flask_login import current_user - - from airflow.providers.edge3.models.edge_worker import request_maintenance - - maintenance_comment = request.form.get("maintenance_comment") - maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}" - request_maintenance(worker_name, maintenance_comment) - return redirect(url_for("EdgeWorkerHosts.status")) - @expose("/status/maintenance//off", methods=["POST"]) - @has_access_view(AccessView.JOBS) - def remove_worker_from_maintenance(self, worker_name: str): - from airflow.providers.edge3.models.edge_worker import exit_maintenance - - exit_maintenance(worker_name) - return redirect(url_for("EdgeWorkerHosts.status")) - - @expose("/status/maintenance//remove", methods=["POST"]) - @has_access_view(AccessView.JOBS) - def remove_worker(self, worker_name: str): - from airflow.providers.edge3.models.edge_worker import remove_worker - - remove_worker(worker_name) - return redirect(url_for("EdgeWorkerHosts.status")) - - @expose("/status/maintenance//change_comment", methods=["POST"]) - @has_access_view(AccessView.JOBS) - def change_maintenance_comment(self, worker_name: str): - from flask_login import current_user - - from airflow.providers.edge3.models.edge_worker import change_maintenance_comment - - maintenance_comment = request.form.get("maintenance_comment") - maintenance_comment = modify_maintenance_comment_on_update(maintenance_comment, current_user.username) - change_maintenance_comment(worker_name, maintenance_comment) - return redirect(url_for("EdgeWorkerHosts.status")) +else: + # This is for back-compatability with Airflow 2.x and we only make this + # to prevents dependencies and breaking imports in Airflow 3.x + import re + from datetime import datetime, timedelta + from pathlib import Path + from typing import TYPE_CHECKING, Any + + from flask import Blueprint, redirect, request, url_for + from flask_appbuilder import BaseView, expose + from markupsafe import Markup + from sqlalchemy import select + + from airflow.auth.managers.models.resource_details import AccessView + from airflow.models.taskinstance import TaskInstanceState + from airflow.utils.session import NEW_SESSION, provide_session + from airflow.utils.state import State + from airflow.utils.yaml import safe_load + from airflow.www.auth import has_access_view + + if TYPE_CHECKING: + from sqlalchemy.orm import Session + + def _get_airflow_2_api_endpoint() -> Blueprint: + from airflow.www.app import csrf + from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED + from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver + + folder = Path(__file__).parents[1].resolve() # this is airflow/providers/edge3/ + with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f: + specification = safe_load(f) + from connexion import FlaskApi + + bp = FlaskApi( + specification=specification, + resolver=_LazyResolver(), + base_path="/edge_worker/v1", + strict_validation=True, + options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()}, + validate_responses=True, + validator_map={"body": _CustomErrorRequestBodyValidator}, + ).blueprint + # Need to exempt CSRF to make API usable + csrf.exempt(bp) + return bp + + def _state_token(state): + """Return a formatted string with HTML for a given State.""" + color = State.color(state) + fg_color = State.color_fg(state) + return Markup( + """ + {state} + """ + ).format(color=color, state=state, fg_color=fg_color) + + def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str: + if maintenance_comment: + if re.search( + r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*", maintenance_comment + ): + return re.sub( + r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:", + f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:", + maintenance_comment, + ) + if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*", maintenance_comment): + return re.sub( + r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:", + f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:", + maintenance_comment, + ) + return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment: {maintenance_comment}" + return ( + f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:" + ) + + # registers airflow/providers/edge3/plugins/templates as a Jinja template folder + template_bp = Blueprint( + "template_blueprint", + __name__, + template_folder="templates", + ) + + class EdgeWorkerJobs(BaseView): + """Simple view to show Edge Worker jobs.""" + + default_view = "jobs" + + @expose("/jobs") + @has_access_view(AccessView.JOBS) + @provide_session + def jobs(self, session: Session = NEW_SESSION): + from airflow.providers.edge3.models.edge_job import EdgeJobModel + + jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all() + html_states = { + str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values() + } + return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states) + + class EdgeWorkerHosts(BaseView): + """Simple view to show Edge Worker status.""" + + default_view = "status" + + @expose("/status") + @has_access_view(AccessView.JOBS) + @provide_session + def status(self, session: Session = NEW_SESSION): + from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel + + hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all() + five_min_ago = datetime.now() - timedelta(minutes=5) + return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago) + + @expose("/status/maintenance//on", methods=["POST"]) + @has_access_view(AccessView.JOBS) + def worker_to_maintenance(self, worker_name: str): + from flask_login import current_user + + from airflow.providers.edge3.models.edge_worker import request_maintenance + + maintenance_comment = request.form.get("maintenance_comment") + maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}" + request_maintenance(worker_name, maintenance_comment) + return redirect(url_for("EdgeWorkerHosts.status")) + + @expose("/status/maintenance//off", methods=["POST"]) + @has_access_view(AccessView.JOBS) + def remove_worker_from_maintenance(self, worker_name: str): + from airflow.providers.edge3.models.edge_worker import exit_maintenance + + exit_maintenance(worker_name) + return redirect(url_for("EdgeWorkerHosts.status")) + + @expose("/status/maintenance//remove", methods=["POST"]) + @has_access_view(AccessView.JOBS) + def remove_worker(self, worker_name: str): + from airflow.providers.edge3.models.edge_worker import remove_worker + + remove_worker(worker_name) + return redirect(url_for("EdgeWorkerHosts.status")) + + @expose("/status/maintenance//change_comment", methods=["POST"]) + @has_access_view(AccessView.JOBS) + def change_maintenance_comment(self, worker_name: str): + from flask_login import current_user + + from airflow.providers.edge3.models.edge_worker import change_maintenance_comment + + maintenance_comment = request.form.get("maintenance_comment") + maintenance_comment = modify_maintenance_comment_on_update( + maintenance_comment, current_user.username + ) + change_maintenance_comment(worker_name, maintenance_comment) + return redirect(url_for("EdgeWorkerHosts.status")) # Check if EdgeExecutor is actually loaded @@ -209,21 +204,19 @@ class EdgeExecutorPlugin(AirflowPlugin): name = "edge_executor" if EDGE_EXECUTOR_ACTIVE: - appbuilder_views = [ - { - "name": "Edge Worker Jobs", - "category": "Admin", - "view": EdgeWorkerJobs(), - }, - { - "name": "Edge Worker Hosts", - "category": "Admin", - "view": EdgeWorkerHosts(), - }, - ] - if AIRFLOW_V_3_0_PLUS: fastapi_apps = [_get_api_endpoint()] - flask_blueprints = [template_bp] else: + appbuilder_views = [ + { + "name": "Edge Worker Jobs", + "category": "Admin", + "view": EdgeWorkerJobs(), + }, + { + "name": "Edge Worker Hosts", + "category": "Admin", + "view": EdgeWorkerHosts(), + }, + ] flask_blueprints = [_get_airflow_2_api_endpoint(), template_bp] diff --git a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py index 52e837387502e..178625c4cf42c 100644 --- a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py +++ b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py @@ -54,11 +54,12 @@ def test_plugin_active(): rep = EdgeExecutorPlugin() assert EDGE_EXECUTOR_ACTIVE - assert len(rep.appbuilder_views) == 2 if AIRFLOW_V_3_0_PLUS: - assert len(rep.flask_blueprints) == 1 + assert len(rep.appbuilder_views) == 0 + assert len(rep.flask_blueprints) == 0 assert len(rep.fastapi_apps) == 1 else: + assert len(rep.appbuilder_views) == 2 assert len(rep.flask_blueprints) == 2 @@ -73,6 +74,7 @@ def test_plugin_is_airflow_plugin(plugin): assert isinstance(plugin, AirflowPlugin) +@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Plugin endpoint is not used in Airflow 3.0+") @pytest.mark.parametrize( "initial_comment, expected_comment", [