diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index f736fd3b37aa7..0536fe2293ad2 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -284,15 +284,27 @@ def update_state(self, session=None):
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
         none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
 
-        # small speed up
-        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+        if unfinished_tasks:
             scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
+            if none_depends_on_past and none_task_concurrency:
+                # small speed up
+                self.log.debug(
+                    "number of scheduleable tasks for %s: %s task(s)",
+                    self, len(scheduleable_tasks))
+                ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
+                self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
+                are_runnable_tasks = ready_tis or self._are_premature_tis(
+                    unfinished_tasks, finished_tasks, session) or changed_tis
+            else:
+                # slow path: per-task dependency check (depends_on_past / task_concurrency)
+                for ti in scheduleable_tasks:
+                    if ti.are_dependencies_met(
+                        dep_context=DepContext(flag_upstream_failed=True),
+                        session=session
+                    ):
+                        self.log.debug('Queuing task: %s', ti)
+                        ready_tis.append(ti)
-            self.log.debug("number of scheduleable tasks for %s: %s task(s)", self, len(scheduleable_tasks))
-            ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
-            self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
-            are_runnable_tasks = ready_tis or self._are_premature_tis(
-                unfinished_tasks, finished_tasks, session) or changed_tis
 
         duration = (timezone.utcnow() - start_dttm)
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index c8ab3bbe5eb62..d05b4a0aa643b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -469,6 +469,104 @@ def test_dag_file_processor_process_task_instances(self, state, start_date, end_
             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)
         )
 
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_dag_file_processor_process_task_instances_with_task_concurrency(
+        self, state, start_date, end_date,
+    ):
+        """
+        Test that _process_task_instances schedules task instances even when
+        their task has task_concurrency set.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            task_concurrency=2,
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag.clear()
+        dr = dag_file_processor.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test that _process_task_instances schedules task instances even when
+        their task has depends_on_past set.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy1',
+            dag=dag,
+            owner='airflow')
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag.clear()
+        dr = dag_file_processor.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
     def test_dag_file_processor_do_not_schedule_removed_task(self):
         dag = DAG(
             dag_id='test_scheduler_do_not_schedule_removed_task',