diff --git a/airflow/jobs.py b/airflow/jobs.py index 554f07e715043..0b006448c90c6 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -761,16 +761,18 @@ def _execute(self): self.logger.info("Starting {} scheduler jobs".format(len(jobs))) for j in jobs: j.start() + + while any(j.is_alive() for j in jobs): + while not tis_q.empty(): + ti_key, pickle_id = tis_q.get() + dag = dagbag.dags[ti_key[0]] + task = dag.get_task(ti_key[1]) + ti = TI(task, ti_key[2]) + self.executor.queue_task_instance(ti, pickle_id=pickle_id) + for j in jobs: j.join() - while not tis_q.empty(): - ti_key, pickle_id = tis_q.get() - dag = dagbag.dags[ti_key[0]] - task = dag.get_task(ti_key[1]) - ti = TI(task, ti_key[2]) - self.executor.queue_task_instance(ti, pickle_id=pickle_id) - self.logger.info("Done queuing tasks, calling the executor's " "heartbeat") duration_sec = (datetime.now() - loop_start_dttm).total_seconds()