Skip to content

Commit

Permalink
Revert removal of worker internal API call allowing to better migrate…
Browse files Browse the repository at this point in the history
… over existing setups
  • Loading branch information
jscheffl committed Nov 24, 2024
1 parent 9064919 commit 6f569b7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
75 changes: 74 additions & 1 deletion providers/src/airflow/providers/edge/models/edge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,24 @@
import json
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Optional

from sqlalchemy import Column, Integer, String
from pydantic import BaseModel, ConfigDict
from sqlalchemy import Column, Integer, String, select

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import AirflowException
from airflow.models.base import Base
from airflow.serialization.serialized_objects import add_pydantic_class_type_mapping
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session


class EdgeWorkerVersionException(AirflowException):
"""Signal a version mismatch between core and Edge Site."""
Expand Down Expand Up @@ -158,3 +166,68 @@ def reset_metrics(worker_name: str) -> None:
concurrency=0,
queues=None,
)


class EdgeWorker(BaseModel, LoggingMixin):
"""Deprecated Edge Worker internal API, keeping for one minor for graceful migration."""

worker_name: str
state: EdgeWorkerState
queues: Optional[list[str]] # noqa: UP007 - prevent Sphinx failing
first_online: datetime
last_update: Optional[datetime] = None # noqa: UP007 - prevent Sphinx failing
jobs_active: int
jobs_taken: int
jobs_success: int
jobs_failed: int
sysinfo: str
model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

@staticmethod
@internal_api_call
@provide_session
def set_state(
worker_name: str,
state: EdgeWorkerState,
jobs_active: int,
sysinfo: dict[str, str],
session: Session = NEW_SESSION,
) -> list[str] | None:
"""Set state of worker and returns the current assigned queues."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.state = state
worker.jobs_active = jobs_active
worker.sysinfo = json.dumps(sysinfo)
worker.last_update = timezone.utcnow()
session.commit()
Stats.incr(f"edge_worker.heartbeat_count.{worker_name}", 1, 1)
Stats.incr("edge_worker.heartbeat_count", 1, 1, tags={"worker_name": worker_name})
set_metrics(
worker_name=worker_name,
state=state,
connected=True,
jobs_active=jobs_active,
concurrency=int(sysinfo["concurrency"]),
queues=worker.queues,
)
raise EdgeWorkerVersionException(
"Edge Worker runs on an old version. Rejecting access due to difference."
)

@staticmethod
@internal_api_call
def register_worker(
worker_name: str,
state: EdgeWorkerState,
queues: list[str] | None,
sysinfo: dict[str, str],
) -> EdgeWorker:
raise EdgeWorkerVersionException(
"Edge Worker runs on an old version. Rejecting access due to difference."
)


EdgeWorker.model_rebuild()

add_pydantic_class_type_mapping("edge_worker", EdgeWorkerModel, EdgeWorker)
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
def _initialize_method_map() -> dict[str, Callable]:
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker

internal_api_functions = initialize_method_map().values()
functions: list[Callable] = [
Expand All @@ -56,6 +57,8 @@ def _initialize_method_map() -> dict[str, Callable]:
EdgeJob.reserve_task,
EdgeJob.set_state,
EdgeLogs.push_logs,
EdgeWorker.register_worker,
EdgeWorker.set_state,
]
return {f"{func.__module__}.{func.__qualname__}": func for func in functions}

Expand Down

0 comments on commit 6f569b7

Please sign in to comment.