Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ repos:
^.*openapi.*\.yaml$|
^\.pre-commit-config\.yaml$|
^.*reproducible_build\.yaml$|
^.*pnpm-lock\.yaml$
^.*pnpm-lock\.yaml$|
^.*-generated\.yaml$
- repo: https://github.com/ikamensh/flynt
rev: 97be693bf18bc2f050667dd282d243e2824b81e2 # frozen: 1.0.6
hooks:
Expand Down
2 changes: 1 addition & 1 deletion .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ www-hash.txt
*generated.*
/src/airflow/providers/keycloak/auth_manager/openapi/v2-keycloak-auth-manager-generated.yaml
/src/airflow/providers/edge3/plugins/www/*
/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v2-fab-auth-manager-generated.yaml
/src/airflow/providers/fab/www/static/dist/*
/any/dag_id=dag_for_testing_redis_task_handler/run_id=test/task_id=task_for_testing_redis_log_handler/attempt=1.log
Expand Down
2 changes: 1 addition & 1 deletion dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def get_airflow_extras():
{
"python-version": "3.10",
"airflow-version": "2.11.0",
"remove-providers": "common.messaging fab git keycloak",
"remove-providers": "common.messaging edge3 fab git keycloak",
"run-unit-tests": "true",
},
{
Expand Down
2 changes: 1 addition & 1 deletion go-sdk/pkg/edgeapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

//go:generate -command openapi-gen go run github.com/ashb/oapi-resty-codegen@latest --config oapi-codegen.yml

//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
//go:generate openapi-gen https://raw.githubusercontent.com/apache/airflow/refs/tags/providers-edge3/1.3.0/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml

func WithEdgeAPIJWTKey(key []byte) ClientOption {
return func(c *Client) error {
Expand Down
2 changes: 1 addition & 1 deletion providers/edge3/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repos:
files: |
(?x)
^src/airflow/providers/edge3/plugins/www/.*\.(js|ts|tsx|yaml|css|json)$|
^src/airflow/providers/edge3/openapi/v2-edge-generated.yaml$
^src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml$
exclude: |
(?x)
^src/airflow/providers/edge3/plugins/www/node-modules/.*|
Expand Down
2 changes: 1 addition & 1 deletion providers/edge3/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Requirements
========================================== ====================
PIP package Version required
========================================== ====================
``apache-airflow`` ``>=2.11.0,!=3.1.0``
``apache-airflow`` ``>=3.0.0,!=3.1.0``
``apache-airflow-providers-common-compat`` ``>=1.10.0``
``pydantic`` ``>=2.11.0``
``retryhttp`` ``>=1.2.0,!=1.3.0``
Expand Down
4 changes: 2 additions & 2 deletions providers/edge3/docs/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ deployed outside of the central Airflow cluster is connected via HTTP(s) to the
* **Workers** (Central) - Execute the assigned tasks - most standard setup has local or centralized workers, e.g. via Celery
* **Edge Workers** - Special workers which pull tasks via HTTP(s) as provided as feature via this provider package
* **Scheduler** - Responsible for adding the necessary tasks to the queue. The EdgeExecutor is running as a module inside the scheduler.
* **API server** (webserver in Airflow 2.x) - HTTP REST API Server provides access to Dag/task status information. The required end-points are
* **API server** - HTTP REST API Server provides access to Dag/task status information. The required end-points are
provided by the Edge provider plugin. The Edge Worker uses this API to pull tasks and send back the results.
* **Database** - Contains information about the status of tasks, Dags, Variables, connections, etc.

Expand All @@ -77,7 +77,7 @@ In detail the parts of the Edge provider are deployed as follows:
need to set the ``executor`` configuration option in the ``airflow.cfg`` file to
``airflow.providers.edge3.executors.EdgeExecutor``. For more details see :doc:`edge_executor`. Note that also
multiple executors can be used in parallel together with the EdgeExecutor.
* **API server** (webserver in Airflow 2.x) - The API server is providing REST endpoints to the web UI as well
* **API server** - The API server is providing REST endpoints to the web UI as well
as serves static files. The Edge provider adds a plugin that provides additional REST API for the Edge Worker
as well as UI elements to manage workers (not available in Airflow 3.0).
The API server is responsible for handling requests from the Edge Worker and sending back the results. To
Expand Down
8 changes: 8 additions & 0 deletions providers/edge3/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

.. warning::
This release of the Edge3 provider drops support for Airflow versions below 3.0.0.

The support for Airflow 2.10-2.11 was experimental and GA for the provider is only for Airflow 3.0+.
Productive operation was not intended in Airflow 2.x, therefore the support for Airflow 2.x is now dropped
earlier than the usual release support policy would indicate.


1.6.0
.....

Expand Down
4 changes: 2 additions & 2 deletions providers/edge3/docs/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ process as and wait until all running tasks are completed. Also in a console you
If you want to monitor the remote activity and worker, use the UI plugin which
is included in the provider package and install it on the api-server / webserver and use the
"Admin" - "Edge Worker Hosts" and "Edge Worker Jobs" pages.
(Note: The plugin is not available on Airflow 3.0 UI, it is only in 2.x and 3.1++)
(Note: The plugin is not available on Airflow 3.0 UI, it is only in 3.1++)

If you want to check status of the worker via CLI you can use the command

Expand Down Expand Up @@ -150,7 +150,7 @@ Worker status can be checked via the web UI in the "Admin" - "Edge Worker" page.

.. note::

Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.10 and in 3.1 and newer.
Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.


Expand Down
4 changes: 2 additions & 2 deletions providers/edge3/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ For the minimum Airflow version supported, see ``Requirements`` below.
Requirements
------------

The minimum Apache Airflow version supported by this provider distribution is ``2.11.0``.
The minimum Apache Airflow version supported by this provider distribution is ``3.0.0``.

========================================== ====================
PIP package Version required
========================================== ====================
``apache-airflow`` ``>=2.11.0,!=3.1.0``
``apache-airflow`` ``>=3.0.0,!=3.1.0``
``apache-airflow-providers-common-compat`` ``>=1.10.0``
``pydantic`` ``>=2.11.0``
``retryhttp`` ``>=1.2.0,!=1.3.0``
Expand Down
6 changes: 3 additions & 3 deletions providers/edge3/docs/ui_plugin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The Edge provider uses a Plugin to

- Extend the REST API endpoints for connecting workers to the Airflow cluster
- Provide a web UI for managing the workers and monitoring their status and tasks
(Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.)
(Note: Airflow 3.0 does not have support for UI plugins. The UI plugin is only available in Airflow 3.1 and newer.)

REST API endpoints
------------------
Expand All @@ -35,14 +35,14 @@ The Edge provider adds the following REST API endpoints to the Airflow API:
- ``/edge_worker/v1/health``: Check that the API endpoint is deployed and active

To see full documentation of the API endpoints open the Airflow web UI and navigate to
the sub-path ``/edge_worker/docs`` (Airflow 3.0) or ``/edge_worker/v1/ui`` (Airflow 2.x).
the sub-path ``/edge_worker/docs``.

Web UI Plugin
-------------

.. note::

Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 2.x and in 3.1 and newer.
Airflow 3.0 does not support UI plugins. The UI plugin is only available in Airflow 3.1 and newer.
Alternatively you can use the CLI commands as described in :ref:`deployment:maintenance-mgmt-cli`.

The Edge provider adds a web UI plugin to the Airflow web UI. The plugin is
Expand Down
2 changes: 1 addition & 1 deletion providers/edge3/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ requires-python = ">=3.10"
# Make sure to run ``prek update-providers-dependencies --all-files``
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0,!=3.1.0",
"apache-airflow>=3.0.0,!=3.1.0",
"apache-airflow-providers-common-compat>=1.10.0", # use next version
"pydantic>=2.11.0",
"retryhttp>=1.2.0,!=1.3.0",
Expand Down
4 changes: 2 additions & 2 deletions providers/edge3/src/airflow/providers/edge3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
__version__ = "1.6.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.11.0"
"3.0.0"
):
raise RuntimeError(
f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 2.11.0+"
f"The package `apache-airflow-providers-edge3:{__version__}` needs Apache Airflow 3.0.0+"
)
35 changes: 12 additions & 23 deletions providers/edge3/src/airflow/providers/edge3/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
from datetime import datetime
from functools import cache
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand All @@ -29,12 +30,12 @@
from retryhttp import retry, wait_retry_after
from tenacity import before_sleep_log, wait_random_exponential

from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.configuration import conf
from airflow.providers.edge3.models.edge_worker import (
EdgeWorkerDuplicateException,
EdgeWorkerVersionException,
)
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.edge3.worker_api.datamodels import (
EdgeJobFetched,
PushLogsBody,
Expand Down Expand Up @@ -74,6 +75,15 @@
_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)


@cache
def jwt_generator() -> JWTGenerator:
return JWTGenerator(
secret_key=conf.get("api_auth", "jwt_secret"),
valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
audience="api",
)


@retry(
reraise=True,
max_attempt_number=API_RETRIES,
Expand All @@ -84,28 +94,7 @@
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
if AIRFLOW_V_3_0_PLUS:
from functools import cache

from airflow.api_fastapi.auth.tokens import JWTGenerator

@cache
def jwt_generator() -> JWTGenerator:
return JWTGenerator(
secret_key=conf.get("api_auth", "jwt_secret"),
valid_for=conf.getint("api_auth", "jwt_leeway", fallback=30),
audience="api",
)

generator = jwt_generator()
authorization = generator.generate({"method": rest_path})
else:
# Airflow 2.10 compatibility
from airflow.providers.edge3.worker_api.auth import jwt_signer

signer = jwt_signer()
authorization = signer.generate_signed_token({"method": rest_path})

authorization = jwt_generator().generate({"method": rest_path})
api_url = conf.get("edge", "api_url")
headers = {
"Content-Type": "application/json",
Expand Down
28 changes: 1 addition & 27 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
from subprocess import Popen
from time import sleep
from typing import TYPE_CHECKING

Expand All @@ -39,7 +38,6 @@
from airflow.providers.edge3.cli.api_client import (
jobs_fetch,
jobs_set_state,
logs_logfile_path,
logs_push,
worker_register,
worker_set_state,
Expand All @@ -56,7 +54,6 @@
EdgeWorkerState,
EdgeWorkerVersionException,
)
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.net import getfqdn
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -218,7 +215,7 @@ def _run_job_via_supervisor(workload, execution_api_server_url) -> int:
return 1

@staticmethod
def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
def _launch_job(edge_job: EdgeJobFetched):
if TYPE_CHECKING:
from airflow.executors.workloads import ExecuteTask

Expand All @@ -232,29 +229,6 @@ def _launch_job_af3(edge_job: EdgeJobFetched) -> tuple[Process, Path]:
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(base_log_folder, workload.log_path)
return process, logfile

@staticmethod
def _launch_job_af2_10(edge_job: EdgeJobFetched) -> tuple[Popen, Path]:
"""Compatibility for Airflow 2.10 Launch."""
env = os.environ.copy()
env["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"
env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge", "api_url")
env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
command: list[str] = edge_job.command # type: ignore[assignment]
process = Popen(command, close_fds=True, env=env, start_new_session=True)
logfile = logs_logfile_path(edge_job.key)
return process, logfile

@staticmethod
def _launch_job(edge_job: EdgeJobFetched):
"""Get the received job executed."""
process: Popen | Process
if AIRFLOW_V_3_0_PLUS:
process, logfile = EdgeWorker._launch_job_af3(edge_job)
else:
# Airflow 2.10
process, logfile = EdgeWorker._launch_job_af2_10(edge_job)
EdgeWorker.jobs.append(Job(edge_job, process, logfile, 0))

def start(self):
Expand Down
Loading
Loading