From c297b7e4332500aa77bb0174db6bd281166896f6 Mon Sep 17 00:00:00 2001 From: "lakshminarayana.kumbha" Date: Wed, 30 Jul 2025 18:07:47 +0530 Subject: [PATCH 1/5] Fix: Handle duplicate entries in EdgeExecutor for deferrable tasks --- .../edge3/executors/edge_executor.py | 68 +++++++++++++++---- 1 file changed, 56 insertions(+), 12 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index de3be5e384ae1..4c0ce5a6309c3 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -140,20 +140,42 @@ def execute_async( del self.edge_queued_tasks[key] self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined] - session.add( - EdgeJobModel( + + # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number + existing_job = ( + session.query(EdgeJobModel) + .filter_by( dag_id=key.dag_id, task_id=key.task_id, run_id=key.run_id, map_index=key.map_index, try_number=key.try_number, - state=TaskInstanceState.QUEUED, - queue=queue or DEFAULT_QUEUE, - concurrency_slots=task_instance.pool_slots, - command=str(command), ) + .first() ) + if existing_job: + # self.log.info(f"EdgeExecutor: Job already exists for {key}, updating it.") + existing_job.state = TaskInstanceState.QUEUED + existing_job.queue = queue or DEFAULT_QUEUE + existing_job.concurrency_slots = task_instance.pool_slots + existing_job.command = str(command) + else: + session.add( + EdgeJobModel( + dag_id=key.dag_id, + task_id=key.task_id, + run_id=key.run_id, + map_index=key.map_index, + try_number=key.try_number, + state=TaskInstanceState.QUEUED, + queue=queue or DEFAULT_QUEUE, + concurrency_slots=task_instance.pool_slots, + command=str(command), + ) + ) + # self.log.info(f"EdgeExecutor: Inserted new job for {key}") + @provide_session def queue_workload( self, @@ -168,20 +190,42 @@ def queue_workload( task_instance = workload.ti key = task_instance.key - session.add( - EdgeJobModel( + + # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number + existing_job = ( + session.query(EdgeJobModel) + .filter_by( dag_id=key.dag_id, task_id=key.task_id, run_id=key.run_id, map_index=key.map_index, try_number=key.try_number, - state=TaskInstanceState.QUEUED, - queue=task_instance.queue, - concurrency_slots=task_instance.pool_slots, - command=workload.model_dump_json(), ) + .first() ) + if existing_job: + # self.log.info(f"EdgeExecutor: Workload already exists for {key}, updating it.") + existing_job.state = TaskInstanceState.QUEUED + existing_job.queue = task_instance.queue + existing_job.concurrency_slots = task_instance.pool_slots + existing_job.command = workload.model_dump_json() + else: + session.add( + EdgeJobModel( + dag_id=key.dag_id, + task_id=key.task_id, + run_id=key.run_id, + map_index=key.map_index, + try_number=key.try_number, + state=TaskInstanceState.QUEUED, + queue=task_instance.queue, + concurrency_slots=task_instance.pool_slots, + command=workload.model_dump_json(), + ) + ) + # self.log.info(f"EdgeExecutor: Inserted new workload for {key}") + def _check_worker_liveness(self, session: Session) -> bool: """Reset worker state if heartbeat timed out.""" changed = False From 335a985ba9caef5b4bc611b3ea589773c586866e Mon Sep 17 00:00:00 2001 From: "lakshminarayana.kumbha" Date: Thu, 31 Jul 2025 12:06:53 +0530 Subject: [PATCH 2/5] Add test cases to validate EdgeExecutor job update and worker sync behavior --- .../edge3/executors/test_edge_executor.py | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py index 7c86b39f12dd4..7e68c4d86ff27 100644 --- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py +++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py @@ -247,6 +247,11 @@ def test_sync_active_worker(self): # Prepare some data with create_session() as session: + # Clear existing workers to avoid unique constraint violation + session.query(EdgeWorkerModel).delete() + session.commit() + + # Add workers with different states for worker_name, state, last_heartbeat in [ ( "inactive_timed_out_worker", @@ -338,3 +343,95 @@ def test_queue_workload(self): with create_session() as session: jobs = session.query(EdgeJobModel).all() assert len(jobs) == 1 + + @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow <3.0") + def test_execute_async_updates_existing_job(self): + executor, key = self.get_test_executor() + + # First insert a job with the same key + with create_session() as session: + session.add( + EdgeJobModel( + dag_id=key.dag_id, + run_id=key.run_id, + task_id=key.task_id, + map_index=key.map_index, + try_number=key.try_number, + state=TaskInstanceState.SCHEDULED, + queue="default", + concurrency_slots=1, + command="old-command", + last_update=timezone.utcnow(), + ) + ) + session.commit() + + # Trigger execute_async which should update the existing job + executor.edge_queued_tasks = deepcopy(executor.queued_tasks) + executor.execute_async(key=key, command=["new", "command"]) + + with create_session() as session: + jobs = session.query(EdgeJobModel).all() + assert len(jobs) == 1 + job = jobs[0] + assert job.state == TaskInstanceState.QUEUED + assert job.command != "old-command" + assert "new" in job.command + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="API only available in Airflow 3.0+") + def test_queue_workload_updates_existing_job(self): + from uuid import uuid4 + + from airflow.executors.workloads import ExecuteTask, TaskInstance + + executor = self.get_test_executor()[0] + + key = TaskInstanceKey(dag_id="mock", run_id="mock", task_id="mock", map_index=-1, try_number=1) + + # Insert an existing job + with create_session() as session: + session.add( + EdgeJobModel( + dag_id=key.dag_id, + task_id=key.task_id, + run_id=key.run_id, + map_index=key.map_index, + try_number=key.try_number, + state=TaskInstanceState.SCHEDULED, + queue="default", + command="old-command", + concurrency_slots=1, + last_update=timezone.utcnow(), + ) + ) + session.commit() + + # Queue a workload with same key + workload = ExecuteTask( + token="mock", + ti=TaskInstance( + id=uuid4(), + task_id=key.task_id, + dag_id=key.dag_id, + run_id=key.run_id, + try_number=key.try_number, + map_index=key.map_index, + pool_slots=1, + queue="updated-queue", + priority_weight=1, + start_date=timezone.utcnow(), + dag_version_id=uuid4(), + ), + dag_rel_path="mock.py", + log_path="mock.log", + bundle_info={"name": "n/a", "version": "no matter"}, + ) + + executor.queue_workload(workload=workload) + + with create_session() as session: + jobs = session.query(EdgeJobModel).all() + assert len(jobs) == 1 + job = jobs[0] + assert job.queue == "updated-queue" + assert job.command != "old-command" From fed6571836a49134a1368fb4b407701ada943645 Mon Sep 17 00:00:00 2001 From: "lakshminarayana.kumbha" Date: Fri, 1 Aug 2025 15:33:03 +0530 Subject: [PATCH 3/5] Fix: Ensure execute_async handles command validation safely across Airflow versions --- .../src/airflow/providers/edge3/executors/edge_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 4c0ce5a6309c3..97606a1b5118b 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -139,7 +139,10 @@ def execute_async( task_instance = self.edge_queued_tasks[key][3] # type: ignore[index] del self.edge_queued_tasks[key] - self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined] + # Run validation only if supported (available in Airflow 2.12+) + # This makes it compatible with older versions like 2.10 and 2.11 + if hasattr(self, "validate_airflow_tasks_run_command"): + self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined] # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number existing_job = ( From b52efdb1cb117efdca3135ce2f8cecab0de9be9e Mon Sep 17 00:00:00 2001 From: "lakshminarayana.kumbha" Date: Sun, 3 Aug 2025 02:12:07 +0530 Subject: [PATCH 4/5] Fix test to use valid Airflow task command format --- .../src/airflow/providers/edge3/executors/edge_executor.py | 5 +---- .../edge3/tests/unit/edge3/executors/test_edge_executor.py | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 97606a1b5118b..4c0ce5a6309c3 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -139,10 +139,7 @@ def execute_async( task_instance = self.edge_queued_tasks[key][3] # type: ignore[index] del self.edge_queued_tasks[key] - # Run validation only if supported (available in Airflow 2.12+) - # This makes it compatible with older versions like 2.10 and 2.11 - if hasattr(self, "validate_airflow_tasks_run_command"): - self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined] + self.validate_airflow_tasks_run_command(command) # type: ignore[attr-defined] # Check if job already exists with same dag_id, task_id, run_id, map_index, try_number existing_job = ( diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py index 7e68c4d86ff27..ca831fb3e06dc 100644 --- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py +++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py @@ -368,7 +368,7 @@ def test_execute_async_updates_existing_job(self): # Trigger execute_async which should update the existing job executor.edge_queued_tasks = deepcopy(executor.queued_tasks) - executor.execute_async(key=key, command=["new", "command"]) + executor.execute_async(key=key, command=["airflow", "tasks", "run", "new", "command"]) with create_session() as session: jobs = session.query(EdgeJobModel).all() From 79a64734030d367139f38c11bd7b4ce95c2c0c55 Mon Sep 17 00:00:00 2001 From: "lakshminarayana.kumbha" Date: Mon, 4 Aug 2025 12:58:41 +0530 Subject: [PATCH 5/5] Clean up: Remove unused debug logs from EdgeExecutor --- .../src/airflow/providers/edge3/executors/edge_executor.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 4c0ce5a6309c3..6b15cd342cc05 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -155,7 +155,6 @@ def execute_async( ) if existing_job: - # self.log.info(f"EdgeExecutor: Job already exists for {key}, updating it.") existing_job.state = TaskInstanceState.QUEUED existing_job.queue = queue or DEFAULT_QUEUE existing_job.concurrency_slots = task_instance.pool_slots @@ -174,7 +173,6 @@ def execute_async( command=str(command), ) ) - # self.log.info(f"EdgeExecutor: Inserted new job for {key}") @provide_session def queue_workload( @@ -205,7 +203,6 @@ def queue_workload( ) if existing_job: - # self.log.info(f"EdgeExecutor: Workload already exists for {key}, updating it.") existing_job.state = TaskInstanceState.QUEUED existing_job.queue = task_instance.queue existing_job.concurrency_slots = task_instance.pool_slots @@ -224,7 +221,6 @@ def queue_workload( command=workload.model_dump_json(), ) ) - # self.log.info(f"EdgeExecutor: Inserted new workload for {key}") def _check_worker_liveness(self, session: Session) -> bool: """Reset worker state if heartbeat timed out."""