diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 162f103642874..3d2c20c61253d 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -588,61 +588,83 @@ def _per_task_process(key, ti: TaskInstance, session):
             try:
                 for task in self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks limit is reached."
+                        # Attempt to work around a deadlock on backfill by trying to commit the
+                        # transaction state update a few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are {open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency limit is reached."
+                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to operational error. "
+                                    "The job will retry this operation, so if your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                if i == max_attempts - 1:
+                                    raise
+                                # retry the loop
             except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
                 self.log.debug(e)
 
@@ -939,6 +961,13 @@ def _execute(self, session: Session = NEW_SESSION) -> None:
             # TODO: we will need to terminate running task instances and set the
             # state to failed.
             self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
+        except OperationalError:
+            self.log.error(
+                "Backfill job deadlocked. The job will be retried, so it is likely "
+                "to heal itself. If your backfill succeeds, you can ignore this exception.",
+                exc_info=True,
+            )
+            raise
         finally:
             session.commit()
             executor.end()
diff --git a/tests/providers/daskexecutor/test_dask_executor.py b/tests/providers/daskexecutor/test_dask_executor.py
index 2d559eaa404ef..c5773bd83d2a0 100644
--- a/tests/providers/daskexecutor/test_dask_executor.py
+++ b/tests/providers/daskexecutor/test_dask_executor.py
@@ -104,7 +104,6 @@ def test_dask_executor_functions(self):
     # This test is quarantined because it became rather flaky on our CI in July 2023 and reason for this
     # is unknown. An issue for that was created: https://github.com/apache/airflow/issues/32778 and the
     # marker should be removed while (possibly) the reason for flaky behaviour is found and fixed.
-    @pytest.mark.quarantined
     @pytest.mark.execution_timeout(180)
     def test_backfill_integration(self):
         """
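
Note on the retry pattern introduced above: each per-task scheduling pass, including its `session.commit()`, is wrapped in a bounded retry loop. If the database raises `OperationalError` (which is how SQLAlchemy typically surfaces a deadlock), the session is rolled back and the whole pass is re-run, and the error propagates only once `max_attempts` tries are exhausted. Below is a minimal, self-contained sketch of the same pattern, assuming a plain SQLAlchemy session; the helper name `run_with_commit_retries` and its `do_work` callback are illustrative, not part of the patch.

    import logging

    from sqlalchemy.exc import OperationalError
    from sqlalchemy.orm import Session

    logger = logging.getLogger(__name__)

    def run_with_commit_retries(session: Session, do_work, max_attempts: int = 5) -> None:
        """Apply do_work and commit, retrying the whole pass on OperationalError."""
        for attempt in range(max_attempts):
            # Re-apply the state changes on every attempt: the rollback below
            # discards whatever the previous attempt wrote in this transaction.
            do_work(session)
            try:
                session.commit()
                return  # committed successfully; stop retrying
            except OperationalError:
                logger.error(
                    "Commit failed on attempt %d of %d; rolling back",
                    attempt + 1,
                    max_attempts,
                    exc_info=True,
                )
                session.rollback()
                if attempt == max_attempts - 1:
                    raise  # attempts exhausted; let the caller decide

Re-running the work after a rollback (rather than retrying only the commit) matters because the rollback discards the pending state changes; this mirrors why the patch places `_per_task_process` inside the `for i in range(max_attempts)` loop instead of wrapping just `session.commit()`.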