Skip to content

Commit

Permalink
Ensure that main triggerer thread exits if the async thread fails (#32092)
Browse files Browse the repository at this point in the history

Co-authored-by: tom.rutter <tom.rutter@zurich.com>
  • Loading branch information
tomrutter and tom.rutter authored Jun 27, 2023
1 parent 14785bc commit e585b58
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
29 changes: 18 additions & 11 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ def _run_trigger_loop(self) -> None:
This runs synchronously and handles all database reads/writes.
"""
while not self.trigger_runner.stop:
if not self.trigger_runner.is_alive():
self.log.error("Trigger runner thread has died! Exiting.")
break
# Clean out unused triggers
Trigger.clean_unused()
# Load/delete triggers
Expand Down Expand Up @@ -470,17 +473,21 @@ async def arun(self):
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
# Run core logic
await self.create_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
# Every minute, log status
if time.time() - last_status >= 60:
count = len(self.triggers)
self.log.info("%i triggers currently running", count)
last_status = time.time()
try:
# Run core logic
await self.create_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
# Every minute, log status
if time.time() - last_status >= 60:
count = len(self.triggers)
self.log.info("%i triggers currently running", count)
last_status = time.time()
except Exception:
self.stop = True
raise
# Wait for watchdog to complete
await watchdog

Expand Down
43 changes: 43 additions & 0 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,49 @@ def test_trigger_from_expired_triggerer(session):
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]


def test_trigger_runner_exception_stops_triggerer(session):
    """
    Verify that the triggerer shuts down when the trigger runner fails.

    ``create_triggers`` is overridden to raise on every call. The test then
    expects the thread running ``_execute`` to finish within roughly four
    seconds and the runner's ``stop`` flag to be set.
    """

    class MockTriggerException(Exception):
        pass

    class TriggerRunner_(TriggerRunner):
        async def create_triggers(self):
            raise MockTriggerException("Trigger creation failed")

    # Store a trigger that would succeed immediately
    create_trigger_in_db(session, SuccessTrigger())

    # Build the job runner, swap in the failing trigger runner, and run
    # ``_execute`` on its own thread so this test can observe it from outside
    job_runner = TriggererJobRunner(Job())
    job_runner.trigger_runner = TriggerRunner_()
    execute_thread = Thread(target=job_runner._execute)
    execute_thread.start()

    try:
        # Poll every 0.1s, at most 40 times (about 4 seconds in total)
        exited = False
        for _ in range(40):
            time.sleep(0.1)
            exited = not execute_thread.is_alive()
            if exited:
                break

        if not exited:
            pytest.fail("TriggererJobRunner did not stop after exception in TriggerRunner")

        if not job_runner.trigger_runner.stop:
            pytest.fail("TriggerRunner not marked as stopped after exception in TriggerRunner")

    finally:
        # Whatever the outcome, flag the runner to stop and wait for the
        # runner and the execute thread to finish
        job_runner.trigger_runner.stop = True
        job_runner.trigger_runner.join()
        execute_thread.join()


def test_trigger_firing(session):
"""
Checks that when a trigger fires, it correctly makes it into the
Expand Down

0 comments on commit e585b58

Please sign in to comment.