Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,22 +449,26 @@ def update_dags(

# These "is not None" checks are because a LazySerializedDag object does not
# provide the default value if the user doesn't provide an explicit value.
if dag.max_active_tasks is not None:
dm.max_active_tasks = dag.max_active_tasks
elif dag.max_active_tasks is None and dm.max_active_tasks is None:

# if dag.max_active_tasks come as None then default max_active_tasks should be updated
# similar for max_consecutive_failed_dag_runs, max_active_runs

if dag.max_active_tasks is None:
dm.max_active_tasks = conf.getint("core", "max_active_tasks_per_dag")
else:
dm.max_active_tasks = dag.max_active_tasks

if dag.max_active_runs is not None:
dm.max_active_runs = dag.max_active_runs
elif dag.max_active_runs is None and dm.max_active_runs is None:
if dag.max_active_runs is None:
dm.max_active_runs = conf.getint("core", "max_active_runs_per_dag")
else:
dm.max_active_runs = dag.max_active_runs

if dag.max_consecutive_failed_dag_runs is not None:
dm.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
elif dag.max_consecutive_failed_dag_runs is None and dm.max_consecutive_failed_dag_runs is None:
if dag.max_consecutive_failed_dag_runs is None:
dm.max_consecutive_failed_dag_runs = conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)
else:
dm.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs

if hasattr(dag, "has_task_concurrency_limits"):
dm.has_task_concurrency_limits = dag.has_task_concurrency_limits
Expand Down
57 changes: 57 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airflow.utils import timezone as tz
from airflow.utils.session import create_session

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_dags,
Expand Down Expand Up @@ -814,3 +815,59 @@ def test_bundle_name_and_version_are_stored(self, testing_dag_bundle, session, d
orm_dag = session.get(DagModel, "mydag")
assert orm_dag.bundle_name == "testing"
assert orm_dag.bundle_version == "1.0"

def test_max_active_tasks_explicit_value_is_used(self, testing_dag_bundle, session, dag_maker):
with dag_maker("dag_max_tasks", schedule=None, max_active_tasks=5) as dag:
...
update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session)
orm_dag = session.get(DagModel, "dag_max_tasks")
assert orm_dag.max_active_tasks == 5

def test_max_active_tasks_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker):
# Override config so that when DAG.max_active_tasks is None, DagModel gets the configured default
with conf_vars({("core", "max_active_tasks_per_dag"): "7"}):
with dag_maker("dag_max_tasks_default", schedule=None) as dag:
...
update_dag_parsing_results_in_db(
"testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session
)
orm_dag = session.get(DagModel, "dag_max_tasks_default")
assert orm_dag.max_active_tasks == 7

def test_max_active_runs_explicit_value_is_used(self, testing_dag_bundle, session, dag_maker):
with dag_maker("dag_max_runs", schedule=None, max_active_runs=3) as dag:
...
update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session)
orm_dag = session.get(DagModel, "dag_max_runs")
assert orm_dag.max_active_runs == 3

def test_max_active_runs_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker):
with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
with dag_maker("dag_max_runs_default", schedule=None) as dag:
...
update_dag_parsing_results_in_db(
"testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session
)
orm_dag = session.get(DagModel, "dag_max_runs_default")
assert orm_dag.max_active_runs == 4

def test_max_consecutive_failed_dag_runs_explicit_value_is_used(
self, testing_dag_bundle, session, dag_maker
):
with dag_maker("dag_max_failed_runs", schedule=None, max_consecutive_failed_dag_runs=2) as dag:
...
update_dag_parsing_results_in_db("testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session)
orm_dag = session.get(DagModel, "dag_max_failed_runs")
assert orm_dag.max_consecutive_failed_dag_runs == 2

def test_max_consecutive_failed_dag_runs_defaults_from_conf_when_none(
self, testing_dag_bundle, session, dag_maker
):
with conf_vars({("core", "max_consecutive_failed_dag_runs_per_dag"): "6"}):
with dag_maker("dag_max_failed_runs_default", schedule=None) as dag:
...
update_dag_parsing_results_in_db(
"testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), session
)
orm_dag = session.get(DagModel, "dag_max_failed_runs_default")
assert orm_dag.max_consecutive_failed_dag_runs == 6
Loading