diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 951602e14e648..7958adb64cf96 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1974,20 +1974,16 @@ def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]: self.log.debug("Finding 'running' jobs without a recent heartbeat") limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs) - zombies = ( - session.execute( - select(TI, DM.fileloc, DM.processor_subdir) - .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql") - .join(Job, TI.job_id == Job.id) - .join(DM, TI.dag_id == DM.dag_id) - .where(TI.state == TaskInstanceState.RUNNING) - .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm)) - .where(Job.job_type == "LocalTaskJob") - .where(TI.queued_by_job_id == self.job.id) - ) - .unique() - .all() - ) + zombies = session.execute( + select(TI, DM.fileloc, DM.processor_subdir) + .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql") + .join(Job, TI.job_id == Job.id) + .join(DM, TI.dag_id == DM.dag_id) + .where(TI.state == TaskInstanceState.RUNNING) + .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm)) + .where(Job.job_type == "LocalTaskJob") + .where(TI.queued_by_job_id == self.job.id) + ).all() if zombies: self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm) return zombies