Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions providers/edge/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

Package ``apache-airflow-providers-edge``

Release: ``0.18.1pre0``
Release: ``0.19.0pre0``


Handle edge workers on remote sites via HTTP(s) connection and orchestrates work over distributed sites
Expand All @@ -36,7 +36,7 @@ This is a provider package for ``edge`` provider. All classes for this provider
are in ``airflow.providers.edge`` python package.

You can find package information and changelog for the provider
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.18.1pre0/>`_.
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/>`_.

Installation
------------
Expand All @@ -59,4 +59,4 @@ PIP package Version required
================== ===================

The changelog for the provider package can be found in the
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.18.1pre0/changelog.html>`_.
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html>`_.
9 changes: 9 additions & 0 deletions providers/edge/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
Changelog
---------

0.19.0pre0
..........

Misc
~~~~

* ``Edge worker can be set to maintenance via CLI and also return to normal operation.``


0.18.1pre0
..........

Expand Down
2 changes: 1 addition & 1 deletion providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ source-date-epoch: 1737371680

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.18.1pre0
- 0.19.0pre0

plugins:
- name: edge_executor
Expand Down
6 changes: 3 additions & 3 deletions providers/edge/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"

[project]
name = "apache-airflow-providers-edge"
version = "0.18.1pre0"
version = "0.19.0pre0"
description = "Provider package apache-airflow-providers-edge for Apache Airflow"
readme = "README.rst"
authors = [
Expand Down Expand Up @@ -61,8 +61,8 @@ dependencies = [
]

[project.urls]
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.18.1pre0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.18.1pre0/changelog.html"
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
Expand Down
2 changes: 1 addition & 1 deletion providers/edge/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.18.1pre0"
__version__ = "0.19.0pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
13 changes: 11 additions & 2 deletions providers/edge/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,24 @@ def worker_register(


def worker_set_state(
hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, sysinfo: dict
hostname: str,
state: EdgeWorkerState,
jobs_active: int,
queues: list[str] | None,
sysinfo: dict,
maintenance_comments: str | None = None,
) -> WorkerSetStateReturn:
"""Update the state of the worker in the central site and thereby implicitly heartbeat."""
try:
result = _make_generic_request(
"PATCH",
f"worker/{quote(hostname)}",
WorkerStateBody(
state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo
state=state,
jobs_active=jobs_active,
queues=queues,
sysinfo=sysinfo,
maintenance_comments=maintenance_comments,
).model_dump_json(exclude_unset=True),
)
except requests.HTTPError as e:
Expand Down
95 changes: 95 additions & 0 deletions providers/edge/src/airflow/providers/edge/cli/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from psutil import Popen

from airflow.providers.edge.models.edge_worker import EdgeWorkerState
from airflow.providers.edge.worker_api.datamodels import EdgeJobFetched


@dataclass
class MaintenanceMarker:
"""Maintenance mode status."""

maintenance: str
comments: str | None

@property
def json(self) -> str:
"""Get the maintenance status as JSON."""
return json.dumps(asdict(self))

@staticmethod
def from_json(json_str: str) -> MaintenanceMarker:
"""Create a Maintenance object from JSON."""
return MaintenanceMarker(**json.loads(json_str))


@dataclass
class WorkerStatus:
"""Status of the worker."""

job_count: int
jobs: list
state: EdgeWorkerState
maintenance: bool
maintenance_comments: str | None
drain: bool

@property
def json(self) -> str:
"""Get the status as JSON."""
return json.dumps(asdict(self))

@staticmethod
def from_json(json_str: str) -> WorkerStatus:
"""Create a WorkerStatus object from JSON."""
return WorkerStatus(**json.loads(json_str))


@dataclass
class Job:
"""Holds all information for a task/job to be executed as bundle."""

edge_job: EdgeJobFetched
process: Popen | Process
logfile: Path
logsize: int
"""Last size of log file, point of last chunk push."""

@property
def is_running(self) -> bool:
"""Check if the job is still running."""
if hasattr(self.process, "returncode") and hasattr(self.process, "poll"):
self.process.poll()
return self.process.returncode is None
return self.process.exitcode is None

@property
def is_success(self) -> bool:
"""Check if the job was successful."""
if hasattr(self.process, "returncode"):
return self.process.returncode == 0
return self.process.exitcode == 0
Loading