diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index e53889b945d83..34592e015141d 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -449,22 +449,26 @@ def update_dags( # These "is not None" checks are because a LazySerializedDag object does not # provide the default value if the user doesn't provide an explicit value. - if dag.max_active_tasks is not None: - dm.max_active_tasks = dag.max_active_tasks - elif dag.max_active_tasks is None and dm.max_active_tasks is None: + + # if dag.max_active_tasks come as None then default max_active_tasks should be updated + # similar for max_consecutive_failed_dag_runs, max_active_runs + + if dag.max_active_tasks is None: dm.max_active_tasks = conf.getint("core", "max_active_tasks_per_dag") + else: + dm.max_active_tasks = dag.max_active_tasks - if dag.max_active_runs is not None: - dm.max_active_runs = dag.max_active_runs - elif dag.max_active_runs is None and dm.max_active_runs is None: + if dag.max_active_runs is None: dm.max_active_runs = conf.getint("core", "max_active_runs_per_dag") + else: + dm.max_active_runs = dag.max_active_runs - if dag.max_consecutive_failed_dag_runs is not None: - dm.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs - elif dag.max_consecutive_failed_dag_runs is None and dm.max_consecutive_failed_dag_runs is None: + if dag.max_consecutive_failed_dag_runs is None: dm.max_consecutive_failed_dag_runs = conf.getint( "core", "max_consecutive_failed_dag_runs_per_dag" ) + else: + dm.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs if hasattr(dag, "has_task_concurrency_limits"): dm.has_task_concurrency_limits = dag.has_task_concurrency_limits diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index 5e353f0dddc06..9d9bf272ad58a 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -57,6 +57,7 @@ from airflow.utils import timezone as tz from airflow.utils.session import create_session +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import ( clear_db_assets, clear_db_dags, @@ -814,3 +815,59 @@ def test_bundle_name_and_version_are_stored(self, testing_dag_bundle, session, d orm_dag = session.get(DagModel, "mydag") assert orm_dag.bundle_name == "testing" assert orm_dag.bundle_version == "1.0" + + def test_max_active_tasks_explicit_value_is_used(self, testing_dag_bundle, session, dag_maker): + with dag_maker("dag_max_tasks", schedule=None, max_active_tasks=5) as dag: + ... + update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session) + orm_dag = session.get(DagModel, "dag_max_tasks") + assert orm_dag.max_active_tasks == 5 + + def test_max_active_tasks_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker): + # Override config so that when DAG.max_active_tasks is None, DagModel gets the configured default + with conf_vars({("core", "max_active_tasks_per_dag"): "7"}): + with dag_maker("dag_max_tasks_default", schedule=None) as dag: + ... + update_dag_parsing_results_in_db( + "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session + ) + orm_dag = session.get(DagModel, "dag_max_tasks_default") + assert orm_dag.max_active_tasks == 7 + + def test_max_active_runs_explicit_value_is_used(self, testing_dag_bundle, session, dag_maker): + with dag_maker("dag_max_runs", schedule=None, max_active_runs=3) as dag: + ... + update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session) + orm_dag = session.get(DagModel, "dag_max_runs") + assert orm_dag.max_active_runs == 3 + + def test_max_active_runs_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker): + with conf_vars({("core", "max_active_runs_per_dag"): "4"}): + with dag_maker("dag_max_runs_default", schedule=None) as dag: + ... + update_dag_parsing_results_in_db( + "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session + ) + orm_dag = session.get(DagModel, "dag_max_runs_default") + assert orm_dag.max_active_runs == 4 + + def test_max_consecutive_failed_dag_runs_explicit_value_is_used( + self, testing_dag_bundle, session, dag_maker + ): + with dag_maker("dag_max_failed_runs", schedule=None, max_consecutive_failed_dag_runs=2) as dag: + ... + update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session) + orm_dag = session.get(DagModel, "dag_max_failed_runs") + assert orm_dag.max_consecutive_failed_dag_runs == 2 + + def test_max_consecutive_failed_dag_runs_defaults_from_conf_when_none( + self, testing_dag_bundle, session, dag_maker + ): + with conf_vars({("core", "max_consecutive_failed_dag_runs_per_dag"): "6"}): + with dag_maker("dag_max_failed_runs_default", schedule=None) as dag: + ... + update_dag_parsing_results_in_db( + "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session + ) + orm_dag = session.get(DagModel, "dag_max_failed_runs_default") + assert orm_dag.max_consecutive_failed_dag_runs == 6