diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index d25d26bce0d65..ebb6b15b31b2e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -268,7 +268,8 @@ repos:
^.*openapi.*\.yaml$|
^\.pre-commit-config\.yaml$|
^.*reproducible_build\.yaml$|
- ^.*pnpm-lock\.yaml$
+ ^.*pnpm-lock\.yaml$|
+ ^.*-generated\.yaml$
- repo: https://github.com/ikamensh/flynt
rev: 97be693bf18bc2f050667dd282d243e2824b81e2 # frozen: 1.0.6
hooks:
diff --git a/.rat-excludes b/.rat-excludes
index 3068aca887289..47438f0fe90aa 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -313,7 +313,7 @@ www-hash.txt
*generated.*
/src/airflow/providers/keycloak/auth_manager/openapi/v2-keycloak-auth-manager-generated.yaml
/src/airflow/providers/edge3/plugins/www/*
-/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v2-fab-auth-manager-generated.yaml
/src/airflow/providers/fab/www/static/dist/*
/any/dag_id=dag_for_testing_redis_task_handler/run_id=test/task_id=task_for_testing_redis_log_handler/attempt=1.log
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index 85651f14ee89a..7cef7d4bf07d9 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -736,7 +736,7 @@ def get_airflow_extras():
{
"python-version": "3.10",
"airflow-version": "2.11.0",
- "remove-providers": "common.messaging fab git keycloak",
+ "remove-providers": "common.messaging edge3 fab git keycloak",
"run-unit-tests": "true",
},
{
diff --git a/go-sdk/pkg/edgeapi/client.go b/go-sdk/pkg/edgeapi/client.go
index d4d888ffbbf5c..ef81645b2870d 100644
--- a/go-sdk/pkg/edgeapi/client.go
+++ b/go-sdk/pkg/edgeapi/client.go
@@ -27,7 +27,7 @@ import (
//go:generate -command openapi-gen go run github.com/ashb/oapi-resty-codegen@latest --config oapi-codegen.yml
-//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
func WithEdgeAPIJWTKey(key []byte) ClientOption {
return func(c *Client) error {
diff --git a/providers/edge3/.pre-commit-config.yaml b/providers/edge3/.pre-commit-config.yaml
index 230ead00cf395..47b0c6ec6f7a6 100644
--- a/providers/edge3/.pre-commit-config.yaml
+++ b/providers/edge3/.pre-commit-config.yaml
@@ -37,7 +37,7 @@ repos:
files: |
(?x)
^src/airflow/providers/edge3/plugins/www/.*\.(js|ts|tsx|yaml|css|json)$|
- ^src/airflow/providers/edge3/openapi/v2-edge-generated.yaml$
+ ^src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml$
exclude: |
(?x)
^src/airflow/providers/edge3/plugins/www/node-modules/.*|
diff --git a/providers/edge3/README.rst b/providers/edge3/README.rst
index 2aa03b15bff11..c288aef4338ae 100644
--- a/providers/edge3/README.rst
+++ b/providers/edge3/README.rst
@@ -65,7 +65,7 @@ Requirements
========================================== ====================
PIP package Version required
========================================== ====================
-``apache-airflow`` ``>=2.11.0,!=3.1.0``
+``apache-airflow`` ``>=3.0.0,!=3.1.0``
``apache-airflow-providers-common-compat`` ``>=1.10.0``
``pydantic`` ``>=2.11.0``
``retryhttp`` ``>=1.2.0,!=1.3.0``
diff --git a/providers/edge3/docs/architecture.rst b/providers/edge3/docs/architecture.rst
index 5c38b1ab95d07..5a1f7bb070afd 100644
--- a/providers/edge3/docs/architecture.rst
+++ b/providers/edge3/docs/architecture.rst
@@ -63,7 +63,7 @@ deployed outside of the central Airflow cluster is connected via HTTP(s) to the
* **Workers** (Central) - Execute the assigned tasks - most standard setup has local or centralized workers, e.g. via Celery
* **Edge Workers** - Special workers which pull tasks via HTTP(s) as provided as feature via this provider package
* **Scheduler** - Responsible for adding the necessary tasks to the queue. The EdgeExecutor is running as a module inside the scheduler.
-* **API server** (webserver in Airflow 2.x) - HTTP REST API Server provides access to Dag/task status information. The required end-points are
+* **API server** - HTTP REST API Server provides access to Dag/task status information. The required end-points are
provided by the Edge provider plugin. The Edge Worker uses this API to pull tasks and send back the results.
* **Database** - Contains information about the status of tasks, Dags, Variables, connections, etc.
@@ -77,7 +77,7 @@ In detail the parts of the Edge provider are deployed as follows:
need to set the ``executor`` configuration option in the ``airflow.cfg`` file to
``airflow.providers.edge3.executors.EdgeExecutor``. For more details see :doc:`edge_executor`. Note that also
multiple executors can be used in parallel together with the EdgeExecutor.
-* **API server** (webserver in Airflow 2.x) - The API server is providing REST endpoints to the web UI as well
+* **API server** - The API server is providing REST endpoints to the web UI as well
as serves static files. The Edge provider adds a plugin that provides additional REST API for the Edge Worker
as well as UI elements to manage workers (not available in Airflow 3.0).
The API server is responsible for handling requests from the Edge Worker and sending back the results. To
diff --git a/providers/edge3/docs/changelog.rst b/providers/edge3/docs/changelog.rst
index 24c3883836fc7..00b86c138bf9e 100644
--- a/providers/edge3/docs/changelog.rst
+++ b/providers/edge3/docs/changelog.rst
@@ -27,6 +27,14 @@
Changelog
---------
+.. warning::
+ This release of the Edge3 provider drops support for Airflow versions below 3.0.0.
+
+ The support for Airflow 2.10-2.11 was experimental and GA for the provider is only for Airflow 3.0+.
+ Productive operation was not intended in Airflow 2.x, therefore the support for Airflow 2.x is now dropped
+ earlier than the usual release support policy would indicate.
+
+
1.6.0
.....
diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst
index d0387c41946ed..ff8a93def1322 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -113,7 +113,7 @@ process as and wait until all running tasks are completed. Also in a console you
If you want to monitor the remote activity and worker, use the UI plugin which
is included in the provider package and install it on the api-server / webserver and use the
"Admin" - "Edge Worker Hosts" and "Edge Worker Jobs" pages.
-(Note: The plugin is not available on Airflow 3.0 UI, it is only in 2.x and 3.1++)
+(Note: The plugin is not available on Airflow 3.0 UI, it is only in 3.1++)
If you want to check status of the worker via CLI you can use the command
@@ -150,7 +150,7 @@ Worker status can be checked via the web UI in the "Admin" - "Edge Worker" page.
.. note::
- Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.10 and in 3.1 and newer.
+ Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.
diff --git a/providers/edge3/docs/index.rst b/providers/edge3/docs/index.rst
index 67f4b497cc452..94718902f0482 100644
--- a/providers/edge3/docs/index.rst
+++ b/providers/edge3/docs/index.rst
@@ -108,12 +108,12 @@ For the minimum Airflow version supported, see ``Requirements`` below.
Requirements
------------
-The minimum Apache Airflow version supported by this provider distribution is ``2.11.0``.
+The minimum Apache Airflow version supported by this provider distribution is ``3.0.0``.
========================================== ====================
PIP package Version required
========================================== ====================
-``apache-airflow`` ``>=2.11.0,!=3.1.0``
+``apache-airflow`` ``>=3.0.0,!=3.1.0``
``apache-airflow-providers-common-compat`` ``>=1.10.0``
``pydantic`` ``>=2.11.0``
``retryhttp`` ``>=1.2.0,!=1.3.0``
diff --git a/providers/edge3/docs/ui_plugin.rst b/providers/edge3/docs/ui_plugin.rst
index 043892e1ffc26..bbf621e4599a8 100644
--- a/providers/edge3/docs/ui_plugin.rst
+++ b/providers/edge3/docs/ui_plugin.rst
@@ -22,7 +22,7 @@ The Edge provider uses a Plugin to
- Extend the REST API endpoints for connecting workers to the Airflow cluster
- Provide a web UI for managing the workers and monitoring their status and tasks
- (Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.)
+ (Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 3.1 and newer.)
REST API endpoints
------------------
@@ -35,14 +35,14 @@ The Edge provider adds the following REST API endpoints to the Airflow API:
- ``/edge_worker/v1/health``: Check that the API endpoint is deployed and active
To see full documentation of the API endpoints open the Airflow web UI and navigate to
-the sub-path ``/edge_worker/docs`` (Airflow 3.0) or ``/edge_worker/v1/ui`` (Airflow 2.x).
+the sub-path ``/edge_worker/docs``.
Web UI Plugin
-------------
.. note::
- Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.
+ Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.
The Edge provider adds a web UI plugin to the Airflow web UI. The plugin is
diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml
index 41fd6d063c6ed..077449e9a82a3 100644
--- a/providers/edge3/pyproject.toml
+++ b/providers/edge3/pyproject.toml
@@ -58,7 +58,7 @@ requires-python = ">=3.10"
# Make sure to run ``prek update-providers-dependencies --all-files``
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
- "apache-airflow>=2.11.0,!=3.1.0",
+ "apache-airflow>=3.0.0,!=3.1.0",
"apache-airflow-providers-common-compat>=1.10.0", # use next version
"pydantic>=2.11.0",
"retryhttp>=1.2.0,!=1.3.0",
diff --git a/providers/edge3/src/airflow/providers/edge3/__init__.py b/providers/edge3/src/airflow/providers/edge3/__init__.py
index 0ec10be5e5151..a1d4c24f2d4b4 100644
--- a/providers/edge3/src/airflow/providers/edge3/__init__.py
+++ b/providers/edge3/src/airflow/providers/edge3/__init__.py
@@ -32,8 +32,8 @@
__version__ = "1.6.0"
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.11.0"
+ "3.0.0"
):
raise RuntimeError(
- f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 2.11.0+"
+ f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 3.0.0+"
)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
index ac08b1c50ebca..e0b0f7e8e199e 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/api_client.py
@@ -20,6 +20,7 @@
import logging
import os
from datetime import datetime
+from functools import cache
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -29,12 +30,12 @@
from retryhttp import retry, wait_retry_after
from tenacity import before_sleep_log, wait_random_exponential
+from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.configuration import conf
from airflow.providers.edge3.models.edge_worker import (
EdgeWorkerDuplicateException,
EdgeWorkerVersionException,
)
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.edge3.worker_api.datamodels import (
EdgeJobFetched,
PushLogsBody,
@@ -74,6 +75,15 @@
_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
+@cache
+def jwt_generator() -> JWTGenerator:
+ return JWTGenerator(
+ secret_key=conf.get("api_auth", "jwt_secret"),
+ valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
+ audience="api",
+ )
+
+
@retry(
reraise=True,
max_attempt_number=API_RETRIES,
@@ -84,28 +94,7 @@
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
- if AIRFLOW_V_3_0_PLUS:
- from functools import cache
-
- from airflow.api_fastapi.auth.tokens import JWTGenerator
-
- @cache
- def jwt_generator() -> JWTGenerator:
- return JWTGenerator(
- secret_key=conf.get("api_auth", "jwt_secret"),
- valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
- audience="api",
- )
-
- generator = jwt_generator()
- authorization = generator.generate({"method": rest_path})
- else:
- # Airflow 2.10 compatibility
- from airflow.providers.edge3.worker_api.auth import jwt_signer
-
- signer = jwt_signer()
- authorization = signer.generate_signed_token({"method": rest_path})
-
+ authorization = jwt_generator().generate({"method": rest_path})
api_url = conf.get("edge", "api_url")
headers = {
"Content-Type": "application/json",
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 739dfd3bd69a6..f7707922640ac 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -25,7 +25,6 @@
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
-from subprocess import Popen
from time import sleep
from typing import TYPE_CHECKING
@@ -39,7 +38,6 @@
from airflow.providers.edge3.cli.api_client import (
jobs_fetch,
jobs_set_state,
- logs_logfile_path,
logs_push,
worker_register,
worker_set_state,
@@ -56,7 +54,6 @@
EdgeWorkerState,
EdgeWorkerVersionException,
)
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.net import getfqdn
from airflow.utils.state import TaskInstanceState
@@ -218,7 +215,7 @@ def _run_job_via_supervisor(workload, execution_api_server_url) -> int:
return 1
@staticmethod
- def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
+ def _launch_job(edge_job: EdgeJobFetched):
if TYPE_CHECKING:
from airflow.executors.workloads import ExecuteTask
@@ -232,29 +229,6 @@ def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(base_log_folder, workload.log_path)
- return process, logfile
-
- @staticmethod
- def _launch_job_af2_10(edge_job: EdgeJobFetched) -> tuple[Popen, Path]:
- """Compatibility for Airflow 2.10 Launch."""
- env = os.environ.copy()
- env["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"
- env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge", "api_url")
- env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
- command: list[str] = edge_job.command # type: ignore[assignment]
- process = Popen(command, close_fds=True, env=env, start_new_session=True)
- logfile = logs_logfile_path(edge_job.key)
- return process, logfile
-
- @staticmethod
- def _launch_job(edge_job: EdgeJobFetched):
- """Get the received job executed."""
- process: Popen | Process
- if AIRFLOW_V_3_0_PLUS:
- process, logfile = EdgeWorker._launch_job_af3(edge_job)
- else:
- # Airflow 2.10
- process, logfile = EdgeWorker._launch_job_af2_10(edge_job)
EdgeWorker.jobs.append(Job(edge_job, process, logfile, 0))
def start(self):
diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index dc2821831cb1d..f918e4b803559 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -29,6 +29,7 @@
from airflow.cli.cli_config import GroupCommand
from airflow.configuration import conf
+from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.providers.common.compat.sdk import Stats, timezone
@@ -36,7 +37,6 @@
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.db import DBLocks, create_global_lock
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
@@ -68,8 +68,10 @@ def _check_db_schema(self, engine: Engine) -> None:
"""
Check if already existing table matches the newest table schema.
- workaround till support for Airflow 2.x is dropped,
+ workaround as Airflow 2.x had no support for provider DB migrations,
then it is possible to use alembic also for provider distributions.
+
+ TODO(jscheffl): Change to alembic DB migrations in the future.
"""
inspector = inspect(engine)
edge_job_columns = None
@@ -124,66 +126,13 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
self.edge_queued_tasks = deepcopy(self.queued_tasks)
super()._process_tasks(task_tuples) # type: ignore[misc]
- @provide_session
- def execute_async(
- self,
- key: TaskInstanceKey,
- command: CommandType,
- queue: str | None = None,
- executor_config: Any | None = None,
- session: Session = NEW_SESSION,
- ) -> None:
- """Execute asynchronously. Airflow 2.10 entry point to execute a task."""
- # Use of a temporary trick to get task instance, will be changed with Airflow 3.0.0
- # code works together with _process_tasks overwrite to get task instance.
- # TaskInstance in fourth element
- task_instance = self.edge_queued_tasks[key][3] # type: ignore[index]
- del self.edge_queued_tasks[key]
-
- self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined]
-
- # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number
- existing_job = (
- session.query(EdgeJobModel)
- .filter_by(
- dag_id=key.dag_id,
- task_id=key.task_id,
- run_id=key.run_id,
- map_index=key.map_index,
- try_number=key.try_number,
- )
- .first()
- )
-
- if existing_job:
- existing_job.state = TaskInstanceState.QUEUED
- existing_job.queue = queue or DEFAULT_QUEUE
- existing_job.concurrency_slots = task_instance.pool_slots
- existing_job.command = str(command)
- else:
- session.add(
- EdgeJobModel(
- dag_id=key.dag_id,
- task_id=key.task_id,
- run_id=key.run_id,
- map_index=key.map_index,
- try_number=key.try_number,
- state=TaskInstanceState.QUEUED,
- queue=queue or DEFAULT_QUEUE,
- concurrency_slots=task_instance.pool_slots,
- command=str(command),
- )
- )
-
@provide_session
def queue_workload(
self,
- workload: Any, # Note actually "airflow.executors.workloads.All" but not existing in Airflow 2.10
+ workload: workloads.All,
session: Session = NEW_SESSION,
) -> None:
"""Put new workload to queue. Airflow 3 entry point to execute a task."""
- from airflow.executors import workloads
-
if not isinstance(workload, workloads.ExecuteTask):
raise TypeError(f"Don't know how to queue workload of type {type(workload).__name__}")
@@ -262,11 +211,7 @@ def _check_worker_liveness(self, session: Session) -> bool:
def _update_orphaned_jobs(self, session: Session) -> bool:
"""Update status ob jobs when workers die and don't update anymore."""
- if AIRFLOW_V_3_0_PLUS:
- heartbeat_interval_config_name = "task_instance_heartbeat_timeout"
- else:
- heartbeat_interval_config_name = "scheduler_zombie_task_threshold"
- heartbeat_interval: int = conf.getint("scheduler", heartbeat_interval_config_name)
+ heartbeat_interval: int = conf.getint("scheduler", "task_instance_heartbeat_timeout")
lifeless_jobs: list[EdgeJobModel] = (
session.query(EdgeJobModel)
.with_for_update(skip_locked=True)
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py b/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py
deleted file mode 100644
index 49b78ecf1d824..0000000000000
--- a/providers/edge3/src/airflow/providers/edge3/openapi/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""OpenAPI Specs for Connexion API in Airflow 2.10.x."""
-
-# Note: This module folder is to be removed once Airflow 2.10.x support is removed.
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml b/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml
deleted file mode 100644
index 1bbadd20cd0d3..0000000000000
--- a/providers/edge3/src/airflow/providers/edge3/openapi/edge_worker_api_v1.yaml
+++ /dev/null
@@ -1,808 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
----
-openapi: 3.0.2
-info:
- title: Airflow Edge Worker API
- version: 1.0.0
- description: |
- This is Airflow Edge Worker API - which is a the access endpoint for workers
- running on remote sites serving for Apache Airflow jobs. It also proxies internal API
- to edge endpoints.
-
- It is not intended to be used by any external code.
-
- You can find more information in AIP-69
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=301795932
-
-
-servers:
- - url: /edge_worker/v1
- description: Airflow Edge Worker API
-paths:
- /worker/{worker_name}:
- patch:
- description: Set state of worker and returns the current assigned queues.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: set_state_v2
- parameters:
- - description: Hostname or instance name of the worker
- in: path
- name: worker_name
- required: true
- schema:
- description: Hostname or instance name of the worker
- title: Worker Name
- type: string
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/WorkerStateBody'
- description: State of the worker with details
- examples:
- - jobs_active: 3
- queues:
- - large_node
- - wisconsin_site
- state: running
- sysinfo:
- airflow_version: 2.10.0
- concurrency: 4
- edge_provider_version: 1.0.0
- title: Worker State
- required: true
- responses:
- '200':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/WorkerSetStateReturn'
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: Set State
- tags:
- - Worker
- post:
- description: Register a new worker to the backend.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: register_v2
- parameters:
- - description: Hostname or instance name of the worker
- in: path
- name: worker_name
- required: true
- schema:
- description: Hostname or instance name of the worker
- title: Worker Name
- type: string
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/WorkerStateBody'
- description: State of the worker with details
- examples:
- - jobs_active: 3
- queues:
- - large_node
- - wisconsin_site
- state: running
- sysinfo:
- airflow_version: 2.10.0
- concurrency: 4
- edge_provider_version: 1.0.0
- title: Worker State
- required: true
- responses:
- '200':
- content:
- application/json:
- schema:
- $ref: "#/components/schemas/WorkerRegistrationReturn"
- description: Registration response with the last update time of the worker.
- examples:
- - last_update: "2025-04-04T13:59:58.773870"
- title: Worker Registration
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: Register
- tags:
- - Worker
- /jobs/fetch/{worker_name}:
- post:
- description: Fetch a job to execute on the edge worker.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: job_fetch_v2
- parameters:
- - in: path
- name: worker_name
- required: true
- schema:
- title: Worker Name
- type: string
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/WorkerQueuesBody'
- description: The worker remote has no access to log sink and with this
- can send log chunks to the central site.
- title: Log data chunks
- required: true
- responses:
- '200':
- content:
- application/json:
- schema:
- anyOf:
- - $ref: '#/components/schemas/EdgeJobFetched'
- - type: object
- nullable: true
- title: Response Fetch
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: Fetch
- tags:
- - Jobs
- /jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}:
- patch:
- description: Update the state of a job running on the edge worker.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: job_state_v2
- parameters:
- - description: Identifier of the DAG to which the task belongs.
- in: path
- name: dag_id
- required: true
- schema:
- description: Identifier of the DAG to which the task belongs.
- title: Dag ID
- type: string
- - description: Task name in the DAG.
- in: path
- name: task_id
- required: true
- schema:
- description: Task name in the DAG.
- title: Task ID
- type: string
- - description: Run ID of the DAG execution.
- in: path
- name: run_id
- required: true
- schema:
- description: Run ID of the DAG execution.
- title: Run ID
- type: string
- - description: The number of attempt to execute this task.
- in: path
- name: try_number
- required: true
- schema:
- description: The number of attempt to execute this task.
- title: Try Number
- type: integer
- - description: For dynamically mapped tasks the mapping number, -1 if the task
- is not mapped.
- in: path
- name: map_index
- required: true
- schema:
- description: For dynamically mapped tasks the mapping number, -1 if the
- task is not mapped.
- title: Map Index
- type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
- - description: State of the assigned task under execution.
- in: path
- name: state
- required: true
- schema:
- $ref: '#/components/schemas/TaskInstanceState'
- description: State of the assigned task under execution.
- title: Task State
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- responses:
- '200':
- content:
- application/json:
- schema:
- title: Response State
- type: object
- nullable: true
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: State
- tags:
- - Jobs
- /logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
- get:
- description: Elaborate the path and filename to expect from task execution.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: logfile_path_v2
- parameters:
- - description: Identifier of the DAG to which the task belongs.
- in: path
- name: dag_id
- required: true
- schema:
- description: Identifier of the DAG to which the task belongs.
- title: Dag ID
- type: string
- - description: Task name in the DAG.
- in: path
- name: task_id
- required: true
- schema:
- description: Task name in the DAG.
- title: Task ID
- type: string
- - description: Run ID of the DAG execution.
- in: path
- name: run_id
- required: true
- schema:
- description: Run ID of the DAG execution.
- title: Run ID
- type: string
- - description: The number of attempt to execute this task.
- in: path
- name: try_number
- required: true
- schema:
- description: The number of attempt to execute this task.
- title: Try Number
- type: integer
- - description: For dynamically mapped tasks the mapping number, -1 if the task
- is not mapped.
- in: path
- name: map_index
- required: true
- schema:
- description: For dynamically mapped tasks the mapping number, -1 if the
- task is not mapped.
- title: Map Index
- type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- responses:
- '200':
- content:
- application/json:
- schema:
- title: Response Logfile Path
- type: string
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: Logfile Path
- tags:
- - Logs
- /logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
- post:
- description: Push an incremental log chunk from Edge Worker to central site.
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: push_logs_v2
- parameters:
- - description: Identifier of the DAG to which the task belongs.
- in: path
- name: dag_id
- required: true
- schema:
- description: Identifier of the DAG to which the task belongs.
- title: Dag ID
- type: string
- - description: Task name in the DAG.
- in: path
- name: task_id
- required: true
- schema:
- description: Task name in the DAG.
- title: Task ID
- type: string
- - description: Run ID of the DAG execution.
- in: path
- name: run_id
- required: true
- schema:
- description: Run ID of the DAG execution.
- title: Run ID
- type: string
- - description: The number of attempt to execute this task.
- in: path
- name: try_number
- required: true
- schema:
- description: The number of attempt to execute this task.
- title: Try Number
- type: integer
- - description: For dynamically mapped tasks the mapping number, -1 if the task
- is not mapped.
- in: path
- name: map_index
- required: true
- schema:
- description: For dynamically mapped tasks the mapping number, -1 if the
- task is not mapped.
- title: Map Index
- type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
- - description: JWT Authorization Token
- in: header
- name: authorization
- required: true
- schema:
- description: JWT Authorization Token
- title: Authorization
- type: string
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/PushLogsBody'
- description: The worker remote has no access to log sink and with this
- can send log chunks to the central site.
- title: Log data chunks
- required: true
- responses:
- '200':
- content:
- application/json:
- schema:
- title: Response Push Logs
- type: object
- nullable: true
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '422':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- description: Validation Error
- summary: Push Logs
- tags:
- - Logs
- /rpcapi:
- post:
- deprecated: false
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes._v2_routes
- operationId: rpcapi_v2
- tags:
- - JSONRPC
- parameters: []
- responses:
- '200':
- description: Successful response
- requestBody:
- x-body-name: body
- required: true
- content:
- application/json:
- schema:
- type: object
- required:
- - method
- - jsonrpc
- - params
- properties:
- jsonrpc:
- type: string
- default: '2.0'
- description: JSON-RPC Version (2.0)
- method:
- type: string
- description: Method name
- params:
- title: Parameters
- type: object
- /health:
- get:
- operationId: health
- deprecated: false
- x-openapi-router-controller: airflow.providers.edge3.worker_api.routes.health
- tags:
- - JSONRPC
- parameters: []
- responses:
- '200':
- description: Successful response
-x-headers: []
-x-explorer-enabled: true
-x-proxy-enabled: true
-components:
- schemas:
- JsonRpcRequired:
- type: object
- required:
- - method
- - jsonrpc
- properties:
- method:
- type: string
- description: Method name
- jsonrpc:
- type: string
- default: '2.0'
- description: JSON-RPC Version (2.0)
- discriminator:
- propertyName: method_name
- EdgeWorkerState:
- description: Status of a Edge Worker instance.
- enum:
- - starting
- - running
- - idle
- - terminating
- - offline
- - unknown
- - maintenance mode
- - maintenance request
- - maintenance pending
- - maintenance exit
- - offline maintenance
- title: EdgeWorkerState
- type: string
- WorkerStateBody:
- description: Details of the worker state sent to the scheduler.
- type: object
- required:
- - state
- - queues
- - sysinfo
- properties:
- jobs_active:
- default: 0
- description: Number of active jobs the worker is running.
- title: Jobs Active
- type: integer
- queues:
- anyOf:
- - items:
- type: string
- type: array
- - type: object
- nullable: true
- description: List of queues the worker is pulling jobs from. If not provided,
- worker pulls from all queues.
- title: Queues
- state:
- $ref: '#/components/schemas/EdgeWorkerState'
- description: State of the worker from the view of the worker.
- sysinfo:
- description: System information of the worker.
- title: Sysinfo
- type: object
- maintenance_comments:
- description: Comments about the maintenance state of the worker.
- title: Maintenance Comments
- anyOf:
- - type: string
- - type: object
- nullable: true
- title: WorkerStateBody
- WorkerQueuesBody:
- description: Queues that a worker supports to run jobs on.
- properties:
- queues:
- anyOf:
- - items:
- type: string
- type: array
- - type: object
- nullable: true
- description: List of queues the worker is pulling jobs from. If not provided,
- worker pulls from all queues.
- title: Queues
- free_concurrency:
- description: Number of free slots for running tasks.
- title: Free Concurrency
- type: integer
- required:
- - queues
- - free_concurrency
- title: WorkerQueuesBody
- type: object
- WorkerRegistrationReturn:
- description: The response of the worker registration.
- properties:
- last_update:
- description: Time of the last update of the worker.
- format: date-time
- title: Last Update
- type: string
- title: WorkerRegistrationReturn
- WorkerSetStateReturn:
- description: The state written in the database
- properties:
- queues:
- anyOf:
- - items:
- type: string
- type: array
- - type: object
- nullable: true
- description: List of queues the worker is pulling jobs from. If not provided,
- worker pulls from all queues.
- title: Queues
- state:
- $ref: '#/components/schemas/EdgeWorkerState'
- description: State of the worker from the view of the worker.
- maintenance_comments:
- description: Comments about the maintenance state of the worker.
- title: Maintenance Comments
- anyOf:
- - type: string
- - type: object
- nullable: true
- title: WorkerSetStateReturn
- EdgeJobFetched:
- description: Job that is to be executed on the edge worker.
- properties:
- command:
- description: Command line to use to execute the job.
- items:
- type: string
- title: Command
- type: array
- concurrency_slots:
- description: Number of slots to use for the task.
- title: Concurrency Slots
- type: integer
- dag_id:
- description: Identifier of the DAG to which the task belongs.
- title: Dag ID
- type: string
- map_index:
- description: For dynamically mapped tasks the mapping number, -1 if the
- task is not mapped.
- title: Map Index
- type: integer
- run_id:
- description: Run ID of the DAG execution.
- title: Run ID
- type: string
- task_id:
- description: Task name in the DAG.
- title: Task ID
- type: string
- try_number:
- description: The number of attempt to execute this task.
- title: Try Number
- type: integer
- required:
- - dag_id
- - task_id
- - run_id
- - map_index
- - try_number
- - command
- title: EdgeJobFetched
- type: object
- TaskInstanceState:
- description: 'All possible states that a Task Instance can be in.
-
-
- Note that None is also allowed, so always use this in a type hint with Optional.'
- enum:
- - removed
- - scheduled
- - queued
- - running
- - success
- - restarting
- - failed
- - up_for_retry
- - up_for_reschedule
- - upstream_failed
- - skipped
- - deferred
- title: TaskInstanceState
- type: string
- PushLogsBody:
- description: Incremental new log content from worker.
- properties:
- log_chunk_data:
- description: Log chunk data as incremental log text.
- title: Log Chunk Data
- type: string
- log_chunk_time:
- description: Time of the log chunk at point of sending.
- format: date-time
- title: Log Chunk Time
- type: string
- required:
- - log_chunk_time
- - log_chunk_data
- title: PushLogsBody
- type: object
- HTTPExceptionResponse:
- description: HTTPException Model used for error response.
- properties:
- detail:
- anyOf:
- - type: string
- - type: object
- title: Detail
- required:
- - detail
- title: HTTPExceptionResponse
- type: object
- HTTPValidationError:
- properties:
- detail:
- items:
- $ref: '#/components/schemas/ValidationError'
- title: Detail
- type: array
- title: HTTPValidationError
- type: object
- ValidationError:
- properties:
- loc:
- items:
- anyOf:
- - type: string
- - type: integer
- title: Location
- type: array
- msg:
- title: Message
- type: string
- type:
- title: Error Type
- type: string
- required:
- - loc
- - msg
- - type
- title: ValidationError
- type: object
-
-tags: []
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
index 110e9efb7fcd4..b8aa49a642ff4 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
@@ -23,188 +23,35 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.plugins_manager import AirflowPlugin
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
from sqlalchemy.orm import Session
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.db import DBLocks, create_global_lock
+from airflow.utils.db import DBLocks, create_global_lock
- @provide_session
- def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
- # Ensure all required DB modeals are created before starting the API
- with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
- engine = session.get_bind().engine
- from airflow.providers.edge3.models.edge_job import EdgeJobModel
- from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
- from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
- EdgeJobModel.metadata.create_all(engine)
- EdgeLogsModel.metadata.create_all(engine)
- EdgeWorkerModel.metadata.create_all(engine)
+@provide_session
+def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
+ # Ensure all required DB modeals are created before starting the API
+ with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+ engine = session.get_bind().engine
+ from airflow.providers.edge3.models.edge_job import EdgeJobModel
+ from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
+ from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
- from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app
+ EdgeJobModel.metadata.create_all(engine)
+ EdgeLogsModel.metadata.create_all(engine)
+ EdgeWorkerModel.metadata.create_all(engine)
- return {
- "app": create_edge_worker_api_app(),
- "url_prefix": "/edge_worker",
- "name": "Airflow Edge Worker",
- }
+ from airflow.providers.edge3.worker_api.app import create_edge_worker_api_app
-else:
- # This is for back-compatibility with Airflow 2.x and we only make this
- # to prevents dependencies and breaking imports in Airflow 3.x
- import re
- from datetime import datetime, timedelta
- from pathlib import Path
-
- from flask import Blueprint, redirect, request, url_for
- from flask_appbuilder import BaseView, expose
- from markupsafe import Markup
- from sqlalchemy import select
-
- from airflow.auth.managers.models.resource_details import AccessView
- from airflow.utils.state import State, TaskInstanceState
- from airflow.utils.yaml import safe_load
- from airflow.www.auth import has_access_view
-
- def _get_airflow_2_api_endpoint() -> Blueprint:
- from airflow.www.app import csrf
- from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
- from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
-
- folder = Path(__file__).parents[1].resolve() # this is airflow/providers/edge3/
- with folder.joinpath("openapi", "edge_worker_api_v1.yaml").open() as f:
- specification = safe_load(f)
- from connexion import FlaskApi
-
- bp = FlaskApi(
- specification=specification,
- resolver=_LazyResolver(),
- base_path="/edge_worker/v1",
- strict_validation=True,
- options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
- validate_responses=True,
- validator_map={"body": _CustomErrorRequestBodyValidator},
- ).blueprint
- # Need to exempt CSRF to make API usable
- csrf.exempt(bp)
- return bp
-
- def _state_token(state):
- """Return a formatted string with HTML for a given State."""
- color = State.color(state)
- fg_color = State.color_fg(state)
- return Markup(
- """
- {state}
- """
- ).format(color=color, state=state, fg_color=fg_color)
-
- def modify_maintenance_comment_on_update(maintenance_comment: str | None, username: str) -> str:
- if maintenance_comment:
- if re.search(
- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*", maintenance_comment
- ):
- return re.sub(
- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:",
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
- maintenance_comment,
- )
- if re.search(r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*", maintenance_comment):
- return re.sub(
- r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:",
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:",
- maintenance_comment,
- )
- return f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment: {maintenance_comment}"
- return (
- f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {username} updated maintenance mode\nComment:"
- )
-
- # registers airflow/providers/edge3/plugins/templates as a Jinja template folder
- template_bp = Blueprint(
- "template_blueprint",
- __name__,
- template_folder="templates",
- )
-
- class EdgeWorkerJobs(BaseView):
- """Simple view to show Edge Worker jobs."""
-
- default_view = "jobs"
-
- @expose("/jobs")
- @has_access_view(AccessView.JOBS)
- @provide_session
- def jobs(self, session: Session = NEW_SESSION):
- from airflow.providers.edge3.models.edge_job import EdgeJobModel
-
- jobs = session.scalars(select(EdgeJobModel).order_by(EdgeJobModel.queued_dttm)).all()
- html_states = {
- str(state): _state_token(str(state)) for state in TaskInstanceState.__members__.values()
- }
- return self.render_template("edge_worker_jobs.html", jobs=jobs, html_states=html_states)
-
- class EdgeWorkerHosts(BaseView):
- """Simple view to show Edge Worker status."""
-
- default_view = "status"
-
- @expose("/status")
- @has_access_view(AccessView.JOBS)
- @provide_session
- def status(self, session: Session = NEW_SESSION):
- from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel
-
- hosts = session.scalars(select(EdgeWorkerModel).order_by(EdgeWorkerModel.worker_name)).all()
- five_min_ago = datetime.now() - timedelta(minutes=5)
- return self.render_template("edge_worker_hosts.html", hosts=hosts, five_min_ago=five_min_ago)
-
- @expose("/status/maintenance//on", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def worker_to_maintenance(self, worker_name: str):
- from flask_login import current_user
-
- from airflow.providers.edge3.models.edge_worker import request_maintenance
-
- maintenance_comment = request.form.get("maintenance_comment")
- maintenance_comment = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] - {current_user.username} put node into maintenance mode\nComment: {maintenance_comment}"
- request_maintenance(worker_name, maintenance_comment)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//off", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def remove_worker_from_maintenance(self, worker_name: str):
- from airflow.providers.edge3.models.edge_worker import exit_maintenance
-
- exit_maintenance(worker_name)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//remove", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def remove_worker(self, worker_name: str):
- from airflow.providers.edge3.models.edge_worker import remove_worker
-
- remove_worker(worker_name)
- return redirect(url_for("EdgeWorkerHosts.status"))
-
- @expose("/status/maintenance//change_comment", methods=["POST"])
- @has_access_view(AccessView.JOBS)
- def change_maintenance_comment(self, worker_name: str):
- from flask_login import current_user
-
- from airflow.providers.edge3.models.edge_worker import change_maintenance_comment
-
- maintenance_comment = request.form.get("maintenance_comment")
- maintenance_comment = modify_maintenance_comment_on_update(
- maintenance_comment, current_user.username
- )
- change_maintenance_comment(worker_name, maintenance_comment)
- return redirect(url_for("EdgeWorkerHosts.status"))
+ return {
+ "app": create_edge_worker_api_app(),
+ "url_prefix": "/edge_worker",
+ "name": "Airflow Edge Worker",
+ }
# Check if EdgeExecutor is actually loaded
@@ -213,17 +60,14 @@ def change_maintenance_comment(self, worker_name: str):
except AirflowConfigException:
EDGE_EXECUTOR_ACTIVE = False
-# Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x)
-# todo(jscheffl): Remove this check when the discussion in
+# Load the API endpoint only on api-server
+# TODO(jscheffl): Remove this check when the discussion in
# https://lists.apache.org/thread/w170czq6r7bslkqp1tk6bjjjo0789wgl
# resulted in a proper API to selective initialize. Maybe backcompat-shim
# is also needed to support Airflow-versions prior the rework.
-if AIRFLOW_V_3_0_PLUS:
- RUNNING_ON_APISERVER = (len(sys.argv) > 1 and sys.argv[1] in ["api-server"]) or (
- len(sys.argv) > 2 and sys.argv[2] == "airflow-core/src/airflow/api_fastapi/main.py"
- )
-else:
- RUNNING_ON_APISERVER = "gunicorn" in sys.argv[0] and "airflow-webserver" in sys.argv
+RUNNING_ON_APISERVER = (len(sys.argv) > 1 and sys.argv[1] in ["api-server"]) or (
+ len(sys.argv) > 2 and sys.argv[2] == "airflow-core/src/airflow/api_fastapi/main.py"
+)
def _get_base_url_path(path: str) -> str:
@@ -247,8 +91,9 @@ class EdgeExecutorPlugin(AirflowPlugin):
name = "edge_executor"
if EDGE_EXECUTOR_ACTIVE and RUNNING_ON_APISERVER:
+ fastapi_apps = [_get_api_endpoint()]
if AIRFLOW_V_3_1_PLUS:
- fastapi_apps = [_get_api_endpoint()]
+ # Airflow 3.0 does not know about react_apps, so we only provide the API endpoint
react_apps = [
{
"name": "Edge Executor",
@@ -271,27 +116,3 @@ class EdgeExecutorPlugin(AirflowPlugin):
"url_route": "edge_worker_api_docs",
}
]
- if AIRFLOW_V_3_0_PLUS:
- # Airflow 3.0 does not know about react_apps, so we only provide the API endpoint
- fastapi_apps = [_get_api_endpoint()]
- else:
- appbuilder_menu_items = [
- {
- "name": "Edge Worker API docs",
- "href": _get_base_url_path("/edge_worker/v1/ui"),
- "category": "Docs",
- }
- ]
- appbuilder_views = [
- {
- "name": "Edge Worker Jobs",
- "category": "Admin",
- "view": EdgeWorkerJobs(),
- },
- {
- "name": "Edge Worker Hosts",
- "category": "Admin",
- "view": EdgeWorkerHosts(),
- },
- ]
- flask_blueprints = [_get_airflow_2_api_endpoint(), template_bp]
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json b/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
index 14b306a2e911e..911d5353f5d41 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/package.json
@@ -27,7 +27,7 @@
"lint:fix": "eslint --fix && tsc --p tsconfig.app.json",
"format": "pnpm prettier --write .",
"preview": "vite preview",
- "codegen": "openapi-rq -i ../../openapi/v2-edge-generated.yaml -c axios --format prettier -o openapi-gen --operationId",
+ "codegen": "openapi-rq -i ../../worker_api/v2-edge-generated.yaml -c axios --format prettier -o openapi-gen --operationId",
"test": "vitest run",
"coverage": "vitest run --coverage"
},
diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py b/providers/edge3/src/airflow/providers/edge3/version_compat.py
index 209e8b63f35dc..27070ab292bad 100644
--- a/providers/edge3/src/airflow/providers/edge3/version_compat.py
+++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py
@@ -32,10 +32,8 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
__all__ = [
- "AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
]
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
index 2188d8c76148f..a29fd42b2d1e7 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/auth.py
@@ -20,6 +20,7 @@
from functools import cache
from uuid import uuid4
+from fastapi import Header, HTTPException, Request, status
from itsdangerous import BadSignature
from jwt import (
ExpiredSignatureError,
@@ -29,49 +30,24 @@
InvalidSignatureError,
)
+from airflow.api_fastapi.auth.tokens import JWTValidator
from airflow.configuration import conf
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.edge3.worker_api.datamodels import JsonRpcRequestBase # noqa: TCH001
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
- Header,
- HTTPException,
- Request,
- status,
-)
log = logging.getLogger(__name__)
-if AIRFLOW_V_3_0_PLUS:
- from airflow.api_fastapi.auth.tokens import JWTValidator
-
- @cache
- def jwt_validator() -> JWTValidator:
- return JWTValidator(
- secret_key=conf.get("api_auth", "jwt_secret"),
- leeway=conf.getint("api_auth", "jwt_leeway", fallback=30),
- audience="api",
- )
+@cache
+def jwt_validator() -> JWTValidator:
+ return JWTValidator(
+ secret_key=conf.get("api_auth", "jwt_secret"),
+ leeway=conf.getint("api_auth", "jwt_leeway", fallback=30),
+ audience="api",
+ )
- def jwt_validate(authorization: str) -> dict:
- return jwt_validator().validated_claims(authorization)
-
-else:
- # Airflow 2.10 compatibility
- from airflow.utils.jwt_signer import JWTSigner
-
- @cache
- def jwt_signer() -> JWTSigner:
- clock_grace = conf.getint("core", "internal_api_clock_grace", fallback=30)
- return JWTSigner(
- secret_key=conf.get("core", "internal_api_secret_key"),
- expiration_time_in_seconds=clock_grace,
- leeway_in_seconds=clock_grace,
- audience="api",
- )
- def jwt_validate(authorization: str) -> dict:
- return jwt_signer().verify_token(authorization)
+def jwt_validate(authorization: str) -> dict:
+ return jwt_validator().validated_claims(authorization)
def _forbidden_response(message: str):
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
index 0d4530d86070b..bd7b60c8b1555 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels.py
@@ -22,11 +22,12 @@
Any,
)
+from fastapi import Path
from pydantic import BaseModel, Field
+from airflow.executors.workloads import ExecuteTask # noqa: TCH001
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.edge3.models.edge_worker import EdgeWorkerState # noqa: TCH001
-from airflow.providers.edge3.worker_api.routes._v2_compat import ExecuteTask, Path
class WorkerApiDocs:
@@ -93,7 +94,7 @@ class EdgeJobFetched(EdgeJobBase):
ExecuteTask,
Field(
title="Command",
- description="Command line to use to execute the job in Airflow 2. Task definition in Airflow 3",
+ description="Command line to use to execute the job in Airflow",
),
]
concurrency_slots: Annotated[int, Field(description="Number of concurrency slots the job requires.")]
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py
deleted file mode 100644
index ffe1cb092490f..0000000000000
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_compat.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Compatibility layer for API to provide both FastAPI as well as Connexion based endpoints."""
-
-from __future__ import annotations
-
-from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- # Just re-import the types from FastAPI and Airflow Core
- from fastapi import Body, Depends, Header, HTTPException, Path, Request, status
-
- from airflow.api_fastapi.common.db.common import SessionDep
- from airflow.api_fastapi.common.router import AirflowRouter
- from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-
- # In Airflow 3 with AIP-72 we get workload addressed by ExecuteTask
- from airflow.executors.workloads import ExecuteTask
-
- def parse_command(command: str) -> ExecuteTask:
- return ExecuteTask.model_validate_json(command)
-else:
- # Mock the external dependencies
- from collections.abc import Callable
-
- from connexion import ProblemException
-
- class Body: # type: ignore[no-redef]
- def __init__(self, *_, **__):
- pass
-
- class Depends: # type: ignore[no-redef]
- def __init__(self, *_, **__):
- pass
-
- class Header: # type: ignore[no-redef]
- def __init__(self, *_, **__):
- pass
-
- class Path: # type: ignore[no-redef]
- def __init__(self, *_, **__):
- pass
-
- class Request: # type: ignore[no-redef]
- pass
-
- class SessionDep: # type: ignore[no-redef]
- pass
-
- def create_openapi_http_exception_doc(responses_status_code: list[int]) -> dict:
- return {}
-
- class status: # type: ignore[no-redef]
- HTTP_204_NO_CONTENT = 204
- HTTP_400_BAD_REQUEST = 400
- HTTP_403_FORBIDDEN = 403
- HTTP_404_NOT_FOUND = 404
- HTTP_409_CONFLICT = 409
- HTTP_500_INTERNAL_SERVER_ERROR = 500
-
- class HTTPException(ProblemException): # type: ignore[no-redef]
- """Raise when the user does not have the required permissions."""
-
- def __init__(
- self,
- status: int,
- detail: str,
- ) -> None:
- from airflow.utils.docs import get_docs_url
-
- doc_link = get_docs_url("stable-rest-api-ref.html")
- EXCEPTIONS_LINK_MAP = {
- 400: f"{doc_link}#section/Errors/BadRequest",
- 403: f"{doc_link}#section/Errors/PermissionDenied",
- 409: f"{doc_link}#section/Errors/Conflict",
- 500: f"{doc_link}#section/Errors/Unknown",
- }
- TITLE_MAP = {
- 400: "BadRequest",
- 403: "PermissionDenied",
- 409: "Conflict",
- 500: "InternalServerError",
- }
- super().__init__(
- status=status,
- type=EXCEPTIONS_LINK_MAP[status],
- title=TITLE_MAP[status],
- detail=detail,
- )
-
- @property
- def status_code(self) -> int:
- """Alias for status to match FastAPI's HTTPException interface."""
- return self.status
-
- def to_response(self):
- from flask import Response
-
- return Response(response=self.detail, status=self.status)
-
- class AirflowRouter: # type: ignore[no-redef]
- def __init__(self, *_, **__):
- pass
-
- def get(self, *_, **__):
- def decorator(func: Callable) -> Callable:
- return func
-
- return decorator
-
- def post(self, *_, **__):
- def decorator(func: Callable) -> Callable:
- return func
-
- return decorator
-
- def patch(self, *_, **__):
- def decorator(func: Callable) -> Callable:
- return func
-
- return decorator
-
- # In Airflow 3 with AIP-72 we get workload addressed by ExecuteTask
- # But in Airflow 2.10 it is a command line array
- ExecuteTask = list[str] # type: ignore[assignment,misc]
-
- def parse_command(command: str) -> ExecuteTask:
- from ast import literal_eval
-
- return literal_eval(command)
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py
deleted file mode 100644
index a275246d07880..0000000000000
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py
+++ /dev/null
@@ -1,237 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Compatibility layer for Connexion API to Airflow v2.10 API routes."""
-
-from __future__ import annotations
-
-import json
-import logging
-from typing import TYPE_CHECKING, Any
-from uuid import uuid4
-
-from flask import Response, request
-
-from airflow.exceptions import AirflowException
-from airflow.providers.edge3.worker_api.auth import (
- jwt_token_authorization,
- jwt_token_authorization_rpc,
-)
-from airflow.providers.edge3.worker_api.datamodels import (
- EdgeJobFetched,
- JsonRpcRequest,
- PushLogsBody,
- WorkerQueuesBody,
- WorkerStateBody,
-)
-from airflow.providers.edge3.worker_api.routes._v2_compat import HTTPException, status
-from airflow.providers.edge3.worker_api.routes.jobs import fetch, state as state_api
-from airflow.providers.edge3.worker_api.routes.logs import logfile_path, push_logs
-from airflow.providers.edge3.worker_api.routes.worker import register, set_state
-from airflow.serialization.serialized_objects import BaseSerialization
-from airflow.utils.session import NEW_SESSION, create_session, provide_session
-
-if TYPE_CHECKING:
- from airflow.api_connexion.types import APIResponse
- from airflow.utils.state import TaskInstanceState
-
-
-log = logging.getLogger(__name__)
-
-
-def error_response(message: str, status: int):
- """Log the error and return the response as JSON object."""
- error_id = uuid4()
- server_message = f"{message} error_id={error_id}"
- log.exception(server_message)
- client_message = f"{message} The server side traceback may be identified with error_id={error_id}"
- return HTTPException(status, client_message)
-
-
-def rpcapi_v2(body: dict[str, Any]) -> APIResponse:
- """Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint for Airflow 2.10."""
- # Note: Except the method map this _was_ a 100% copy of internal API module
- # airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
- # As of rework for FastAPI in Airflow 3.0, this is updated and to be removed in the future.
- from airflow.api_internal.endpoints.rpc_api_endpoint import (
- # Note: This is just for compatibility with Airflow 2.10, not working for Airflow 3 / main as removed
- initialize_method_map,
- )
-
- try:
- if request.headers.get("Content-Type", "") != "application/json":
- raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Content-Type: application/json")
- if request.headers.get("Accept", "") != "application/json":
- raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Accept: application/json")
- auth = request.headers.get("Authorization", "")
- request_obj = JsonRpcRequest(method=body["method"], jsonrpc=body["jsonrpc"], params=body["params"])
- jwt_token_authorization_rpc(request_obj, auth)
- if request_obj.jsonrpc != "2.0":
- raise error_response("Expected jsonrpc 2.0 request.", status.HTTP_400_BAD_REQUEST)
-
- log.debug("Got request for %s", request_obj.method)
- methods_map = initialize_method_map()
- if request_obj.method not in methods_map:
- raise error_response(f"Unrecognized method: {request_obj.method}.", status.HTTP_400_BAD_REQUEST)
-
- handler = methods_map[request_obj.method]
- params = {}
- try:
- if request_obj.params:
- # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
- params = BaseSerialization.deserialize(request_obj.params, use_pydantic_models=True) # type: ignore[call-arg]
- except Exception:
- raise error_response("Error deserializing parameters.", status.HTTP_400_BAD_REQUEST)
-
- log.debug("Calling method %s\nparams: %s", request_obj.method, params)
- try:
- # Session must be created there as it may be needed by serializer for lazy-loaded fields.
- with create_session() as session:
- output = handler(**params, session=session)
- # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
- output_json = BaseSerialization.serialize(output, use_pydantic_models=True) # type: ignore[call-arg]
- log.debug(
- "Sending response: %s", json.dumps(output_json) if output_json is not None else None
- )
- # In case of AirflowException or other selective known types, transport the exception class back to caller
- except (KeyError, AttributeError, AirflowException) as e:
- # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
- output_json = BaseSerialization.serialize(e, use_pydantic_models=True) # type: ignore[call-arg]
- log.debug(
- "Sending exception response: %s", json.dumps(output_json) if output_json is not None else None
- )
- except Exception:
- raise error_response(
- f"Error executing method '{request_obj.method}'.", status.HTTP_500_INTERNAL_SERVER_ERROR
- )
- response = json.dumps(output_json) if output_json is not None else None
- return Response(response=response, headers={"Content-Type": "application/json"})
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-def jwt_token_authorization_v2(method: str, authorization: str):
- """Proxy for v2 method path handling."""
- PREFIX = "/edge_worker/v1/"
- method_path = method[method.find(PREFIX) + len(PREFIX) :] if PREFIX in method else method
- jwt_token_authorization(method_path, authorization)
-
-
-@provide_session
-def register_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
- """Handle Edge Worker API `/edge_worker/v1/worker/{worker_name}` endpoint for Airflow 2.10."""
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- request_obj = WorkerStateBody(
- state=body["state"], jobs_active=0, queues=body["queues"], sysinfo=body["sysinfo"]
- )
- return register(worker_name, request_obj, session).model_dump()
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-@provide_session
-def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
- """Handle Edge Worker API `/edge_worker/v1/worker/{worker_name}` endpoint for Airflow 2.10."""
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- request_obj = WorkerStateBody(
- state=body["state"],
- jobs_active=body["jobs_active"],
- queues=body["queues"],
- sysinfo=body["sysinfo"],
- maintenance_comments=body.get("maintenance_comments"),
- )
- return set_state(worker_name, request_obj, session).model_dump()
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-@provide_session
-def job_fetch_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> Any:
- """Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10."""
- from flask import request
-
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- queues = body.get("queues")
- free_concurrency = body.get("free_concurrency", 1)
- request_obj = WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency)
- job: EdgeJobFetched | None = fetch(worker_name, request_obj, session)
- return job.model_dump() if job is not None else None
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-@provide_session
-def job_state_v2(
- dag_id: str,
- task_id: str,
- run_id: str,
- try_number: int,
- map_index: str, # Note: Connexion can not have negative numbers in path parameters, use string therefore
- state: TaskInstanceState,
- session=NEW_SESSION,
-) -> Any:
- """Handle Edge Worker API `/jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}` endpoint for Airflow 2.10."""
- from flask import request
-
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- state_api(dag_id, task_id, run_id, try_number, int(map_index), state, session)
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-def logfile_path_v2(
- dag_id: str,
- task_id: str,
- run_id: str,
- try_number: int,
- map_index: str, # Note: Connexion can not have negative numbers in path parameters, use string therefore
-) -> str:
- """Handle Edge Worker API `/edge_worker/v1/logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}` endpoint for Airflow 2.10."""
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- return logfile_path(dag_id, task_id, run_id, try_number, int(map_index))
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
-
-
-def push_logs_v2(
- dag_id: str,
- task_id: str,
- run_id: str,
- try_number: int,
- map_index: str, # Note: Connexion can not have negative numbers in path parameters, use string therefore
- body: dict[str, Any],
-) -> None:
- """Handle Edge Worker API `/edge_worker/v1/logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}` endpoint for Airflow 2.10."""
- try:
- auth = request.headers.get("Authorization", "")
- jwt_token_authorization_v2(request.path, auth)
- request_obj = PushLogsBody(
- log_chunk_data=body["log_chunk_data"], log_chunk_time=body["log_chunk_time"]
- )
- with create_session() as session:
- push_logs(dag_id, task_id, run_id, try_number, int(map_index), request_obj, session)
- except HTTPException as e:
- return e.to_response() # type: ignore[attr-defined]
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
index 557da25077f0f..1281ec2f8e05d 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/health.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from airflow.providers.edge3.worker_api.routes._v2_compat import AirflowRouter
+from airflow.api_fastapi.common.router import AirflowRouter
health_router = AirflowRouter(tags=["Monitor"])
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
index 9f0c282fb7025..c39e9dc1814e9 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
@@ -19,8 +19,13 @@
from typing import Annotated
+from fastapi import Body, Depends, status
from sqlalchemy import select, update
+from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.executors.workloads import ExecuteTask
from airflow.providers.common.compat.sdk import Stats, timezone
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
@@ -29,20 +34,15 @@
WorkerApiDocs,
WorkerQueuesBody,
)
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
- AirflowRouter,
- Body,
- Depends,
- SessionDep,
- create_openapi_http_exception_doc,
- parse_command,
- status,
-)
from airflow.utils.state import TaskInstanceState
jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+def parse_command(command: str) -> ExecuteTask:
+ return ExecuteTask.model_validate_json(command)
+
+
@jobs_router.post(
"/fetch/{worker_name}",
dependencies=[Depends(jwt_token_authorization_rest)],
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
index 900ae18b0125d..064808b2b0484 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
@@ -21,20 +21,17 @@
from pathlib import Path
from typing import TYPE_CHECKING, Annotated
+from fastapi import Body, Depends, status
+
+from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import PushLogsBody, WorkerApiDocs
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
- AirflowRouter,
- Body,
- Depends,
- SessionDep,
- create_openapi_http_exception_doc,
- status,
-)
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.session import NEW_SESSION, provide_session
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 4e2298c8adb8b..07054e895b8ae 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -20,8 +20,12 @@
import json
from typing import Annotated
+from fastapi import Body, Depends, HTTPException, Path, status
from sqlalchemy import select
+from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.providers.common.compat.sdk import Stats, timezone
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
@@ -31,16 +35,6 @@
WorkerSetStateReturn,
WorkerStateBody,
)
-from airflow.providers.edge3.worker_api.routes._v2_compat import (
- AirflowRouter,
- Body,
- Depends,
- HTTPException,
- Path,
- SessionDep,
- create_openapi_http_exception_doc,
- status,
-)
worker_router = AirflowRouter(
tags=["Worker"],
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
similarity index 99%
rename from providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
rename to providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
index 36e8a52641126..a6567af3bcb4a 100644
--- a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
@@ -898,8 +898,7 @@ components:
command:
$ref: '#/components/schemas/ExecuteTask'
title: Command
- description: Command line to use to execute the job in Airflow 2. Task definition
- in Airflow 3
+ description: Command line to use to execute the job in Airflow
concurrency_slots:
type: integer
title: Concurrency Slots
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 515421b529742..b8d7435a9855f 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -49,34 +49,29 @@
from airflow.utils.state import TaskInstanceState
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytest.importorskip("pydantic", minversion="2.0.0")
-MOCK_COMMAND = (
- {
- "token": "mock",
- "ti": {
- "id": "4d828a62-a417-4936-a7a6-2b3fabacecab",
- "task_id": "mock",
- "dag_id": "mock",
- "run_id": "mock",
- "try_number": 1,
- "dag_version_id": "01234567-89ab-cdef-0123-456789abcdef",
- "pool_slots": 1,
- "queue": "default",
- "priority_weight": 1,
- "start_date": "2023-01-01T00:00:00+00:00",
- "map_index": -1,
- },
- "dag_rel_path": "mock.py",
- "log_path": "mock.log",
- "bundle_info": {"name": "hello", "version": "abc"},
- }
- if AIRFLOW_V_3_0_PLUS
- else ["test", "command"] # Airflow 2.10
-)
+MOCK_COMMAND = {
+ "token": "mock",
+ "ti": {
+ "id": "4d828a62-a417-4936-a7a6-2b3fabacecab",
+ "task_id": "mock",
+ "dag_id": "mock",
+ "run_id": "mock",
+ "try_number": 1,
+ "dag_version_id": "01234567-89ab-cdef-0123-456789abcdef",
+ "pool_slots": 1,
+ "queue": "default",
+ "priority_weight": 1,
+ "start_date": "2023-01-01T00:00:00+00:00",
+ "map_index": -1,
+ },
+ "dag_rel_path": "mock.py",
+ "log_path": "mock.log",
+ "bundle_info": {"name": "hello", "version": "abc"},
+}
class _MockPopen(Popen):
@@ -141,10 +136,7 @@ def mock_edgeworker(self) -> EdgeWorkerModel:
return test_edgeworker
@patch("airflow.providers.edge3.cli.worker.Process")
- @patch("airflow.providers.edge3.cli.worker.logs_logfile_path")
- @patch("airflow.providers.edge3.cli.worker.Popen")
- def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_with_job: EdgeWorker):
- mock_popen.side_effect = [MagicMock()]
+ def test_launch_job(self, mock_process, worker_with_job: EdgeWorker):
mock_process_instance = MagicMock()
mock_process.side_effect = [mock_process_instance]
@@ -152,17 +144,12 @@ def test_launch_job(self, mock_popen, mock_logfile_path, mock_process, worker_wi
with conf_vars({("edge", "api_url"): "https://invalid-api-test-endpoint"}):
worker_with_job._launch_job(edge_job)
- if AIRFLOW_V_3_0_PLUS:
- assert mock_process.call_count == 1
- assert mock_process_instance.start.call_count == 1
- else:
- assert mock_popen.call_count == 1
- assert mock_logfile_path.call_count == 1
+ assert mock_process.call_count == 1
+ assert mock_process_instance.start.call_count == 1
assert len(EdgeWorker.jobs) == 1
assert EdgeWorker.jobs[0].edge_job == edge_job
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3+")
@pytest.mark.parametrize(
("configs", "expected_url"),
[
@@ -193,18 +180,14 @@ def test_execution_api_server_url(
url = EdgeWorker._execution_api_server_url()
assert url == expected_url
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3+")
@patch("airflow.sdk.execution_time.supervisor.supervise")
@patch("airflow.providers.edge3.cli.worker.Process")
- @patch("airflow.providers.edge3.cli.worker.Popen")
def test_supervise_launch(
self,
- mock_popen,
mock_process,
mock_supervise,
worker_with_job: EdgeWorker,
):
- mock_popen.side_effect = [MagicMock()]
mock_process_instance = MagicMock()
mock_process.side_effect = [mock_process_instance]
@@ -237,14 +220,12 @@ def test_supervise_launch(
],
)
@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
- @patch("airflow.providers.edge3.cli.worker.logs_logfile_path")
@patch("airflow.providers.edge3.cli.worker.jobs_set_state")
@patch("subprocess.Popen")
def test_fetch_job(
self,
mock_popen,
mock_set_state,
- mock_logfile_path,
mock_reserve_task,
reserve_result,
fetch_result,
@@ -258,11 +239,6 @@ def test_fetch_job(
got_job = worker_with_job.fetch_job()
mock_reserve_task.assert_called_once()
assert got_job == fetch_result
- if AIRFLOW_V_3_0_PLUS:
- # this is only called on Airflow 2.10, AIP-72 includes it
- assert mock_logfile_path.call_count == 0
- else:
- assert mock_logfile_path.call_count == logfile_path_call_count
assert mock_set_state.call_count == set_state_call_count
def test_check_running_jobs_running(self, worker_with_job: EdgeWorker):
diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
index c49f03ead5ed0..b8f411ebd0e03 100644
--- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
+++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-from copy import deepcopy
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
@@ -33,7 +32,6 @@
from airflow.utils.state import TaskInstanceState
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -58,43 +56,12 @@ def get_test_executor(self, pool_slots=1):
return (executor, key)
- @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="_process_tasks is not used in Airflow 3.0+")
- def test__process_tasks_bad_command(self):
- executor, key = self.get_test_executor()
- task_tuple = (key, ["hello", "world"], None, None)
- with pytest.raises(ValueError, match="The command must start with "):
- executor._process_tasks([task_tuple])
-
- @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="_process_tasks is not used in Airflow 3.0+")
- @pytest.mark.parametrize(
- ("pool_slots", "expected_concurrency"),
- [
- pytest.param(1, 1, id="default_pool_size"),
- pytest.param(5, 5, id="increased_pool_size"),
- ],
- )
- def test__process_tasks_ok_command(self, pool_slots, expected_concurrency):
- executor, key = self.get_test_executor(pool_slots=pool_slots)
- task_tuple = (key, ["airflow", "tasks", "run", "hello", "world"], None, None)
- executor._process_tasks([task_tuple])
-
- with create_session() as session:
- jobs: list[EdgeJobModel] = session.query(EdgeJobModel).all()
- assert len(jobs) == 1
- assert jobs[0].dag_id == "test_dag"
- assert jobs[0].run_id == "test_run"
- assert jobs[0].task_id == "test_task"
- assert jobs[0].concurrency_slots == expected_concurrency
-
@patch(f"{Stats.__module__}.Stats.incr")
def test_sync_orphaned_tasks(self, mock_stats_incr):
executor = EdgeExecutor()
delta_to_purge = timedelta(minutes=conf.getint("edge", "job_fail_purge") + 1)
- if AIRFLOW_V_3_0_PLUS:
- delta_to_orphaned_config_name = "task_instance_heartbeat_timeout"
- else:
- delta_to_orphaned_config_name = "scheduler_zombie_task_threshold"
+ delta_to_orphaned_config_name = "task_instance_heartbeat_timeout"
delta_to_orphaned = timedelta(seconds=conf.getint("scheduler", delta_to_orphaned_config_name) + 1)
@@ -297,145 +264,6 @@ def test_sync_active_worker(self):
else:
assert worker.state == EdgeWorkerState.IDLE
- @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow <3.0")
- def test_execute_async(self):
- executor, key = self.get_test_executor()
-
- # Need to apply "trick" which is used to pass pool_slots
- executor.edge_queued_tasks = deepcopy(executor.queued_tasks)
-
- executor.execute_async(key=key, command=["airflow", "tasks", "run", "hello", "world"])
-
- with create_session() as session:
- jobs = session.query(EdgeJobModel).all()
- assert len(jobs) == 1
-
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow 3.0+")
- def test_queue_workload(self):
- from airflow.executors.workloads import ExecuteTask, TaskInstance
-
- executor = self.get_test_executor()[0]
-
- with pytest.raises(TypeError):
- # Does not like the Airflow 2.10 type of workload
- executor.queue_workload(command=["airflow", "tasks", "run", "hello", "world"])
-
- workload = ExecuteTask(
- token="mock",
- ti=TaskInstance(
- id="4d828a62-a417-4936-a7a6-2b3fabacecab",
- task_id="mock",
- dag_id="mock",
- run_id="mock",
- try_number=1,
- pool_slots=1,
- queue="default",
- priority_weight=1,
- start_date=timezone.utcnow(),
- dag_version_id="4d828a62-a417-4936-a7a6-2b3fabacecab",
- ),
- dag_rel_path="mock.py",
- log_path="mock.log",
- bundle_info={"name": "n/a", "version": "no matter"},
- )
- executor.queue_workload(workload=workload)
-
- with create_session() as session:
- jobs = session.query(EdgeJobModel).all()
- assert len(jobs) == 1
-
- @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow <3.0")
- def test_execute_async_updates_existing_job(self):
- executor, key = self.get_test_executor()
-
- # First insert a job with the same key
- with create_session() as session:
- session.add(
- EdgeJobModel(
- dag_id=key.dag_id,
- run_id=key.run_id,
- task_id=key.task_id,
- map_index=key.map_index,
- try_number=key.try_number,
- state=TaskInstanceState.SCHEDULED,
- queue="default",
- concurrency_slots=1,
- command="old-command",
- last_update=timezone.utcnow(),
- )
- )
- session.commit()
-
- # Trigger execute_async which should update the existing job
- executor.edge_queued_tasks = deepcopy(executor.queued_tasks)
- executor.execute_async(key=key, command=["airflow", "tasks", "run", "new", "command"])
-
- with create_session() as session:
- jobs = session.query(EdgeJobModel).all()
- assert len(jobs) == 1
- job = jobs[0]
- assert job.state == TaskInstanceState.QUEUED
- assert job.command != "old-command"
- assert "new" in job.command
-
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow 3.0+")
- def test_queue_workload_updates_existing_job(self):
- from uuid import uuid4
-
- from airflow.executors.workloads import ExecuteTask, TaskInstance
-
- executor = self.get_test_executor()[0]
-
- key = TaskInstanceKey(dag_id="mock", run_id="mock", task_id="mock", map_index=-1, try_number=1)
-
- # Insert an existing job
- with create_session() as session:
- session.add(
- EdgeJobModel(
- dag_id=key.dag_id,
- task_id=key.task_id,
- run_id=key.run_id,
- map_index=key.map_index,
- try_number=key.try_number,
- state=TaskInstanceState.SCHEDULED,
- queue="default",
- command="old-command",
- concurrency_slots=1,
- last_update=timezone.utcnow(),
- )
- )
- session.commit()
-
- # Queue a workload with same key
- workload = ExecuteTask(
- token="mock",
- ti=TaskInstance(
- id=uuid4(),
- task_id=key.task_id,
- dag_id=key.dag_id,
- run_id=key.run_id,
- try_number=key.try_number,
- map_index=key.map_index,
- pool_slots=1,
- queue="updated-queue",
- priority_weight=1,
- start_date=timezone.utcnow(),
- dag_version_id=uuid4(),
- ),
- dag_rel_path="mock.py",
- log_path="mock.log",
- bundle_info={"name": "n/a", "version": "no matter"},
- )
-
- executor.queue_workload(workload=workload)
-
- with create_session() as session:
- jobs = session.query(EdgeJobModel).all()
- assert len(jobs) == 1
- job = jobs[0]
- assert job.queue == "updated-queue"
- assert job.command != "old-command"
-
def test_revoke_task(self):
"""Test that revoke_task removes task from executor and database."""
executor = EdgeExecutor()
diff --git a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
index 21729bb49002b..0eea2ae9c0d12 100644
--- a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
+++ b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py
@@ -20,13 +20,11 @@
from unittest.mock import patch
import pytest
-import time_machine
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.edge3.plugins import edge_executor_plugin
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
def test_plugin_inactive():
@@ -46,7 +44,7 @@ def test_plugin_inactive():
@pytest.mark.db_test
def test_plugin_active_apiserver():
- mock_cli = ["airflow", "api-server"] if AIRFLOW_V_3_0_PLUS else ["gunicorn", "airflow-webserver"]
+ mock_cli = ["airflow", "api-server"]
with conf_vars({("edge", "api_enabled"): "true"}), patch("sys.argv", mock_cli):
importlib.reload(edge_executor_plugin)
@@ -59,13 +57,9 @@ def test_plugin_active_apiserver():
rep = EdgeExecutorPlugin()
assert EDGE_EXECUTOR_ACTIVE
assert RUNNING_ON_APISERVER
- if AIRFLOW_V_3_0_PLUS:
- assert len(rep.appbuilder_views) == 0
- assert len(rep.flask_blueprints) == 0
- assert len(rep.fastapi_apps) == 1
- else:
- assert len(rep.appbuilder_views) == 2
- assert len(rep.flask_blueprints) == 2
+ assert len(rep.appbuilder_views) == 0
+ assert len(rep.flask_blueprints) == 0
+ assert len(rep.fastapi_apps) == 1
@patch("sys.argv", ["airflow", "some-other-command"])
@@ -85,8 +79,7 @@ def test_plugin_active_non_apiserver():
assert len(rep.appbuilder_views) == 0
assert len(rep.flask_blueprints) == 0
assert len(rep.appbuilder_views) == 0
- if AIRFLOW_V_3_0_PLUS:
- assert len(rep.fastapi_apps) == 0
+ assert len(rep.fastapi_apps) == 0
@pytest.fixture
@@ -98,34 +91,3 @@ def plugin():
def test_plugin_is_airflow_plugin(plugin):
assert isinstance(plugin, AirflowPlugin)
-
-
-@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Plugin endpoint is not used in Airflow 3.0+")
-@pytest.mark.parametrize(
- ("initial_comment", "expected_comment"),
- [
- pytest.param(
- "comment", "[2020-01-01 00:00] - user updated maintenance mode\nComment: comment", id="no user"
- ),
- pytest.param(
- "[2019-01-01] - another user put node into maintenance mode\nComment:new comment",
- "[2020-01-01 00:00] - user updated maintenance mode\nComment:new comment",
- id="first update",
- ),
- pytest.param(
- "[2019-01-01] - another user updated maintenance mode\nComment:new comment",
- "[2020-01-01 00:00] - user updated maintenance mode\nComment:new comment",
- id="second update",
- ),
- pytest.param(
- None,
- "[2020-01-01 00:00] - user updated maintenance mode\nComment:",
- id="None as input",
- ),
- ],
-)
-@time_machine.travel("2020-01-01", tick=False)
-def test_modify_maintenance_comment_on_update(monkeypatch, initial_comment, expected_comment):
- assert (
- edge_executor_plugin.modify_maintenance_comment_on_update(initial_comment, "user") == expected_comment
- )
diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
index 11e0cb25561af..5f1b3b6db935c 100644
--- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
@@ -20,12 +20,12 @@
from typing import TYPE_CHECKING
import pytest
+from fastapi import HTTPException
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.edge3.cli.worker import EdgeWorker
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState
from airflow.providers.edge3.worker_api.datamodels import WorkerQueueUpdateBody, WorkerStateBody
-from airflow.providers.edge3.worker_api.routes._v2_compat import HTTPException
from airflow.providers.edge3.worker_api.routes.worker import (
_assert_version,
register,
diff --git a/providers/edge3/www-hash.txt b/providers/edge3/www-hash.txt
index 5f9f2b64f5a40..d71f388e496b5 100644
--- a/providers/edge3/www-hash.txt
+++ b/providers/edge3/www-hash.txt
@@ -1 +1 @@
-29cb09d5e6ea8dde292b43013f0d2dbbe2180687b1e9d766090b0d3f135931c9
+c8817a364febb85bea2a80df205b2b8075caf07101cfb971330dc559c4b160bf
diff --git a/scripts/in_container/run_generate_openapi_spec_providers.py b/scripts/in_container/run_generate_openapi_spec_providers.py
index 99969bf40c9f1..97b1c84c67496 100755
--- a/scripts/in_container/run_generate_openapi_spec_providers.py
+++ b/scripts/in_container/run_generate_openapi_spec_providers.py
@@ -54,7 +54,7 @@ class ProviderDef(NamedTuple):
prefix="/auth",
),
"edge": ProviderDef(
- openapi_spec_file=Path(EDGE_PATH).parent / "openapi" / "v2-edge-generated.yaml",
+ openapi_spec_file=Path(EDGE_PATH).parent / "worker_api" / "v2-edge-generated.yaml",
app=create_edge_worker_api_app(),
prefix="/edge_worker",
),