Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
082fc70
refactor: If asend in TriggerRunner comms decoder crashes due to NotI…
dabla Jan 6, 2026
5ca3b39
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 6, 2026
b303403
refactor: Applied some reformatting
dabla Jan 6, 2026
5de68c1
refactor: Fixed some mypy issues
dabla Jan 6, 2026
14082fe
refactor: Fixed return type of send_changes method
dabla Jan 6, 2026
9cbf5a8
refactor: Changed imports of trigger events
dabla Jan 6, 2026
3045fad
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 6, 2026
6ebf6a5
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 6, 2026
e9dc8a4
refactor: Reformatted trigger job runner
dabla Jan 6, 2026
65999fe
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 6, 2026
6d70c53
refactor: Fixed mocking of comms decoder
dabla Jan 7, 2026
693415c
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 7, 2026
a9a6cd8
refactor: Forgot to add the patched supervisor_builder
dabla Jan 7, 2026
dc581f9
refactor: Changed asserts in test_sync_state_to_supervisor
dabla Jan 7, 2026
521aac4
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 7, 2026
5b98350
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 7, 2026
60421a0
refactor: Refactored how state changes are validated
dabla Jan 7, 2026
60bc489
refactor: Validate events while creating the TriggerStateChanges message
dabla Jan 7, 2026
fc627b0
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 7, 2026
a7ffaaa
refactor: Refactored try/except in validate_state_changes to keep myp…
dabla Jan 7, 2026
0b90b78
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 7, 2026
b29e947
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 8, 2026
3d0e8fb
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 8, 2026
f612bcb
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 9, 2026
f18d8fe
Merge branch 'main' into fix/trigger-crash-serialization-comms
dabla Jan 12, 2026
34e0f34
Update airflow-core/src/airflow/jobs/triggerer_job_runner.py
dabla Jan 13, 2026
2db586d
refactor: Renamed validate_state_changes method to process_trigger_ev…
dabla Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@
UpdateHITLDetail,
VariableResult,
XComResult,
_new_encoder,
_RequestFrame,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.triggers import base as events
from airflow.triggers.base import BaseEventTrigger, BaseTrigger, DiscrimatedTriggerEvent, TriggerEvent
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
Expand All @@ -90,7 +91,6 @@
from airflow.jobs.job import Job
from airflow.sdk.api.client import Client
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.triggers.base import BaseTrigger

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -209,7 +209,7 @@ class TriggerStateChanges(BaseModel):

type: Literal["TriggerStateChanges"] = "TriggerStateChanges"
events: Annotated[
list[tuple[int, events.DiscrimatedTriggerEvent]] | None,
list[tuple[int, DiscrimatedTriggerEvent]] | None,
# We have to specify a default here, as otherwise Pydantic struggles to deal with the discriminated
# union :shrug:
Field(default=None),
Expand Down Expand Up @@ -363,7 +363,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
creating_triggers: deque[workloads.RunTrigger] = attrs.field(factory=deque, init=False)

# Outbound queue of events
events: deque[tuple[int, events.TriggerEvent]] = attrs.field(factory=deque, init=False)
events: deque[tuple[int, TriggerEvent]] = attrs.field(factory=deque, init=False)

# Outbound queue of failed triggers
failed_triggers: deque[tuple[int, list[str] | None]] = attrs.field(factory=deque, init=False)
Expand Down Expand Up @@ -843,7 +843,7 @@ class TriggerRunner:
to_cancel: deque[int]

# Outbound queue of events
events: deque[tuple[int, events.TriggerEvent]]
events: deque[tuple[int, TriggerEvent]]

# Outbound queue of failed triggers
failed_triggers: deque[tuple[int, BaseException | None]]
Expand Down Expand Up @@ -993,7 +993,7 @@ async def create_triggers(self):
"task": asyncio.create_task(
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
),
"is_watcher": isinstance(trigger_instance, events.BaseEventTrigger),
"is_watcher": isinstance(trigger_instance, BaseEventTrigger),
"name": trigger_name,
"events": 0,
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@ async def cleanup_finished_triggers(self) -> list[int]:
saved_exc = e
else:
# See if they foolishly returned a TriggerEvent
if isinstance(result, events.TriggerEvent):
if isinstance(result, TriggerEvent):
self.log.error(
"Trigger returned a TriggerEvent rather than yielding it",
trigger=details["name"],
Expand All @@ -1059,46 +1059,64 @@ async def cleanup_finished_triggers(self) -> list[int]:
await asyncio.sleep(0)
return finished_ids

async def sync_state_to_supervisor(self, finished_ids: list[int]):
def process_trigger_events(self, finished_ids: list[int]) -> messages.TriggerStateChanges:
# Copy out of our dequeues in threadsafe manner to sync state with parent
events_to_send = []

req_encoder = _new_encoder()
events_to_send: list[tuple[int, DiscrimatedTriggerEvent]] = []
failures_to_send: list[tuple[int, list[str] | None]] = []

while self.events:
data = self.events.popleft()
events_to_send.append(data)
trigger_id, trigger_event = self.events.popleft()

try:
req_encoder.encode(trigger_event)
except Exception as e:
logger.error(
"Trigger %s returned non-serializable result %r. Cancelling trigger.",
trigger_id,
trigger_event,
)
self.failed_triggers.append((trigger_id, e))
else:
events_to_send.append((trigger_id, trigger_event))


failures_to_send = []
while self.failed_triggers:
id, exc = self.failed_triggers.popleft()
trigger_id, exc = self.failed_triggers.popleft()
tb = format_exception(type(exc), exc, exc.__traceback__) if exc else None
failures_to_send.append((id, tb))
failures_to_send.append((trigger_id, tb))

msg = messages.TriggerStateChanges(
events=events_to_send, finished=finished_ids, failures=failures_to_send
return messages.TriggerStateChanges(
events=events_to_send if events_to_send else None,
finished=finished_ids if finished_ids else None,
failures=failures_to_send if failures_to_send else None,
)

if not events_to_send:
msg.events = None
async def sync_state_to_supervisor(self, finished_ids: list[int]) -> None:
msg = self.process_trigger_events(finished_ids=finished_ids)

if not failures_to_send:
msg.failures = None
# Tell the monitor that we've finished triggers so it can update things
resp = await self.send_trigger_state_changes(msg)

if not finished_ids:
msg.finished = None
if resp:
self.to_create.extend(resp.to_create)
self.to_cancel.extend(resp.to_cancel)

# Tell the monitor that we've finished triggers so it can update things
async def send_trigger_state_changes(self, msg: messages.TriggerStateChanges) -> messages.TriggerStateSync | None:
try:
resp = await self.comms_decoder.asend(msg)
response = await self.comms_decoder.asend(msg)

if not isinstance(response, messages.TriggerStateSync):
raise RuntimeError(f"Expected to get a TriggerStateSync message, instead we got {type(msg)}")

return response
except asyncio.IncompleteReadError:
if task := asyncio.current_task():
task.cancel("EOF - shutting down")
return
return None
raise

if not isinstance(resp, messages.TriggerStateSync):
raise RuntimeError(f"Expected to get a TriggerStateSync message, instead we got {type(msg)}")
self.to_create.extend(resp.to_create)
self.to_cancel.extend(resp.to_cancel)

async def block_watchdog(self):
"""
Watchdog loop that detects blocking (badly-written) triggers.
Expand Down
19 changes: 18 additions & 1 deletion airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ async def test_invalid_trigger(self, supervisor_builder):
trigger_runner = TriggerRunner()
trigger_runner.comms_decoder = AsyncMock(spec=TriggerCommsDecoder)
trigger_runner.comms_decoder.asend.return_value = messages.TriggerStateSync(
to_create=[], to_cancel=[]
to_create=[], to_cancel=set()
)

trigger_runner.to_create.append(workload)
Expand Down Expand Up @@ -436,6 +436,23 @@ async def test_trigger_kwargs_serialization_cleanup(self, session):
trigger_instance.cancel()
await runner.cleanup_finished_triggers()

@pytest.mark.asyncio
@patch("airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True)
async def test_sync_state_to_supervisor(self, supervisor_builder):
trigger_runner = TriggerRunner()
trigger_runner.comms_decoder = AsyncMock(spec=TriggerCommsDecoder)
trigger_runner.comms_decoder.asend.side_effect = [
messages.TriggerStateSync(to_create=[], to_cancel=set()),
]
trigger_runner.events.append((1, TriggerEvent(payload={"status": "SUCCESS"})))
trigger_runner.events.append((2, TriggerEvent(payload={"status": "FAILED"})))
trigger_runner.events.append((3, TriggerEvent(payload={"status": "SUCCESS", "data": object()})))

await trigger_runner.sync_state_to_supervisor(finished_ids=[])

assert trigger_runner.comms_decoder.asend.call_count == 1
assert len(trigger_runner.comms_decoder.asend.call_args_list[0].args[0].events) == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only 2 events? Shouldn't this be 3 as we have two success and one failure to report?



@pytest.mark.asyncio
@pytest.mark.usefixtures("testing_dag_bundle")
Expand Down
Loading