From feb85a4c1e642bceb21a54e2dc85055b0da76bf8 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Thu, 8 May 2025 11:01:16 +0200 Subject: [PATCH 1/3] Edge worker maintenance state is remembered if worker crashes --- .../edge3/executors/edge_executor.py | 30 ++++++++++++++++++- .../edge3/worker_api/routes/worker.py | 7 ++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 55d5414cd8965..fd79533760f20 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -177,7 +177,30 @@ def _check_worker_liveness(self, session: Session) -> bool: .with_for_update(skip_locked=True) .filter( EdgeWorkerModel.state.not_in( - [EdgeWorkerState.UNKNOWN, EdgeWorkerState.OFFLINE, EdgeWorkerState.OFFLINE_MAINTENANCE] + [ + EdgeWorkerState.UNKNOWN, + EdgeWorkerState.OFFLINE, + EdgeWorkerState.OFFLINE_MAINTENANCE, + EdgeWorkerState.MAINTENANCE_MODE, + EdgeWorkerState.MAINTENANCE_PENDING, + EdgeWorkerState.MAINTENANCE_REQUEST, + ] + ), + EdgeWorkerModel.last_update < (timezone.utcnow() - timedelta(seconds=heartbeat_interval * 5)), + ) + .all() + ) + + lifeless_workers_maintenance: list[EdgeWorkerModel] = ( + session.query(EdgeWorkerModel) + .with_for_update(skip_locked=True) + .filter( + EdgeWorkerModel.state.in_( + [ + EdgeWorkerState.MAINTENANCE_MODE, + EdgeWorkerState.MAINTENANCE_PENDING, + EdgeWorkerState.MAINTENANCE_REQUEST, + ] ), EdgeWorkerModel.last_update < (timezone.utcnow() - timedelta(seconds=heartbeat_interval * 5)), ) @@ -189,6 +212,11 @@ def _check_worker_liveness(self, session: Session) -> bool: worker.state = EdgeWorkerState.UNKNOWN reset_metrics(worker.worker_name) + for worker in lifeless_workers_maintenance: + changed = True + worker.state = EdgeWorkerState.OFFLINE_MAINTENANCE + reset_metrics(worker.worker_name) + return changed def _update_orphaned_jobs(self, session: Session) -> bool: diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py index f251d32acf998..b24330733c8a5 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py @@ -122,7 +122,12 @@ def redefine_state(worker_state: EdgeWorkerState, body_state: EdgeWorkerState) - EdgeWorkerState.MAINTENANCE_PENDING, EdgeWorkerState.MAINTENANCE_MODE, ) - or worker_state == EdgeWorkerState.OFFLINE_MAINTENANCE + or worker_state + in ( + EdgeWorkerState.OFFLINE_MAINTENANCE, + EdgeWorkerState.MAINTENANCE_MODE, + EdgeWorkerState.MAINTENANCE_PENDING, + ) and body_state == EdgeWorkerState.STARTING ): return EdgeWorkerState.MAINTENANCE_REQUEST From 9a220507d028bd582b580e5e97b5fa96b1b05956 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Thu, 8 May 2025 11:20:09 +0200 Subject: [PATCH 2/3] Add pytests --- .../edge3/worker_api/routes/test_worker.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py index 4cf5ffc80031e..3553ec0e083db 100644 --- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py +++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py @@ -143,6 +143,24 @@ def test_register(self, session: Session, input_queues: list[str] | None, cli_wo EdgeWorkerState.MAINTENANCE_REQUEST, id="maintenance_starting", ), + pytest.param( + EdgeWorkerState.MAINTENANCE_MODE, + EdgeWorkerState.STARTING, + EdgeWorkerState.MAINTENANCE_REQUEST, + id="maintenance_crash", + ), + pytest.param( + EdgeWorkerState.MAINTENANCE_PENDING, + EdgeWorkerState.STARTING, + EdgeWorkerState.MAINTENANCE_REQUEST, + id="maintenance_crash_2", + ), + pytest.param( + EdgeWorkerState.MAINTENANCE_REQUEST, + EdgeWorkerState.STARTING, + EdgeWorkerState.MAINTENANCE_REQUEST, + id="maintenance_crash_3", + ), ], ) def test_redefine_state( From 176ab3a6bea4a575195d36b94d406be6b51f18fb Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Thu, 8 May 2025 11:45:45 +0200 Subject: [PATCH 3/3] Consolidate code --- .../edge3/executors/edge_executor.py | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index fd79533760f20..a2ebe43565299 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -181,25 +181,6 @@ def _check_worker_liveness(self, session: Session) -> bool: EdgeWorkerState.UNKNOWN, EdgeWorkerState.OFFLINE, EdgeWorkerState.OFFLINE_MAINTENANCE, - EdgeWorkerState.MAINTENANCE_MODE, - EdgeWorkerState.MAINTENANCE_PENDING, - EdgeWorkerState.MAINTENANCE_REQUEST, - ] - ), - EdgeWorkerModel.last_update < (timezone.utcnow() - timedelta(seconds=heartbeat_interval * 5)), - ) - .all() - ) - - lifeless_workers_maintenance: list[EdgeWorkerModel] = ( - session.query(EdgeWorkerModel) - .with_for_update(skip_locked=True) - .filter( - EdgeWorkerModel.state.in_( - [ - EdgeWorkerState.MAINTENANCE_MODE, - EdgeWorkerState.MAINTENANCE_PENDING, - EdgeWorkerState.MAINTENANCE_REQUEST, ] ), EdgeWorkerModel.last_update < (timezone.utcnow() - timedelta(seconds=heartbeat_interval * 5)), @@ -209,12 +190,17 @@ def _check_worker_liveness(self, session: Session) -> bool: for worker in lifeless_workers: changed = True - worker.state = EdgeWorkerState.UNKNOWN - reset_metrics(worker.worker_name) - - for worker in lifeless_workers_maintenance: - changed = True - worker.state = EdgeWorkerState.OFFLINE_MAINTENANCE + # If the worker dies in maintenance mode we want to remember it, so it can start in maintenance mode + worker.state = ( + EdgeWorkerState.OFFLINE_MAINTENANCE + if worker.state + in ( + EdgeWorkerState.MAINTENANCE_MODE, + EdgeWorkerState.MAINTENANCE_PENDING, + EdgeWorkerState.MAINTENANCE_REQUEST, + ) + else EdgeWorkerState.UNKNOWN + ) reset_metrics(worker.worker_name) return changed