diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 7396fb56c868d..2a41d40690068 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -158,12 +158,13 @@ def _exit_gracefully(self, signum, frame) -> None: self.trigger_runner.stop = True else: self.log.warning("Forcing exit due to second exit signal %s", signum) - - self.trigger_runner.kill(signal.SIGKILL) + if self.trigger_runner: + self.trigger_runner.kill(signal.SIGKILL) sys.exit(os.EX_SOFTWARE) def _execute(self) -> int | None: self.log.info("Starting the triggerer") + self.register_signals() try: # Kick off runner sub-process without DB access self.trigger_runner = TriggerRunnerSupervisor.start( @@ -848,7 +849,6 @@ class TriggerRunner: failed_triggers: deque[tuple[int, BaseException | None]] # Should-we-stop flag - # TODO: set this in a sig-int handler stop: bool = False # TODO: connect this to the parent process @@ -866,8 +866,14 @@ def __init__(self): self.failed_triggers = deque() self.job_id = None + def _handle_signal(self, signum, frame) -> None: + """Handle termination signals gracefully.""" + self.stop = True + def run(self): - """Sync entrypoint - just run a run in an async loop.""" + """Sync entrypoint - just run arun in an async loop.""" + signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGTERM, self._handle_signal) asyncio.run(self.arun()) async def arun(self):