diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 305e5bcbdd342..599f6dd81c06b 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -533,6 +533,13 @@ def __attrs_post_init__(self): RemovedInAirflow4Warning, stacklevel=2, ) + if ( + active_runs_limit := self.timetable.active_runs_limit + ) is not None and active_runs_limit < self.max_active_runs: + raise ValueError( + f"Invalid max_active_runs: {type(self.timetable)} " + f"requires max_active_runs <= {active_runs_limit}" + ) @params.validator def _validate_params(self, _, params: ParamsDict): diff --git a/task-sdk/tests/task_sdk/definitions/test_dag.py b/task-sdk/tests/task_sdk/definitions/test_dag.py index f61475adfa9b8..6e6ef551d07ab 100644 --- a/task-sdk/tests/task_sdk/definitions/test_dag.py +++ b/task-sdk/tests/task_sdk/definitions/test_dag.py @@ -470,6 +470,21 @@ def test_create_dag_while_active_context(): # No asserts needed, it just needs to not fail +@pytest.mark.parametrize("max_active_runs", [0, 1]) +def test_continuous_schedule_interval_limits_max_active_runs(max_active_runs): + from airflow.timetables.simple import ContinuousTimetable + + dag = DAG(dag_id="continuous", schedule="@continuous", max_active_runs=max_active_runs) + assert isinstance(dag.timetable, ContinuousTimetable) + assert dag.max_active_runs == max_active_runs + + +def test_continuous_schedule_interval_limits_max_active_runs_error(): + with pytest.raises(ValueError) as ctx: + DAG(dag_id="continuous", schedule="@continuous", max_active_runs=2) + assert str(ctx.value) == "Invalid max_active_runs: ContinuousTimetable requires max_active_runs <= 1" + + class TestDagDecorator: DEFAULT_ARGS = { "owner": "test",