Skip to content
Merged
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from collections import deque
from collections.abc import Generator, Iterable
from contextlib import suppress
from datetime import datetime
from socket import socket
from traceback import format_exception
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
Expand Down Expand Up @@ -962,7 +963,7 @@ async def create_triggers(self):
)
self.triggers[trigger_id] = {
"task": asyncio.create_task(
self.run_trigger(trigger_id, trigger_instance), name=trigger_name
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
),
"is_watcher": isinstance(trigger_instance, events.BaseEventTrigger),
"name": trigger_name,
Expand Down Expand Up @@ -1097,7 +1098,7 @@ async def block_watchdog(self):
)
Stats.incr("triggers.blocked_main_thread")

async def run_trigger(self, trigger_id, trigger):
async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
"""Run a trigger (they are async generators) and push their events into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
import greenback
Expand All @@ -1118,7 +1119,7 @@ async def run_trigger(self, trigger_id, trigger):
except asyncio.CancelledError:
# We get cancelled by the scheduler changing the task state. But if we do lets give a nice error
# message about it
if timeout := trigger.timeout_after:
if timeout := timeout_after:
timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
if timeout < timezone.utcnow():
await self.log.aerror("Trigger cancelled due to timeout")
Expand Down
7 changes: 5 additions & 2 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,14 @@ def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None:
1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", "events": 0}
}
mock_trigger = MagicMock(spec=BaseTrigger)
mock_trigger.timeout_after = timezone.utcnow() - datetime.timedelta(hours=1)
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
asyncio.run(
trigger_runner.run_trigger(
1, mock_trigger, timeout_after=timezone.utcnow() - datetime.timedelta(hours=1)
)
)
assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog

@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")
Expand Down