From 789ada02c7208091887e012abf22d8884a15260d Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 13 Nov 2025 20:27:49 +0100 Subject: [PATCH 1/5] refactor: Fixed timeout_after in run_trigger method of TriggererJobRunner --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index cd8b19622d2ef..64b599c99a1e4 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -27,6 +27,7 @@ from collections import deque from collections.abc import Generator, Iterable from contextlib import suppress +from datetime import datetime from socket import socket from traceback import format_exception from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict @@ -962,7 +963,7 @@ async def create_triggers(self): ) self.triggers[trigger_id] = { "task": asyncio.create_task( - self.run_trigger(trigger_id, trigger_instance), name=trigger_name + self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name ), "is_watcher": isinstance(trigger_instance, events.BaseEventTrigger), "name": trigger_name, @@ -1097,7 +1098,7 @@ async def block_watchdog(self): ) Stats.incr("triggers.blocked_main_thread") - async def run_trigger(self, trigger_id, trigger): + async def run_trigger(self, trigger_id, trigger: BaseTrigger, timeout_after: datetime | None = None): """Run a trigger (they are async generators) and push their events into our outbound event deque.""" if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true": import greenback @@ -1118,7 +1119,7 @@ async def run_trigger(self, trigger_id, trigger): except asyncio.CancelledError: # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error # message about it - if timeout := trigger.timeout_after: + if timeout := timeout_after: timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout if timeout < timezone.utcnow(): await self.log.aerror("Trigger cancelled due to timeout") From 1888f48f211d93d052c6d393e087e2c3e523b595 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 14 Nov 2025 08:19:47 +0100 Subject: [PATCH 2/5] refactor: Added typing to trigger_id of run_trigger method --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 64b599c99a1e4..5833593ee820e 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1098,7 +1098,7 @@ async def block_watchdog(self): ) Stats.incr("triggers.blocked_main_thread") - async def run_trigger(self, trigger_id, trigger: BaseTrigger, timeout_after: datetime | None = None): + async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None): """Run a trigger (they are async generators) and push their events into our outbound event deque.""" if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true": import greenback From c3fd0e339adbacc1602c1e8effd0c69229d5e656 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 14 Nov 2025 09:07:16 +0100 Subject: [PATCH 3/5] refactor: Fixed test_run_inline_trigger_timeout --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 4edbcf21e3541..8e7ac543cb8c6 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -325,11 +325,10 @@ def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: 1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", "events": 0} } mock_trigger = MagicMock(spec=BaseTrigger) - mock_trigger.timeout_after = timezone.utcnow() - datetime.timedelta(hours=1) mock_trigger.run.side_effect = asyncio.CancelledError() with pytest.raises(asyncio.CancelledError): - asyncio.run(trigger_runner.run_trigger(1, mock_trigger)) + asyncio.run(trigger_runner.run_trigger(1, mock_trigger, timeout_after=timezone.utcnow() - datetime.timedelta(hours=1))) assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog @patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs") From 88524f040a6a757035268c5aecdc1ebf26aa9f3f Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 14 Nov 2025 09:36:14 +0100 Subject: [PATCH 4/5] refactor: Reformatted test_run_inline_trigger_timeout --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 8e7ac543cb8c6..4bb6cb26d7a00 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -328,7 +328,11 @@ def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: mock_trigger.run.side_effect = asyncio.CancelledError() with pytest.raises(asyncio.CancelledError): - asyncio.run(trigger_runner.run_trigger(1, mock_trigger, timeout_after=timezone.utcnow() - datetime.timedelta(hours=1))) + asyncio.run( + trigger_runner.run_trigger( + 1, mock_trigger,timeout_after=timezone.utcnow() - datetime.timedelta(hours=1) + ) + ) assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog @patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs") From 8af000858d2c6bf6e12e7f140ba4e5f330b25192 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 14 Nov 2025 10:08:16 +0100 Subject: [PATCH 5/5] refactor: Fixed reformatting of test_run_inline_trigger_timeout --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 4bb6cb26d7a00..a182c32b189c4 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -330,7 +330,7 @@ def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: with pytest.raises(asyncio.CancelledError): asyncio.run( trigger_runner.run_trigger( - 1, mock_trigger,timeout_after=timezone.utcnow() - datetime.timedelta(hours=1) + 1, mock_trigger, timeout_after=timezone.utcnow() - datetime.timedelta(hours=1) ) ) assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog