Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle partially updated trigger/task_instance data in triggerr jobs #32092

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ def _run_trigger_loop(self) -> None:
This runs synchronously and handles all database reads/writes.
"""
while not self.trigger_runner.stop:
if not self.trigger_runner.is_alive():
self.log.error("Trigger runner thread has died! Exiting.")
break
# Clean out unused triggers
Trigger.clean_unused()
# Load/delete triggers
Expand Down Expand Up @@ -467,17 +470,21 @@ async def arun(self):
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
# Run core logic
await self.create_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
# Every minute, log status
if time.time() - last_status >= 60:
count = len(self.triggers)
self.log.info("%i triggers currently running", count)
last_status = time.time()
try:
# Run core logic
await self.create_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
# Every minute, log status
if time.time() - last_status >= 60:
count = len(self.triggers)
self.log.info("%i triggers currently running", count)
last_status = time.time()
except Exception:
self.stop = True
raise
# Wait for watchdog to complete
await watchdog

Expand Down
43 changes: 43 additions & 0 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,49 @@ def test_trigger_from_expired_triggerer(session):
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]


def test_trigger_runner_exception_stops_triggerer(session):
"""
Checks that if an exception occurs when creating triggers, that the triggerer
process stops
"""

class MockTriggerException(Exception):
pass

class TriggerRunner_(TriggerRunner):
async def create_triggers(self):
raise MockTriggerException("Trigger creation failed")

# Use a trigger that will immediately succeed
trigger = SuccessTrigger()
create_trigger_in_db(session, trigger)

# Make a TriggererJobRunner and have it retrieve DB tasks
job = Job()
job_runner = TriggererJobRunner(job)
job_runner.trigger_runner = TriggerRunner_()
thread = Thread(target=job_runner._execute)
thread.start()

# Wait 4 seconds for the triggerer to stop
try:
for _ in range(40):
time.sleep(0.1)
if not thread.is_alive():
break
else:
pytest.fail("TriggererJobRunner did not stop after exception in TriggerRunner")

if not job_runner.trigger_runner.stop:
pytest.fail("TriggerRunner not marked as stopped after exception in TriggerRunner")

finally:
job_runner.trigger_runner.stop = True
# with suppress(MockTriggerException):
job_runner.trigger_runner.join()
thread.join()


def test_trigger_firing(session):
"""
Checks that when a trigger fires, it correctly makes it into the
Expand Down