diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py
index d04a790c02f88..08ee1726ccc22 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -102,6 +102,7 @@ def _trigger_dag(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 374a0205c56d7..8243a30d54d24 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -354,6 +354,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 9f5bc1a5d484d..80db78d87f512 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -377,6 +377,7 @@ def trigger_dag_run(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 675a76d431392..c3ec79c9ddd11 100644
--- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -219,6 +219,7 @@ class DagRun(StrictBaseModel):
logical_date: UtcDateTime
data_interval_start: UtcDateTime | None
data_interval_end: UtcDateTime | None
+ run_after: UtcDateTime
start_date: UtcDateTime
end_date: UtcDateTime | None
run_type: DagRunType
diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py
index 155f96f861ab5..3cc70dac9c5b5 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -144,6 +144,7 @@ def ti_run(
DR.dag_id,
DR.data_interval_start,
DR.data_interval_end,
+ DR.run_after,
DR.start_date,
DR.end_date,
DR.run_type,
diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py
index 011f629d13f56..ccae29ededd09 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -171,22 +171,26 @@ def _get_dag_run(
dag_run_logical_date = pendulum.instance(timezone.utcnow())
if create_if_necessary == "memory":
+ data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = DagRun(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
- data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
+ data_interval=data_interval,
+ run_after=data_interval.end,
triggered_by=DagRunTriggeredByType.CLI,
state=DagRunState.RUNNING,
)
return dag_run, True
elif create_if_necessary == "db":
+ data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = dag.create_dagrun(
run_id=_generate_temporary_run_id(),
logical_date=dag_run_logical_date,
- data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
+ data_interval=data_interval,
+ run_after=data_interval.end,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index dd7c6f1f65550..1e98dbf2aba29 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1284,6 +1284,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
),
logical_date=dag_model.next_dagrun,
data_interval=data_interval,
+ run_after=dag_model.next_dagrun_create_after,
run_type=DagRunType.SCHEDULED,
triggered_by=DagRunTriggeredByType.TIMETABLE,
dag_version=latest_dag_version,
@@ -1395,6 +1396,7 @@ def _create_dag_runs_asset_triggered(
),
logical_date=logical_date,
data_interval=data_interval,
+ run_after=max(logical_dates.values()),
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
dag_version=latest_dag_version,
diff --git a/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py b/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py
new file mode 100644
index 0000000000000..7a0a7e59c7d74
--- /dev/null
+++ b/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add DagRun run_after.
+
+Revision ID: 6a9e7a527a88
+Revises: 33b04e4bfa19
+Create Date: 2025-01-16 10:07:34.940948
+"""
+
+from __future__ import annotations
+
+import alembic.op as op
+import sqlalchemy as sa
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# Revision identifiers, used by Alembic.
+revision = "6a9e7a527a88"
+down_revision = "33b04e4bfa19"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+ """Add DagRun run_after."""
+ op.add_column("dag_run", sa.Column("run_after", UtcDateTime, nullable=True))
+
+ # run_after must be set to after the data interval ends.
+ # In very old runs where the data interval is not available, we'll just use
+ # the logical date. This is wrong (one interval too early), but good enough
+ # since those super old runs should have their data interval ended a long
+    # time ago anyway, so the slight inaccuracy is acceptable.
+ op.execute("update dag_run set run_after = coalesce(data_interval_end, logical_date)")
+
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.alter_column("run_after", existing_type=UtcDateTime, nullable=False)
+ batch_op.create_index("idx_dag_run_run_after", ["run_after"], unique=False)
+
+
+def downgrade():
+ """Remove DagRun run_after."""
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.drop_index("idx_dag_run_run_after")
+ batch_op.drop_column("run_after")
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 1564c4feb59a5..a2347e76fdc6e 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -296,6 +296,7 @@ def _create_backfill_dag_run(
),
logical_date=info.logical_date,
data_interval=info.data_interval,
+ run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index e84994c6d04e7..425cbaca68880 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -632,6 +632,7 @@ def run(
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
data_interval=info.data_interval,
+ run_after=info.run_after,
triggered_by=DagRunTriggeredByType.TEST,
state=DagRunState.RUNNING,
)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a0f6e901dc3e9..8fae1604ed761 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -250,6 +250,7 @@ def _create_orm_dagrun(
run_id: str,
logical_date: datetime | None,
data_interval: DataInterval | None,
+ run_after: datetime,
start_date: datetime | None,
external_trigger: bool,
conf: Any,
@@ -266,6 +267,7 @@ def _create_orm_dagrun(
run_id=run_id,
logical_date=logical_date,
start_date=start_date,
+ run_after=run_after,
external_trigger=external_trigger,
conf=conf,
state=state,
@@ -1634,11 +1636,12 @@ def add_logger_if_needed(ti: TaskInstance):
dag=scheduler_dag,
start_date=logical_date,
logical_date=logical_date,
+ data_interval=data_interval,
+ run_after=data_interval.end,
run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
session=session,
conf=run_conf,
triggered_by=DagRunTriggeredByType.TEST,
- data_interval=data_interval,
)
tasks = self.task_dict
@@ -1721,6 +1724,7 @@ def create_dagrun(
run_id: str,
logical_date: datetime,
data_interval: tuple[datetime, datetime],
+ run_after: datetime,
conf: dict | None = None,
run_type: DagRunType,
triggered_by: DagRunTriggeredByType,
@@ -1791,6 +1795,8 @@ def create_dagrun(
dag=self,
run_id=run_id,
logical_date=logical_date,
+ data_interval=data_interval,
+ run_after=timezone.coerce_datetime(run_after),
start_date=timezone.coerce_datetime(start_date),
external_trigger=external_trigger,
conf=conf,
@@ -1799,7 +1805,6 @@ def create_dagrun(
dag_version=dag_version,
creating_job_id=creating_job_id,
backfill_id=backfill_id,
- data_interval=data_interval,
triggered_by=triggered_by,
session=session,
)
@@ -2439,6 +2444,7 @@ def _get_or_create_dagrun(
run_id: str,
logical_date: datetime,
data_interval: tuple[datetime, datetime],
+ run_after: datetime,
conf: dict | None,
triggered_by: DagRunTriggeredByType,
start_date: datetime,
@@ -2468,6 +2474,7 @@ def _get_or_create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=run_after,
conf=conf,
run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 727746b9b0333..22757c972e953 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,11 @@ class TISchedulingDecision(NamedTuple):
finished_tis: list[TI]
+def _default_run_after(ctx):
+ params = ctx.get_current_parameters()
+ return params["data_interval_end"] or params["logical_date"] or timezone.utcnow()
+
+
def _creator_note(val):
"""Creator the ``note`` association proxy."""
if isinstance(val, str):
@@ -145,6 +150,8 @@ class DagRun(Base, LoggingMixin):
# These two must be either both NULL or both datetime.
data_interval_start = Column(UtcDateTime)
data_interval_end = Column(UtcDateTime)
+ # Earliest time when this DagRun can start running.
+ run_after = Column(UtcDateTime, default=_default_run_after, nullable=False)
# When a scheduler last attempted to schedule TIs for this DagRun
last_scheduling_decision = Column(UtcDateTime)
# Foreign key to LogTemplate. DagRun rows created prior to this column's
@@ -180,6 +187,7 @@ class DagRun(Base, LoggingMixin):
Index("dag_id_state", dag_id, _state),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
Index("idx_dag_run_dag_id", dag_id),
+ Index("idx_dag_run_run_after", run_after),
Index(
"idx_dag_run_running_dags",
"state",
@@ -229,8 +237,10 @@ def __init__(
self,
dag_id: str | None = None,
run_id: str | None = None,
+ *,
queued_at: datetime | None | ArgNotSet = NOTSET,
logical_date: datetime | None = None,
+ run_after: datetime | None = None,
start_date: datetime | None = None,
external_trigger: bool | None = None,
conf: Any | None = None,
@@ -253,6 +263,7 @@ def __init__(
self.dag_id = dag_id
self.run_id = run_id
self.logical_date = logical_date
+ self.run_after = run_after
self.start_date = start_date
self.external_trigger = external_trigger
self.conf = conf or {}
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 0dc4dbb1b61f4..0c518e6108e5f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
- "3.0.0": "33b04e4bfa19",
+ "3.0.0": "6a9e7a527a88",
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 88e504c75822f..4ce5c6564619f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2237,6 +2237,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.UI,
diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py
index 558b4eba4980c..4e056b3faf1e9 100755
--- a/dev/perf/scheduler_dag_execution_timing.py
+++ b/dev/perf/scheduler_dag_execution_timing.py
@@ -173,6 +173,7 @@ def create_dag_runs(dag, num_runs, session):
run_id=f"{id_prefix}{logical_date.isoformat()}",
logical_date=logical_date,
data_interval=(logical_date, logical_date),
+ run_after=logical_date,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.TEST,
external_trigger=False,
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index f2bf38552164b..e3606c3476353 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-0f041321ecbb5d059f7a9c85fd5cfba745732eea205dc2e4f5e3b6dd76a9468d
\ No newline at end of file
+232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index d20fa37b8ea66..7efa5c1512485 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -649,24 +649,24 @@
dagrun_asset_event
-
-dagrun_asset_event
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-event_id
-
- [INTEGER]
- NOT NULL
+
+dagrun_asset_event
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+event_id
+
+ [INTEGER]
+ NOT NULL
asset_event--dagrun_asset_event
-
-0..N
+
+0..N
1
@@ -709,695 +709,695 @@
task_instance
-
-task_instance
-
-id
-
- [UUID]
- NOT NULL
-
-custom_operator_name
-
- [VARCHAR(1000)]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-duration
-
- [DOUBLE_PRECISION]
-
-end_date
-
- [TIMESTAMP]
-
-executor
-
- [VARCHAR(1000)]
-
-executor_config
-
- [BYTEA]
-
-external_executor_id
-
- [VARCHAR(250)]
-
-hostname
-
- [VARCHAR(1000)]
-
-last_heartbeat_at
-
- [TIMESTAMP]
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-max_tries
-
- [INTEGER]
-
-next_kwargs
-
- [JSON]
-
-next_method
-
- [VARCHAR(1000)]
-
-operator
-
- [VARCHAR(1000)]
-
-pid
-
- [INTEGER]
-
-pool
-
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
-
- [INTEGER]
- NOT NULL
-
-priority_weight
-
- [INTEGER]
-
-queue
-
- [VARCHAR(256)]
-
-queued_by_job_id
-
- [INTEGER]
-
-queued_dttm
-
- [TIMESTAMP]
-
-rendered_map_index
-
- [VARCHAR(250)]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-scheduled_dttm
-
- [TIMESTAMP]
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(20)]
-
-task_display_name
-
- [VARCHAR(2000)]
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-trigger_id
-
- [INTEGER]
-
-trigger_timeout
-
- [TIMESTAMP]
-
-try_number
-
- [INTEGER]
-
-unixname
-
- [VARCHAR(1000)]
-
-updated_at
-
- [TIMESTAMP]
+
+task_instance
+
+id
+
+ [UUID]
+ NOT NULL
+
+custom_operator_name
+
+ [VARCHAR(1000)]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+duration
+
+ [DOUBLE_PRECISION]
+
+end_date
+
+ [TIMESTAMP]
+
+executor
+
+ [VARCHAR(1000)]
+
+executor_config
+
+ [BYTEA]
+
+external_executor_id
+
+ [VARCHAR(250)]
+
+hostname
+
+ [VARCHAR(1000)]
+
+last_heartbeat_at
+
+ [TIMESTAMP]
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+max_tries
+
+ [INTEGER]
+
+next_kwargs
+
+ [JSON]
+
+next_method
+
+ [VARCHAR(1000)]
+
+operator
+
+ [VARCHAR(1000)]
+
+pid
+
+ [INTEGER]
+
+pool
+
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+
+ [INTEGER]
+
+queue
+
+ [VARCHAR(256)]
+
+queued_by_job_id
+
+ [INTEGER]
+
+queued_dttm
+
+ [TIMESTAMP]
+
+rendered_map_index
+
+ [VARCHAR(250)]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+scheduled_dttm
+
+ [TIMESTAMP]
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(20)]
+
+task_display_name
+
+ [VARCHAR(2000)]
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+trigger_id
+
+ [INTEGER]
+
+trigger_timeout
+
+ [TIMESTAMP]
+
+try_number
+
+ [INTEGER]
+
+unixname
+
+ [VARCHAR(1000)]
+
+updated_at
+
+ [TIMESTAMP]
trigger--task_instance
-
-0..N
+
+0..N
{0,1}
task_reschedule
-
-task_reschedule
-
-id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-duration
-
- [INTEGER]
- NOT NULL
-
-end_date
-
- [TIMESTAMP]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-reschedule_date
-
- [TIMESTAMP]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-try_number
-
- [INTEGER]
- NOT NULL
+
+task_reschedule
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+duration
+
+ [INTEGER]
+ NOT NULL
+
+end_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+reschedule_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+try_number
+
+ [INTEGER]
+ NOT NULL
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
task_instance--task_reschedule
-
-0..N
-1
+
+0..N
+1
rendered_task_instance_fields
-
-rendered_task_instance_fields
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-k8s_pod_yaml
-
- [JSON]
-
-rendered_fields
-
- [JSON]
- NOT NULL
+
+rendered_task_instance_fields
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+k8s_pod_yaml
+
+ [JSON]
+
+rendered_fields
+
+ [JSON]
+ NOT NULL
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_instance--rendered_task_instance_fields
-
-0..N
-1
+
+0..N
+1
task_map
-
-task_map
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-keys
-
- [JSON]
-
-length
-
- [INTEGER]
- NOT NULL
+
+task_map
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+keys
+
+ [JSON]
+
+length
+
+ [INTEGER]
+ NOT NULL
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
task_instance--task_map
-
-0..N
-1
+
+0..N
+1
xcom
-
-xcom
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-key
-
- [VARCHAR(512)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-timestamp
-
- [TIMESTAMP]
- NOT NULL
-
-value
-
- [JSONB]
+
+xcom
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+key
+
+ [VARCHAR(512)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+timestamp
+
+ [TIMESTAMP]
+ NOT NULL
+
+value
+
+ [JSONB]
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance--xcom
-
-0..N
-1
+
+0..N
+1
task_instance_note
-
-task_instance_note
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-content
-
- [VARCHAR(1000)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
-
-user_id
-
- [VARCHAR(128)]
+
+task_instance_note
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+content
+
+ [VARCHAR(1000)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+
+ [VARCHAR(128)]
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_note
-
-0..N
-1
+
+0..N
+1
task_instance_history
-
-task_instance_history
-
-id
-
- [INTEGER]
- NOT NULL
-
-custom_operator_name
-
- [VARCHAR(1000)]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-duration
-
- [DOUBLE_PRECISION]
-
-end_date
-
- [TIMESTAMP]
-
-executor
-
- [VARCHAR(1000)]
-
-executor_config
-
- [BYTEA]
-
-external_executor_id
-
- [VARCHAR(250)]
-
-hostname
-
- [VARCHAR(1000)]
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-max_tries
-
- [INTEGER]
-
-next_kwargs
-
- [JSON]
-
-next_method
-
- [VARCHAR(1000)]
-
-operator
-
- [VARCHAR(1000)]
-
-pid
-
- [INTEGER]
-
-pool
-
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
-
- [INTEGER]
- NOT NULL
-
-priority_weight
-
- [INTEGER]
-
-queue
-
- [VARCHAR(256)]
-
-queued_by_job_id
-
- [INTEGER]
-
-queued_dttm
-
- [TIMESTAMP]
-
-rendered_map_index
-
- [VARCHAR(250)]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-scheduled_dttm
-
- [TIMESTAMP]
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(20)]
-
-task_display_name
-
- [VARCHAR(2000)]
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-trigger_id
-
- [INTEGER]
-
-trigger_timeout
-
- [TIMESTAMP]
-
-try_number
-
- [INTEGER]
- NOT NULL
-
-unixname
-
- [VARCHAR(1000)]
-
-updated_at
-
- [TIMESTAMP]
+
+task_instance_history
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+custom_operator_name
+
+ [VARCHAR(1000)]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+duration
+
+ [DOUBLE_PRECISION]
+
+end_date
+
+ [TIMESTAMP]
+
+executor
+
+ [VARCHAR(1000)]
+
+executor_config
+
+ [BYTEA]
+
+external_executor_id
+
+ [VARCHAR(250)]
+
+hostname
+
+ [VARCHAR(1000)]
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+max_tries
+
+ [INTEGER]
+
+next_kwargs
+
+ [JSON]
+
+next_method
+
+ [VARCHAR(1000)]
+
+operator
+
+ [VARCHAR(1000)]
+
+pid
+
+ [INTEGER]
+
+pool
+
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+
+ [INTEGER]
+
+queue
+
+ [VARCHAR(256)]
+
+queued_by_job_id
+
+ [INTEGER]
+
+queued_dttm
+
+ [TIMESTAMP]
+
+rendered_map_index
+
+ [VARCHAR(250)]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+scheduled_dttm
+
+ [TIMESTAMP]
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(20)]
+
+task_display_name
+
+ [VARCHAR(2000)]
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+trigger_id
+
+ [INTEGER]
+
+trigger_timeout
+
+ [TIMESTAMP]
+
+try_number
+
+ [INTEGER]
+ NOT NULL
+
+unixname
+
+ [VARCHAR(1000)]
+
+updated_at
+
+ [TIMESTAMP]
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
task_instance--task_instance_history
-
-0..N
-1
+
+0..N
+1
@@ -1792,115 +1792,120 @@
dag_version--task_instance
-
-0..N
+
+0..N
{0,1}
dag_run
-
-dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-backfill_id
-
- [INTEGER]
-
-bundle_version
-
- [VARCHAR(250)]
-
-clear_number
-
- [INTEGER]
- NOT NULL
-
-conf
-
- [JSONB]
-
-creating_job_id
-
- [INTEGER]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-data_interval_end
-
- [TIMESTAMP]
-
-data_interval_start
-
- [TIMESTAMP]
-
-end_date
-
- [TIMESTAMP]
-
-external_trigger
-
- [BOOLEAN]
-
-last_scheduling_decision
-
- [TIMESTAMP]
-
-log_template_id
-
- [INTEGER]
-
-logical_date
-
- [TIMESTAMP]
- NOT NULL
-
-queued_at
-
- [TIMESTAMP]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-run_type
-
- [VARCHAR(50)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(50)]
-
-triggered_by
-
- [VARCHAR(50)]
-
-updated_at
-
- [TIMESTAMP]
+
+dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+backfill_id
+
+ [INTEGER]
+
+bundle_version
+
+ [VARCHAR(250)]
+
+clear_number
+
+ [INTEGER]
+ NOT NULL
+
+conf
+
+ [JSONB]
+
+creating_job_id
+
+ [INTEGER]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+data_interval_end
+
+ [TIMESTAMP]
+
+data_interval_start
+
+ [TIMESTAMP]
+
+end_date
+
+ [TIMESTAMP]
+
+external_trigger
+
+ [BOOLEAN]
+
+last_scheduling_decision
+
+ [TIMESTAMP]
+
+log_template_id
+
+ [INTEGER]
+
+logical_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+queued_at
+
+ [TIMESTAMP]
+
+run_after
+
+ [TIMESTAMP]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+run_type
+
+ [VARCHAR(50)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(50)]
+
+triggered_by
+
+ [VARCHAR(50)]
+
+updated_at
+
+ [TIMESTAMP]
dag_version--dag_run
-
-0..N
+
+0..N
{0,1}
@@ -2000,121 +2005,121 @@
dag_run--dagrun_asset_event
-
-0..N
-1
+
+0..N
+1
dag_run--task_instance
-
-0..N
-1
+
+0..N
+1
dag_run--task_instance
-
-0..N
-1
+
+0..N
+1
dag_run--deadline
-
-0..N
-{0,1}
+
+0..N
+{0,1}
backfill_dag_run
-
-backfill_dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-backfill_id
-
- [INTEGER]
- NOT NULL
-
-dag_run_id
-
- [INTEGER]
-
-exception_reason
-
- [VARCHAR(250)]
-
-logical_date
-
- [TIMESTAMP]
- NOT NULL
-
-sort_ordinal
-
- [INTEGER]
- NOT NULL
+
+backfill_dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+backfill_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_run_id
+
+ [INTEGER]
+
+exception_reason
+
+ [VARCHAR(250)]
+
+logical_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+sort_ordinal
+
+ [INTEGER]
+ NOT NULL
dag_run--backfill_dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
dag_run_note
-
-dag_run_note
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-content
-
- [VARCHAR(1000)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
-
-user_id
-
- [VARCHAR(128)]
+
+dag_run_note
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+content
+
+ [VARCHAR(1000)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+
+ [VARCHAR(128)]
dag_run--dag_run_note
-
-1
-1
+
+1
+1
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
dag_run--task_reschedule
-
-0..N
-1
+
+0..N
+1
@@ -2145,9 +2150,9 @@
log_template--dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -2211,16 +2216,16 @@
backfill--dag_run
-
-0..N
-{0,1}
+
+0..N
+{0,1}
backfill--backfill_dag_run
-
-0..N
-1
+
+0..N
+1
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 633c2338f4064..c39bcfb3a55f7 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
-| ``33b04e4bfa19`` (head) | ``8ea135928435`` | ``3.0.0`` | add new task_instance field scheduled_dttm. |
+| ``6a9e7a527a88`` (head) | ``33b04e4bfa19`` | ``3.0.0`` | Add DagRun run_after. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``33b04e4bfa19`` | ``8ea135928435`` | ``3.0.0`` | add new task_instance field scheduled_dttm. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``8ea135928435`` | ``e39a26ac59f6`` | ``3.0.0`` | Add relative fileloc column. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py b/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py
index c56dbdb33b520..47e53cbc5f373 100644
--- a/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py
+++ b/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py
@@ -176,6 +176,8 @@ def test_render_template(self, session):
dag_id=self.dag.dag_id,
run_id="kylin_test",
logical_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state=state.DagRunState.RUNNING,
)
diff --git a/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py
index 2670340086bb7..94344d540681d 100644
--- a/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py
+++ b/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py
@@ -200,6 +200,8 @@ def test_render_template(self, session):
dag_id=self.dag.dag_id,
run_id="spark_test",
logical_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state="running",
)
diff --git a/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py b/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py
index b0eedb52c47c2..0a6db5665deb3 100644
--- a/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py
+++ b/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py
@@ -1163,6 +1163,7 @@ def test_branch_single_value_with_dag_run(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1213,6 +1214,7 @@ def test_branch_true_with_dag_run(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1263,6 +1265,7 @@ def test_branch_false_with_dag_run(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1314,6 +1317,7 @@ def test_branch_list_with_dag_run(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1362,6 +1366,7 @@ def test_invalid_query_result_with_dag_run(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1401,6 +1406,7 @@ def test_with_skip_in_branch_downstream_dependencies(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -1449,6 +1455,7 @@ def test_with_skip_in_branch_downstream_dependencies2(self, mock_get_db_hook):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
index 1a297b008cf56..02057310907f3 100644
--- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
+++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
@@ -142,18 +142,26 @@ def teardown_method(self) -> None:
def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commit=True, idx_start=1):
dag_runs = []
dags = []
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+
+ def _v3_kwargs(date):
+ if AIRFLOW_V_3_0_PLUS:
+ return {
+ "data_interval": (date, date),
+ "logical_date": date,
+ "run_after": date,
+ "triggered_by": DagRunTriggeredByType.TEST,
+ }
+ return {"execution_date": date}
for i in range(idx_start, idx_start + 2):
dagrun_model = DagRun(
dag_id="TEST_DAG_ID",
run_id=f"TEST_DAG_RUN_ID_{i}",
run_type=DagRunType.MANUAL,
- logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state=state,
- **triggered_by_kwargs,
+ **_v3_kwargs(timezone.parse(self.default_time) + timedelta(days=i - 1)),
)
dagrun_model.updated_at = timezone.parse(self.default_time)
dag_runs.append(dagrun_model)
@@ -166,10 +174,10 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi
dag_id=f"TEST_DAG_ID_{i}",
run_id=f"TEST_DAG_RUN_ID_{i}",
run_type=DagRunType.MANUAL,
- logical_date=timezone.parse(self.default_time_2),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state=state,
+ **_v3_kwargs(timezone.parse(self.default_time_2)),
)
)
if commit:
@@ -203,8 +211,8 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
- "data_interval_end": None,
- "data_interval_start": None,
+ "data_interval_end": self.default_time,
+ "data_interval_start": self.default_time,
"last_scheduling_decision": None,
"run_type": "manual",
"note": None,
@@ -219,8 +227,8 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
- "data_interval_end": None,
- "data_interval_start": None,
+ "data_interval_end": self.default_time_2,
+ "data_interval_start": self.default_time_2,
"last_scheduling_decision": None,
"run_type": "manual",
"note": None,
diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
index 94e219ebcf0a2..9e75a2cf38368 100644
--- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
+++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
@@ -166,6 +166,8 @@ def create_task_instances(
run_id=run_id,
dag_id=dag_id,
logical_date=logical_date,
+ data_interval=(logical_date, logical_date),
+ run_after=logical_date,
run_type=DagRunType.MANUAL,
state=dag_run_state,
)
diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
index 6c3dd08b9e845..2d5177256b273 100644
--- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
+++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
@@ -180,6 +180,8 @@ def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, mapped_ti=
dag_id=dag_id,
run_id=run_id,
logical_date=logical_date,
+ data_interval=(logical_date, logical_date),
+ run_after=logical_date,
start_date=logical_date,
run_type=DagRunType.MANUAL,
)
@@ -226,6 +228,7 @@ def _create_invalid_xcom_entries(self, logical_date):
dag_id="invalid_dag",
run_id="invalid_run_id",
logical_date=logical_date + timedelta(days=1),
+ run_after=logical_date,
start_date=logical_date,
run_type=DagRunType.MANUAL,
)
@@ -243,6 +246,7 @@ def _create_invalid_xcom_entries(self, logical_date):
dag_id="invalid_dag",
run_id="not_this_run_id",
logical_date=logical_date,
+ run_after=logical_date,
start_date=logical_date,
run_type=DagRunType.MANUAL,
)
diff --git a/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py b/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py
index 290afe7c737cf..3ffc5918baa7d 100644
--- a/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py
+++ b/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py
@@ -28,22 +28,18 @@
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.www.views import FILTER_STATUS_COOKIE
from provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import create_user_scope
from tests_common.test_utils.db import clear_db_runs
from tests_common.test_utils.permissions import _resource_name
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import (
check_content_in_response,
check_content_not_in_response,
client_with_login,
)
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
pytestmark = pytest.mark.db_test
NEXT_YEAR = datetime.datetime.now().year + 1
@@ -148,7 +144,6 @@ def _reset_dagruns():
@pytest.fixture(autouse=True)
def _init_dagruns(acl_app, _reset_dagruns):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
acl_app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id=DEFAULT_RUN_ID,
run_type=DagRunType.SCHEDULED,
@@ -156,7 +151,8 @@ def _init_dagruns(acl_app, _reset_dagruns):
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
acl_app.dag_bag.get_dag("example_python_operator").create_dagrun(
run_id=DEFAULT_RUN_ID,
@@ -165,7 +161,8 @@ def _init_dagruns(acl_app, _reset_dagruns):
start_date=timezone.utcnow(),
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
state=State.RUNNING,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
yield
clear_db_runs()
diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py
index 6eb84a5d4012d..79ee2ae42179e 100644
--- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py
@@ -101,6 +101,7 @@ def setup_job(self, task_name, run_id):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
@@ -207,6 +208,7 @@ def test_success_overtime_kills_tasks(self):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py
index 25d9c24e6e519..f8da7452831a0 100644
--- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py
@@ -93,6 +93,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
dagrun_kwargs = {
"dag_version": None,
"logical_date": date,
+ "run_after": date,
"triggered_by": types.DagRunTriggeredByType.TEST,
}
else:
@@ -179,6 +180,7 @@ def sample_callable(**kwargs):
dagrun_kwargs: dict = {
"dag_version": None,
"logical_date": date,
+ "run_after": date,
"triggered_by": types.DagRunTriggeredByType.TEST,
}
else:
@@ -710,6 +712,7 @@ def simple_callable(**kwargs):
dagrun_kwargs = {
"dag_version": None,
"logical_date": date,
+ "run_after": date,
"triggered_by": types.DagRunTriggeredByType.TEST,
}
else:
diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py
index 36b8057cf01f6..59fbc8605ad6e 100644
--- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py
+++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py
@@ -105,6 +105,7 @@ def test_get_dagrun_start_end(dag_maker):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs = {
"logical_date": data_interval.start,
+ "run_after": data_interval.end,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
diff --git a/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py b/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py
index 2dde6fa402b14..32d80bf07a980 100644
--- a/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py
+++ b/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py
@@ -42,7 +42,14 @@ def ti(self):
dag = DAG(dag_id="dag_for_testing_redis_task_handler", schedule=None, start_date=date)
task = EmptyOperator(task_id="task_for_testing_redis_log_handler", dag=dag)
if AIRFLOW_V_3_0_PLUS:
- dag_run = DagRun(dag_id=dag.dag_id, logical_date=date, run_id="test", run_type="scheduled")
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ logical_date=date,
+ data_interval=(date, date),
+ run_after=date,
+ run_id="test",
+ run_type="scheduled",
+ )
else:
dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")
diff --git a/providers/standard/tests/provider_tests/standard/decorators/test_python.py b/providers/standard/tests/provider_tests/standard/decorators/test_python.py
index 64914e3d149f8..84a58faae72e2 100644
--- a/providers/standard/tests/provider_tests/standard/decorators/test_python.py
+++ b/providers/standard/tests/provider_tests/standard/decorators/test_python.py
@@ -444,8 +444,9 @@ def return_dict(number: int):
with self.dag_non_serialized:
ret = return_dict(test_number)
- triggered_by_kwargs = (
+ v3_kwargs = (
{
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
"logical_date": DEFAULT_DATE,
}
@@ -462,7 +463,7 @@ def return_dict(number: int):
data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval(
run_after=DEFAULT_DATE
),
- **triggered_by_kwargs,
+ **v3_kwargs,
)
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
@@ -517,8 +518,9 @@ def add_num(number: int, num2: int = 2):
bigger_number = add_2(test_number)
ret = add_num(bigger_number, XComArg(bigger_number.operator))
- triggered_by_kwargs = (
+ v3_kwargs = (
{
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
"logical_date": DEFAULT_DATE,
}
@@ -535,7 +537,7 @@ def add_num(number: int, num2: int = 2):
data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval(
run_after=DEFAULT_DATE
),
- **triggered_by_kwargs,
+ **v3_kwargs,
)
bigger_number.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
index d89fbdf6edb15..cc63b8b15b310 100644
--- a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
+++ b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
@@ -129,6 +129,7 @@ def task_callable(ti):
if AIRFLOW_V_3_0_PLUS:
dagrun_kwargs: dict = {
"logical_date": DEFAULT_DATE,
+ "run_after": DEFAULT_DATE,
"triggered_by": DagRunTriggeredByType.TEST,
}
else:
diff --git a/providers/tests/cncf/kubernetes/operators/test_pod.py b/providers/tests/cncf/kubernetes/operators/test_pod.py
index 7e3806e8110f4..0aacc4b782fd0 100644
--- a/providers/tests/cncf/kubernetes/operators/test_pod.py
+++ b/providers/tests/cncf/kubernetes/operators/test_pod.py
@@ -55,6 +55,7 @@
from airflow.utils.types import DagRunType
from tests_common.test_utils import db
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -96,11 +97,23 @@ def create_context(task, persist_to_db=False, map_index=None):
else:
dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now())
dag.add_task(task)
- dag_run = DagRun(
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
- run_type=DagRunType.MANUAL,
- dag_id=dag.dag_id,
- )
+ now = timezone.utcnow()
+ if AIRFLOW_V_3_0_PLUS:
+ dag_run = DagRun(
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+ run_type=DagRunType.MANUAL,
+ dag_id=dag.dag_id,
+ logical_date=now,
+ data_interval=(now, now),
+ run_after=now,
+ )
+ else:
+ dag_run = DagRun(
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+ run_type=DagRunType.MANUAL,
+ dag_id=dag.dag_id,
+ execution_date=now,
+ )
task_instance = TaskInstance(task=task, run_id=dag_run.run_id)
task_instance.dag_run = dag_run
if map_index is not None:
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index e4aa7895ac662..d9b6e1d45ec0c 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -30,17 +30,13 @@
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.api_connexion_utils import create_user, delete_user
from tests_common.test_utils.compat import BaseOperatorLink
from tests_common.test_utils.db import clear_db_runs, clear_db_xcom
from tests_common.test_utils.mock_operators import CustomOperator
from tests_common.test_utils.mock_plugins import mock_plugin_manager
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -79,15 +75,16 @@ def setup_attrs(self, configured_app, session) -> None:
self.app.dag_bag.dags = {self.dag.dag_id: self.dag}
self.app.dag_bag.sync_to_db("dags-folder", None)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+ data_interval = DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2))
self.dag.create_dagrun(
run_id="TEST_DAG_RUN_ID",
logical_date=self.default_time,
run_type=DagRunType.MANUAL,
state=DagRunState.SUCCESS,
session=session,
- data_interval=DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)),
- **triggered_by_kwargs,
+ data_interval=data_interval,
+ run_after=data_interval.end,
+ triggered_by=DagRunTriggeredByType.TEST,
)
session.flush()
diff --git a/tests/api_fastapi/common/test_exceptions.py b/tests/api_fastapi/common/test_exceptions.py
index 6751aff20c725..d75e3bbe23bdf 100644
--- a/tests/api_fastapi/common/test_exceptions.py
+++ b/tests/api_fastapi/common/test_exceptions.py
@@ -186,7 +186,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
- "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)",
+ "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)",
"orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id",
},
),
@@ -194,7 +194,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
- "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)",
+ "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")",
},
),
@@ -202,7 +202,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
- "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING dag_run.id",
+ "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(run_after)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING dag_run.id",
"orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n',
},
),
diff --git a/tests/api_fastapi/core_api/routes/public/test_extra_links.py b/tests/api_fastapi/core_api/routes/public/test_extra_links.py
index 89aef555bbe20..b3afd174db455 100644
--- a/tests/api_fastapi/core_api/routes/public/test_extra_links.py
+++ b/tests/api_fastapi/core_api/routes/public/test_extra_links.py
@@ -74,15 +74,14 @@ def setup(self, test_client, session=None) -> None:
test_client.app.state.dag_bag = dag_bag
dag_bag.sync_to_db("dags-folder", None)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
-
self.dag.create_dagrun(
run_id=self.dag_run_id,
logical_date=self.default_time,
run_type=DagRunType.MANUAL,
state=DagRunState.SUCCESS,
data_interval=(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)),
- **triggered_by_kwargs,
+ run_after=timezone.datetime(2020, 1, 2),
+ triggered_by=DagRunTriggeredByType.TEST,
)
def teardown_method(self) -> None:
diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py b/tests/api_fastapi/execution_api/routes/test_task_instances.py
index 8a7b21699f25e..c1a131e273605 100644
--- a/tests/api_fastapi/execution_api/routes/test_task_instances.py
+++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py
@@ -99,6 +99,7 @@ def test_ti_run_state_to_running(self, client, session, create_task_instance, ti
"logical_date": instant_str,
"data_interval_start": instant.subtract(days=1).to_iso8601_string(),
"data_interval_end": instant_str,
+ "run_after": instant_str,
"start_date": instant_str,
"end_date": None,
"external_trigger": False,
diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py
index 2d6f699c1e788..166f152b9eaee 100644
--- a/tests/cli/commands/remote_commands/test_task_command.py
+++ b/tests/cli/commands/remote_commands/test_task_command.py
@@ -50,14 +50,10 @@
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_pools, clear_db_runs, parse_and_sync_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -104,21 +100,15 @@ def setup_class(cls):
cls.dagbag = DagBag(read_dags_from_db=True)
cls.dag = cls.dagbag.get_dag(cls.dag_id)
data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
- v3_kwargs = (
- {
- "dag_version": None,
- "triggered_by": DagRunTriggeredByType.TEST,
- }
- if AIRFLOW_V_3_0_PLUS
- else {}
- )
cls.dag_run = cls.dag.create_dagrun(
state=State.RUNNING,
run_id=cls.run_id,
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATE,
data_interval=data_interval,
- **v3_kwargs,
+ run_after=DEFAULT_DATE,
+ dag_version=None,
+ triggered_by=DagRunTriggeredByType.TEST,
)
@classmethod
@@ -175,22 +165,16 @@ def test_cli_test_different_path(self, session, tmp_path):
logical_date = pendulum.now("UTC")
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
- v3_kwargs = (
- {
- "dag_version": None,
- "triggered_by": DagRunTriggeredByType.TEST,
- }
- if AIRFLOW_V_3_0_PLUS
- else {}
- )
dag.create_dagrun(
state=State.RUNNING,
run_id="abc123",
run_type=DagRunType.MANUAL,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=logical_date,
+ dag_version=None,
+ triggered_by=DagRunTriggeredByType.TEST,
session=session,
- **v3_kwargs,
)
session.commit()
@@ -647,22 +631,16 @@ def test_task_states_for_dag_run(self):
default_date2 = timezone.datetime(2016, 1, 9)
dag2.clear()
data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2)
- v3_kwargs = (
- {
- "dag_version": None,
- "triggered_by": DagRunTriggeredByType.CLI,
- }
- if AIRFLOW_V_3_0_PLUS
- else {}
- )
dagrun = dag2.create_dagrun(
run_id="test",
state=State.RUNNING,
logical_date=default_date2,
data_interval=data_interval,
+ run_after=default_date2,
run_type=DagRunType.MANUAL,
external_trigger=True,
- **v3_kwargs,
+ dag_version=None,
+ triggered_by=DagRunTriggeredByType.CLI,
)
ti2 = TaskInstance(task2, run_id=dagrun.run_id)
ti2.set_state(State.SUCCESS)
@@ -736,22 +714,16 @@ def setup_method(self) -> None:
dag = DagBag().get_dag(self.dag_id)
data_interval = dag.timetable.infer_manual_data_interval(run_after=self.logical_date)
- v3_kwargs = (
- {
- "dag_version": None,
- "triggered_by": DagRunTriggeredByType.TEST,
- }
- if AIRFLOW_V_3_0_PLUS
- else {}
- )
self.dr = dag.create_dagrun(
run_id=self.run_id,
logical_date=self.logical_date,
data_interval=data_interval,
+ run_after=self.logical_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
- **v3_kwargs,
+ dag_version=None,
+ triggered_by=DagRunTriggeredByType.TEST,
)
self.tis = self.dr.get_task_instances()
assert len(self.tis) == 1
@@ -1049,22 +1021,16 @@ def test_context_with_run():
dag = DagBag().get_dag(dag_id)
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
- v3_kwargs = (
- {
- "dag_version": None,
- "triggered_by": DagRunTriggeredByType.TEST,
- }
- if AIRFLOW_V_3_0_PLUS
- else {}
- )
dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=logical_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
- **v3_kwargs,
+ dag_version=None,
+ triggered_by=DagRunTriggeredByType.TEST,
)
with conf_vars({("core", "dags_folder"): dag_path}):
task_command.task_run(parser.parse_args(task_args))
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 2ba95f15049ed..9844186be1df5 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -104,6 +104,7 @@ def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
triggered_by=DagRunTriggeredByType.TEST,
session=session,
)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index d229abfe34f89..c86599d555c54 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -49,16 +49,12 @@
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils import db
from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.mock_executor import MockExecutor
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -307,7 +303,6 @@ def test_heartbeat_failed_fast(self):
dag = self.dagbag.get_dag(dag_id)
task = dag.get_task(task_id)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test_heartbeat_failed_fast_run",
run_type=DagRunType.MANUAL,
@@ -316,7 +311,8 @@ def test_heartbeat_failed_fast(self):
start_date=DEFAULT_DATE,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = dr.task_instances[0]
@@ -346,7 +342,6 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session):
"""
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test",
state=State.RUNNING,
@@ -354,7 +349,8 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="test_mark_success_no_kill")
@@ -379,8 +375,6 @@ def test_localtaskjob_double_trigger(self):
session = settings.Session()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-
dag.clear()
dr = dag.create_dagrun(
run_id="test",
@@ -390,7 +384,8 @@ def test_localtaskjob_double_trigger(self):
start_date=DEFAULT_DATE,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = dr.get_task_instance(task_id=task.task_id, session=session)
@@ -485,7 +480,6 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
"""
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -494,7 +488,8 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="test_mark_failure_externally")
ti = dr.get_task_instance(task.task_id)
@@ -521,7 +516,6 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
dag = get_test_dag("test_mark_state")
dag.dagrun_timeout = datetime.timedelta(microseconds=1)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -531,7 +525,8 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="test_mark_skipped_externally")
ti = dr.get_task_instance(task.task_id)
@@ -557,7 +552,6 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t
monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
dag = get_test_dag("test_on_failure_callback")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -566,7 +560,8 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="test_on_failure_callback_task")
ti = TaskInstance(task=task, run_id=dr.run_id)
@@ -594,7 +589,6 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag):
"""
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -603,7 +597,8 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="test_mark_success_no_kill")
@@ -631,7 +626,6 @@ def test_success_listeners_executed(self, caplog, get_test_dag):
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -640,7 +634,8 @@ def test_success_listeners_executed(self, caplog, get_test_dag):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="sleep_execution")
@@ -671,7 +666,6 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -680,7 +674,8 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="sleep_execution")
@@ -711,7 +706,6 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl
dag = get_test_dag("test_mark_state")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dr = dag.create_dagrun(
run_id="test",
@@ -720,7 +714,8 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="slow_execution")
@@ -755,7 +750,6 @@ def test_process_os_signal_calls_on_failure_callback(
monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
dag = get_test_dag("test_on_failure_callback")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dag.create_dagrun(
run_id="test",
@@ -764,7 +758,8 @@ def test_process_os_signal_calls_on_failure_callback(
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
task = dag.get_task(task_id="bash_sleep")
dag_run = dag.get_last_dagrun()
@@ -1016,14 +1011,14 @@ def task_function():
logger.addHandler(tmpfile_handler)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
state=State.RUNNING,
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_LOGICAL_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TaskInstance(task=task, run_id=dag_run.run_id)
ti.refresh_from_db()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 62441d646cd2a..faf8fa73284af 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -69,7 +69,7 @@
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests.listeners import dag_listener
from tests.listeners.test_listeners import get_listener_manager
@@ -90,10 +90,6 @@
)
from tests_common.test_utils.mock_executor import MockExecutor
from tests_common.test_utils.mock_operators import CustomOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -133,8 +129,6 @@ def _loader_mock(mock_executors):
@pytest.fixture
def create_dagrun(session):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-
def _create_dagrun(
dag: DAG,
*,
@@ -153,10 +147,11 @@ def _create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
run_type=run_type,
state=state,
start_date=start_date,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
return _create_dagrun
@@ -2870,13 +2865,15 @@ def test_scheduler_start_date(self, testing_dag_bundle):
# because it would take the most recent run and start from there
# That behavior still exists, but now it will only do so if after the
# start date
+ data_interval_end = DEFAULT_DATE + timedelta(days=1)
dag.create_dagrun(
state="success",
triggered_by=DagRunTriggeredByType.TIMETABLE,
run_id="abc123",
logical_date=DEFAULT_DATE,
run_type=DagRunType.BACKFILL_JOB,
- data_interval=DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
+ data_interval=DataInterval(DEFAULT_DATE, data_interval_end),
+ run_after=data_interval_end,
)
# one task "ran"
assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1
@@ -3061,7 +3058,6 @@ def test_scheduler_keeps_scheduling_pool_full(self, dag_maker, mock_executor):
def _create_dagruns(dag: DAG):
next_info = dag.next_dagrun_info(None)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
assert next_info is not None
for i in range(30):
yield dag.create_dagrun(
@@ -3069,8 +3065,9 @@ def _create_dagruns(dag: DAG):
run_type=DagRunType.SCHEDULED,
logical_date=next_info.logical_date,
data_interval=next_info.data_interval,
+ run_after=next_info.run_after,
state=DagRunState.RUNNING,
- **triggered_by_kwargs, # type: ignore
+ triggered_by=DagRunTriggeredByType.TEST,
)
next_info = dag.next_dagrun_info(next_info.data_interval)
if next_info is None:
@@ -4072,7 +4069,6 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak
# Trigger the Dag externally
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test",
state=DagRunState.RUNNING,
@@ -4081,7 +4077,8 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak
session=session,
external_trigger=True,
data_interval=data_interval,
- **triggered_by_kwargs,
+ run_after=DEFAULT_LOGICAL_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert dr is not None
# Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file
@@ -4158,7 +4155,6 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session):
)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_version = DagVersion.get_latest_version(dag.dag_id)
run1 = dag.create_dagrun(
run_id="test1",
@@ -4168,22 +4164,25 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session):
start_date=timezone.utcnow() - timedelta(seconds=2),
session=session,
data_interval=data_interval,
+ run_after=DEFAULT_LOGICAL_DATE,
dag_version=dag_version,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
run1_ti = run1.get_task_instance(task1.task_id, session)
run1_ti.state = State.RUNNING
+ logical_date_2 = DEFAULT_DATE + timedelta(seconds=10)
run2 = dag.create_dagrun(
run_id="test2",
run_type=DagRunType.SCHEDULED,
- logical_date=DEFAULT_DATE + timedelta(seconds=10),
+ logical_date=logical_date_2,
state=State.QUEUED,
session=session,
data_interval=data_interval,
+ run_after=logical_date_2,
dag_version=dag_version,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
scheduler_job = Job(executor=self.null_exec)
@@ -5720,15 +5719,16 @@ def test_mapped_dag(self, dag_id, session, testing_dag_bundle):
logical_date = timezone.coerce_datetime(timezone.utcnow() - datetime.timedelta(days=2))
data_interval = dag.infer_automated_data_interval(logical_date)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id=f"{dag_id}_1",
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
state=State.RUNNING,
session=session,
+ logical_date=logical_date,
data_interval=data_interval,
- **triggered_by_kwargs,
+            run_after=data_interval.end,
+ triggered_by=DagRunTriggeredByType.TEST,
)
executor = SequentialExecutor()
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index e839b1908bcf8..c66ddce9ccd20 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -93,10 +93,13 @@ def session():
def create_trigger_in_db(session, trigger, operator=None):
dag_model = DagModel(dag_id="test_dag")
dag = DAG(dag_id=dag_model.dag_id, schedule="@daily", start_date=pendulum.datetime(2023, 1, 1))
+ date = pendulum.datetime(2023, 1, 1)
run = DagRun(
dag_id=dag_model.dag_id,
run_id="test_run",
- logical_date=pendulum.datetime(2023, 1, 1),
+ logical_date=date,
+ data_interval=(date, date),
+ run_after=date,
run_type=DagRunType.MANUAL,
)
trigger_orm = Trigger.from_object(trigger)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 092db38ac0b20..a469e84406ec0 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -33,14 +33,10 @@
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests.models import DEFAULT_DATE
from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -642,7 +638,6 @@ def test_dags_clear(self):
)
task = EmptyOperator(task_id=f"test_task_clear_{i}", owner="test", dag=dag)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id=f"scheduled_{i}",
logical_date=DEFAULT_DATE,
@@ -650,7 +645,8 @@ def test_dags_clear(self):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = dr.task_instances[0]
ti.task = task
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 78fffd45fd098..fde7e9eafa62a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -87,7 +87,7 @@
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.timezone import datetime as datetime_tz
from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.utils.weight_rule import WeightRule
from tests.models import DEFAULT_DATE
@@ -106,13 +106,8 @@
from tests_common.test_utils.mapping import expand_mapped_task
from tests_common.test_utils.mock_plugins import mock_plugin_manager
from tests_common.test_utils.timetables import cron_timetable, delta_timetable
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
if TYPE_CHECKING:
- from pendulum import DateTime
from sqlalchemy.orm import Session
pytestmark = pytest.mark.db_test
@@ -150,13 +145,15 @@ def test_dags_bundle(configure_testing_dag_bundle):
def _create_dagrun(
dag: DAG,
*,
- logical_date: DateTime,
- data_interval: DataInterval,
+ logical_date: datetime.datetime,
+ data_interval: tuple[datetime.datetime, datetime.datetime],
run_type: DagRunType,
state: DagRunState = DagRunState.RUNNING,
start_date: datetime.datetime | None = None,
) -> DagRun:
- triggered_by_kwargs: dict = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+ logical_date = timezone.coerce_datetime(logical_date)
+ if not isinstance(data_interval, DataInterval):
+ data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))
run_id = dag.timetable.generate_run_id(
run_type=run_type,
logical_date=logical_date, # type: ignore
@@ -166,10 +163,11 @@ def _create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
run_type=run_type,
state=state,
start_date=start_date,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
@@ -429,7 +427,6 @@ def test_get_task_instances_before(self):
EmptyOperator(task_id=test_task_id, dag=test_dag)
session = settings.Session()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED):
dagrun = test_dag.create_dagrun(
@@ -438,11 +435,11 @@ def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED):
run_id=f"test_{delta_h}",
logical_date=None,
data_interval=None,
+ run_after=None,
session=session,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
- dagrun.start_date = BASE_DATE + timedelta(hours=delta_h)
- dagrun.logical_date = BASE_DATE + timedelta(hours=delta_h)
+ dagrun.start_date = dagrun.run_after = dagrun.logical_date = BASE_DATE + timedelta(hours=delta_h)
return dagrun
dr1 = dag_run_before(delta_h=-1, type=DagRunType.MANUAL) # H19
@@ -635,14 +632,14 @@ def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
# Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none
dag = DAG("dag_with_none_schedule_and_empty_start_date", schedule=None)
dag.add_task(BaseOperator(task_id="task_without_start_date"))
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dagrun = dag.create_dagrun(
run_id="test",
state=State.RUNNING,
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert dagrun is not None
@@ -832,7 +829,6 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state):
assert model.next_dagrun == DEFAULT_DATE
assert model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=1)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test",
state=state,
@@ -840,7 +836,8 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state):
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=(model.next_dagrun, model.next_dagrun),
- **triggered_by_kwargs,
+ run_after=model.next_dagrun_create_after,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert dr is not None
DAG.bulk_write_to_db("testing", None, [dag])
@@ -1046,7 +1043,6 @@ def test_existing_dag_is_paused_config(self):
def test_existing_dag_is_paused_after_limit(self, testing_dag_bundle):
def add_failed_dag_run(dag, id, logical_date):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_v = DagVersion.get_latest_version(dag_id=dag.dag_id)
dr = dag.create_dagrun(
run_type=DagRunType.MANUAL,
@@ -1054,8 +1050,9 @@ def add_failed_dag_run(dag, id, logical_date):
logical_date=logical_date,
state=State.FAILED,
data_interval=(logical_date, logical_date),
+ run_after=logical_date,
dag_version=dag_v,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti_op1 = dr.get_task_instance(task_id=op1.task_id, session=session)
ti_op1.set_state(state=TaskInstanceState.FAILED, session=session)
@@ -1117,7 +1114,6 @@ def test_schedule_dag_no_previous_runs(self):
dag_id = "test_schedule_dag_no_previous_runs"
dag = DAG(dag_id=dag_id, schedule=None)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_run = dag.create_dagrun(
run_id="test",
@@ -1125,7 +1121,8 @@ def test_schedule_dag_no_previous_runs(self):
logical_date=TEST_DATE,
state=State.RUNNING,
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert dag_run is not None
assert dag.dag_id == dag_run.dag_id
@@ -1156,7 +1153,6 @@ def test_dag_handle_callback_crash(self, mock_stats):
)
when = TEST_DATE
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=when))
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with create_session() as session:
dag_run = dag.create_dagrun(
@@ -1166,7 +1162,8 @@ def test_dag_handle_callback_crash(self, mock_stats):
run_type=DagRunType.MANUAL,
session=session,
data_interval=(when, when),
- **triggered_by_kwargs,
+ run_after=when,
+ triggered_by=DagRunTriggeredByType.TEST,
)
# should not raise any exception
@@ -1197,7 +1194,6 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session):
task_removed = EmptyOperator(task_id="removed_task")
with create_session() as session:
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_run = dag.create_dagrun(
run_id="test",
state=State.RUNNING,
@@ -1205,7 +1201,8 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session):
run_type=DagRunType.MANUAL,
session=session,
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dag._remove_task(task_removed.task_id)
tis = dag_run.get_task_instances(session=session)
@@ -1421,15 +1418,15 @@ def test_description_from_timetable(self, timetable, expected_description):
def test_create_dagrun_job_id_is_set(self):
job_id = 42
dag = DAG(dag_id="test_create_dagrun_job_id_is_set", schedule=None)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test_create_dagrun_job_id_is_set",
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
state=State.NONE,
creating_job_id=job_id,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert dr.creating_job_id == job_id
@@ -1547,6 +1544,7 @@ def consumer(value):
logical_date=DEFAULT_DATE,
session=session,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
triggered_by=DagRunTriggeredByType.TEST,
)
# Get the (de)serialized MappedOperator
@@ -1728,7 +1726,6 @@ def test_clear_dag(
_ = EmptyOperator(task_id=task_id, dag=dag)
session = settings.Session() # type: ignore
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dagrun_1 = dag.create_dagrun(
run_id="backfill",
run_type=DagRunType.BACKFILL_JOB,
@@ -1736,7 +1733,8 @@ def test_clear_dag(
start_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- **triggered_by_kwargs, # type: ignore
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
session.merge(dagrun_1)
@@ -2043,7 +2041,6 @@ def test_validate_executor_field(self):
def test_validate_params_on_trigger_dag(self):
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):
dag.create_dagrun(
run_id="test_dagrun_missing_param",
@@ -2051,7 +2048,8 @@ def test_validate_params_on_trigger_dag(self):
state=State.RUNNING,
logical_date=TEST_DATE,
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
@@ -2065,7 +2063,8 @@ def test_validate_params_on_trigger_dag(self):
logical_date=TEST_DATE,
conf={"param1": None},
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
@@ -2076,7 +2075,8 @@ def test_validate_params_on_trigger_dag(self):
logical_date=TEST_DATE,
conf={"param1": "hello"},
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
def test_dag_owner_links(self):
@@ -2505,7 +2505,6 @@ def teardown_method(self) -> None:
@pytest.mark.parametrize("tasks_count", [3, 12])
def test_count_number_queries(self, tasks_count):
dag = DAG("test_dagrun_query_count", schedule=None, start_date=DEFAULT_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
for i in range(tasks_count):
EmptyOperator(task_id=f"dummy_task_{i}", owner="test", dag=dag)
with assert_queries_count(4):
@@ -2515,7 +2514,8 @@ def test_count_number_queries(self, tasks_count):
state=State.RUNNING,
logical_date=TEST_DATE,
data_interval=(TEST_DATE, TEST_DATE),
- **triggered_by_kwargs,
+ run_after=TEST_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
@@ -2584,15 +2584,15 @@ def return_num(num):
dag = xcom_pass_to_op()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test",
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
logical_date=self.DEFAULT_DATE,
data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE),
+ run_after=self.DEFAULT_DATE,
state=State.RUNNING,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
self.operator.run(start_date=self.DEFAULT_DATE, end_date=self.DEFAULT_DATE)
@@ -2615,16 +2615,16 @@ def return_num(num):
dag = xcom_pass_to_op()
new_value = 52
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test",
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
logical_date=self.DEFAULT_DATE,
data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE),
+ run_after=self.DEFAULT_DATE,
state=State.RUNNING,
conf={"value": new_value},
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
self.operator.run(start_date=self.DEFAULT_DATE, end_date=self.DEFAULT_DATE)
@@ -3122,7 +3122,6 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker
def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None:
dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
run_id = run_id_type.generate_run_id(DEFAULT_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with pytest.raises(ValueError) as ctx:
dag.create_dagrun(
@@ -3130,8 +3129,9 @@ def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagR
run_id=run_id,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
state=DagRunState.QUEUED,
- **triggered_by_kwargs, # type: ignore
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert str(ctx.value) == (
f"A manual DAG run cannot use ID {run_id!r} since it is reserved for {run_id_type.value} runs"
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b5afa6301f4a6..12bccca74b815 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -46,16 +46,12 @@
from airflow.utils import timezone
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests.models import DEFAULT_DATE as _DEFAULT_DATE
from tests_common.test_utils import db
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.mock_operators import MockOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -103,7 +99,6 @@ def create_dag_run(
):
now = timezone.utcnow()
logical_date = pendulum.instance(logical_date or now)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
if is_backfill:
run_type = DagRunType.BACKFILL_JOB
data_interval = dag.infer_automated_data_interval(logical_date)
@@ -119,10 +114,11 @@ def create_dag_run(
run_type=run_type,
logical_date=logical_date,
data_interval=data_interval,
+ run_after=data_interval.end,
start_date=now,
state=state,
external_trigger=False,
- **triggered_by_kwargs, # type: ignore
+ triggered_by=DagRunTriggeredByType.TEST,
)
if task_states is not None:
@@ -839,7 +835,6 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state):
)
session.add(orm_dag)
session.flush()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
@@ -850,9 +845,10 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state):
state=state,
logical_date=DEFAULT_DATE,
data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
start_date=DEFAULT_DATE if state == DagRunState.RUNNING else None,
session=session,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
if state == DagRunState.RUNNING:
@@ -917,7 +913,6 @@ def test_emit_scheduling_delay(self, session, schedule, expected):
orm_dag = DagModel(**orm_dag_kwargs)
session.add(orm_dag)
session.flush()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
@@ -928,9 +923,10 @@ def test_emit_scheduling_delay(self, session, schedule, expected):
state=DagRunState.SUCCESS,
logical_date=dag.start_date,
data_interval=dag.infer_automated_data_interval(dag.start_date),
+ run_after=dag.start_date,
start_date=dag.start_date,
+ triggered_by=DagRunTriggeredByType.TEST,
session=session,
- **triggered_by_kwargs,
)
ti = dag_run.get_task_instance(dag_task.task_id, session)
ti.set_state(TaskInstanceState.SUCCESS, session)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 628945ebf2aaa..ab43b61cb95e9 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -96,7 +96,7 @@
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.task_group import TaskGroup
from airflow.utils.task_instance_session import set_current_task_instance_session
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER
@@ -104,10 +104,6 @@
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_connections, clear_db_runs
from tests_common.test_utils.mock_operators import MockOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = [pytest.mark.db_test]
@@ -1745,14 +1741,14 @@ def test_xcom_pull_different_logical_date(self, create_task_instance):
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
ti.run()
exec_date += datetime.timedelta(days=1)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = ti.task.dag.create_dagrun(
run_id="test2",
run_type=DagRunType.MANUAL,
logical_date=exec_date,
data_interval=(exec_date, exec_date),
+ run_after=exec_date,
state=None,
- **triggered_by_kwargs,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TI(task=ti.task, run_id=dr.run_id)
ti.run()
@@ -2027,7 +2023,6 @@ def test_get_num_running_task_instances(self, create_task_instance):
)
logical_date = DEFAULT_DATE + datetime.timedelta(days=1)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = ti1.task.dag.create_dagrun(
logical_date=logical_date,
run_type=DagRunType.MANUAL,
@@ -2035,7 +2030,8 @@ def test_get_num_running_task_instances(self, create_task_instance):
run_id="2",
session=session,
data_interval=(logical_date, logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert ti1 in session
ti2 = dr.task_instances[0]
@@ -3210,7 +3206,6 @@ def test_get_previous_start_date_none(self, dag_maker):
day_1 = DEFAULT_DATE
day_2 = DEFAULT_DATE + datetime.timedelta(days=1)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
# Create a DagRun for day_1 and day_2. Calling ti_2.get_previous_start_date()
# should return the start_date of ti_1 (which is None because ti_1 was not run).
@@ -3219,7 +3214,6 @@ def test_get_previous_start_date_none(self, dag_maker):
logical_date=day_1,
state=State.RUNNING,
run_type=DagRunType.MANUAL,
- **triggered_by_kwargs,
)
dagrun_2 = dag_maker.create_dagrun(
@@ -3227,7 +3221,6 @@ def test_get_previous_start_date_none(self, dag_maker):
state=State.RUNNING,
run_type=DagRunType.MANUAL,
data_interval=(day_1, day_2),
- **triggered_by_kwargs,
)
ti_1 = dagrun_1.get_task_instance(task.task_id)
@@ -3254,7 +3247,6 @@ def test_context_triggering_asset_events(self, create_dummy_dag, session):
session.commit()
logical_date = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
# it's easier to fake a manual run here
dag, task1 = create_dummy_dag(
dag_id="test_triggering_asset_events",
@@ -3271,7 +3263,8 @@ def test_context_triggering_asset_events(self, create_dummy_dag, session):
state=DagRunState.RUNNING,
session=session,
data_interval=(logical_date, logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ds1_event = AssetEvent(asset_id=1)
ds2_event_1 = AssetEvent(asset_id=2)
@@ -3523,7 +3516,6 @@ def test_handle_failure(self, create_dummy_dag, session=None):
session=session,
)
logical_date = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test2",
run_type=DagRunType.MANUAL,
@@ -3532,7 +3524,8 @@ def test_handle_failure(self, create_dummy_dag, session=None):
start_date=logical_date - datetime.timedelta(hours=1),
session=session,
data_interval=(logical_date, logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dr.set_state(DagRunState.FAILED)
ti1 = dr.get_task_instance(task1.task_id, session=session)
@@ -3672,7 +3665,6 @@ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
fail_fast=True,
)
logical_date = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
run_id="test_ff",
run_type=DagRunType.MANUAL,
@@ -3680,7 +3672,8 @@ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
state=DagRunState.RUNNING,
session=session,
data_interval=(logical_date, logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dr.set_state(DagRunState.SUCCESS)
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 1e62b414de2da..36fecea2314ce 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -64,6 +64,8 @@ def func(*, dag_id, task_id, logical_date):
run_type=DagRunType.SCHEDULED,
run_id=run_id,
logical_date=logical_date,
+ data_interval=(logical_date, logical_date),
+ run_after=logical_date,
)
session.add(run)
ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id)
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 74d8371e5d78e..d4ff1796a3442 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -192,6 +192,8 @@ def test_trigger_dagrun_twice(self, dag_maker):
dag_run = DagRun(
dag_id=TRIGGERED_DAG_ID,
logical_date=utc_now,
+ data_interval=(utc_now, utc_now),
+ run_after=utc_now,
state=State.SUCCESS,
run_type="manual",
run_id=run_id,
@@ -230,6 +232,8 @@ def test_trigger_dagrun_with_scheduled_dag_run(self, dag_maker):
dag_run = DagRun(
dag_id=TRIGGERED_DAG_ID,
logical_date=utc_now,
+ data_interval=(utc_now, utc_now),
+ run_after=utc_now,
state=State.SUCCESS,
run_type="scheduled",
run_id=run_id,
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 8a9b810928c70..fc1b8a1315da2 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -1244,6 +1244,7 @@ def run_tasks(
),
logical_date=logical_date,
data_interval=data_interval,
+ run_after=logical_date,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.TEST,
dag_version=None,
diff --git a/tests/task/test_standard_task_runner.py b/tests/task/test_standard_task_runner.py
index 51c5f9f8fa3e8..e4560ba3d1d70 100644
--- a/tests/task/test_standard_task_runner.py
+++ b/tests/task/test_standard_task_runner.py
@@ -40,15 +40,11 @@
from airflow.utils.platform import getuser
from airflow.utils.state import State
from airflow.utils.timeout import timeout
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests.listeners import xcom_listener
from tests.listeners.file_write_listener import FileWriteListener
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@@ -187,7 +183,6 @@ def test_notifies_about_start_and_stop(self, tmp_path, session):
dag = dagbag.dags.get("test_example_bash_operator")
task = dag.get_task("runme_1")
current_time = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
logical_date=current_time,
@@ -196,7 +191,8 @@ def test_notifies_about_start_and_stop(self, tmp_path, session):
state=State.RUNNING,
start_date=current_time,
session=session,
- **triggered_by_kwargs,
+ run_after=current_time,
+ triggered_by=DagRunTriggeredByType.TEST,
)
session.commit()
ti = TaskInstance(task=task, run_id="test")
@@ -235,7 +231,6 @@ def test_notifies_about_fail(self, tmp_path):
dag = dagbag.dags.get("test_failing_bash_operator")
task = dag.get_task("failing_task")
current_time = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
logical_date=current_time,
@@ -243,7 +238,8 @@ def test_notifies_about_fail(self, tmp_path):
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=current_time,
- **triggered_by_kwargs,
+ run_after=current_time,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TaskInstance(task=task, run_id="test")
job = Job(dag_id=ti.dag_id)
@@ -285,7 +281,6 @@ def test_ol_does_not_block_xcoms(self, tmp_path):
dag = dagbag.dags.get("test_dag_xcom_openlineage")
task = dag.get_task("push_and_pull")
current_time = timezone.utcnow()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
logical_date=current_time,
@@ -293,7 +288,8 @@ def test_ol_does_not_block_xcoms(self, tmp_path):
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=current_time,
- **triggered_by_kwargs,
+ run_after=current_time,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TaskInstance(task=task, run_id="test")
@@ -417,7 +413,6 @@ def test_on_kill(self):
)
dag = dagbag.dags.get("test_on_kill")
task = dag.get_task("task1")
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
logical_date=DEFAULT_DATE,
@@ -425,7 +420,8 @@ def test_on_kill(self):
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TaskInstance(task=task, run_id="test")
job = Job(dag_id=ti.dag_id)
@@ -473,7 +469,6 @@ def test_parsing_context(self):
)
dag = dagbag.dags.get("test_parsing_context")
task = dag.get_task("task1")
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
@@ -482,7 +477,8 @@ def test_parsing_context(self):
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = TaskInstance(task=task, run_id="test")
job = Job(dag_id=ti.dag_id)
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index c0607f9322265..c1912897b321e 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -28,13 +28,9 @@
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.timezone import convert_to_utc, datetime
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -51,7 +47,6 @@ def test_first_task_run_of_new_task(self):
ignore_first_depends_on_past set to True.
"""
dag = DAG("test_dag", schedule=timedelta(days=1), start_date=START_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
old_task = BaseOperator(
task_id="test_task",
dag=dag,
@@ -66,7 +61,8 @@ def test_first_task_run_of_new_task(self):
logical_date=old_task.start_date,
run_type=DagRunType.SCHEDULED,
data_interval=(old_task.start_date, old_task.start_date),
- **triggered_by_kwargs,
+ run_after=old_task.start_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
new_task = BaseOperator(
@@ -85,7 +81,8 @@ def test_first_task_run_of_new_task(self):
logical_date=logical_date,
run_type=DagRunType.SCHEDULED,
data_interval=(logical_date, logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
ti = dr.get_task_instance(new_task.task_id)
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 5b30595e4a305..c7400b1748536 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -42,12 +42,7 @@
)
from airflow.utils.state import State
from airflow.utils.timezone import utcnow
-from airflow.utils.types import DagRunType
-
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
pytestmark = pytest.mark.db_test
@@ -79,7 +74,6 @@ def test_utc_transformations(self):
dag = DAG(dag_id=dag_id, schedule=datetime.timedelta(days=1), start_date=start_date)
dag.clear()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
run = dag.create_dagrun(
run_id=iso_date,
run_type=DagRunType.MANUAL,
@@ -88,7 +82,8 @@ def test_utc_transformations(self):
start_date=start_date,
session=self.session,
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
assert logical_date == run.logical_date
@@ -112,7 +107,6 @@ def test_process_bind_param_naive(self):
start_date = datetime.datetime.now()
dag = DAG(dag_id=dag_id, start_date=start_date, schedule=datetime.timedelta(days=1))
dag.clear()
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
with pytest.raises((ValueError, StatementError)):
dag.create_dagrun(
@@ -123,7 +117,8 @@ def test_process_bind_param_naive(self):
start_date=start_date,
session=self.session,
data_interval=dag.timetable.infer_manual_data_interval(run_after=start_date),
- **triggered_by_kwargs,
+ run_after=start_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
dag.clear()
diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py
index e99bc0bd08387..de393f38c3516 100644
--- a/tests/utils/test_state.py
+++ b/tests/utils/test_state.py
@@ -24,13 +24,9 @@
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests.models import DEFAULT_DATE
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -42,7 +38,6 @@ def test_dagrun_state_enum_escape():
"""
with create_session() as session:
dag = DAG(dag_id="test_dagrun_state_enum_escape", schedule=timedelta(days=1), start_date=DEFAULT_DATE)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
@@ -55,7 +50,8 @@ def test_dagrun_state_enum_escape():
start_date=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
session=session,
- **triggered_by_kwargs,
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
query = session.query(
diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py
index b6bb3f3751d82..d258445041d48 100644
--- a/tests/www/views/test_views_dagrun.py
+++ b/tests/www/views/test_views_dagrun.py
@@ -23,7 +23,7 @@
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.www.views import DagRunModelView
from providers.fab.tests.provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import (
create_user,
@@ -32,16 +32,12 @@
)
from tests.www.views.test_views_tasks import _get_appbuilder_pk_string
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import (
check_content_in_response,
check_content_not_in_response,
client_with_login,
)
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
pytestmark = pytest.mark.db_test
@@ -141,7 +137,6 @@ def test_create_dagrun_permission_denied(session, client_dr_without_dag_run_crea
def running_dag_run(session):
dag = DagBag().get_dag("example_bash_operator")
logical_date = timezone.datetime(2016, 1, 9)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
state="running",
logical_date=logical_date,
@@ -149,7 +144,8 @@ def running_dag_run(session):
run_id="test_dag_runs_action",
run_type=DagRunType.MANUAL,
session=session,
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
session.add(dr)
tis = [
@@ -165,7 +161,6 @@ def running_dag_run(session):
def completed_dag_run_with_missing_task(session):
dag = DagBag().get_dag("example_bash_operator")
logical_date = timezone.datetime(2016, 1, 9)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
state="success",
logical_date=logical_date,
@@ -173,7 +168,8 @@ def completed_dag_run_with_missing_task(session):
run_id="test_dag_runs_action",
run_type=DagRunType.MANUAL,
session=session,
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
session.add(dr)
tis = [
@@ -321,7 +317,6 @@ def dag_run_with_all_done_task(session):
dag.sync_to_db()
logical_date = timezone.datetime(2016, 1, 9)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = dag.create_dagrun(
state="running",
logical_date=logical_date,
@@ -329,7 +324,8 @@ def dag_run_with_all_done_task(session):
run_id="test_dagrun_failed",
run_type=DagRunType.MANUAL,
session=session,
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
# Create task instances in various states to test the ALL_DONE trigger rule
diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py
index 02be0ebd3681c..54e13641f9bb9 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -24,19 +24,15 @@
from airflow.models import DagBag, Variable
from airflow.utils import timezone
from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.db import clear_db_runs, clear_db_variables, parse_and_sync_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import (
_check_last_log,
_check_last_log_masked_variable,
check_content_in_response,
)
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
pytestmark = pytest.mark.db_test
EXAMPLE_DAG_DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
@@ -60,7 +56,6 @@ def xcom_dag(dagbag):
@pytest.fixture(autouse=True)
def dagruns(bash_dag, xcom_dag):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
bash_dagrun = bash_dag.create_dagrun(
run_id="test_bash",
run_type=DagRunType.SCHEDULED,
@@ -68,7 +63,8 @@ def dagruns(bash_dag, xcom_dag):
data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
+ run_after=EXAMPLE_DAG_DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
xcom_dagrun = xcom_dag.create_dagrun(
@@ -78,7 +74,8 @@ def dagruns(bash_dag, xcom_dag):
data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
+ run_after=EXAMPLE_DAG_DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
)
yield bash_dagrun, xcom_dagrun
diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py
index 8ae2c63b818e0..fcf119ba207a5 100644
--- a/tests/www/views/test_views_extra_links.py
+++ b/tests/www/views/test_views_extra_links.py
@@ -27,7 +27,7 @@
from airflow.models.dag import DAG
from airflow.utils import timezone
from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.compat import BaseOperatorLink
from tests_common.test_utils.db import clear_db_runs
@@ -36,10 +36,6 @@
EmptyExtraLinkTestOperator,
EmptyNoExtraLinkTestOperator,
)
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
pytestmark = pytest.mark.db_test
@@ -88,7 +84,6 @@ def dag():
@pytest.fixture(scope="module")
def create_dag_run(dag):
def _create_dag_run(*, logical_date, session):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
return dag.create_dagrun(
run_id=f"manual__{logical_date.isoformat()}",
state=DagRunState.RUNNING,
@@ -96,7 +91,8 @@ def _create_dag_run(*, logical_date, session):
data_interval=(logical_date, logical_date),
run_type=DagRunType.MANUAL,
session=session,
- **triggered_by_kwargs,
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
)
return _create_dag_run
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index ae264d3f63829..037c97d026404 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -39,18 +39,14 @@
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.www.app import create_app
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
from tests_common.test_utils.decorators import dont_initialize_flask_app_submodules
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import client_with_login
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
pytestmark = pytest.mark.db_test
DAG_ID = "dag_for_testing_log_view"
@@ -158,16 +154,16 @@ def dags(log_app, create_dummy_dag, testing_dag_bundle, session):
@pytest.fixture(autouse=True)
def tis(dags, session):
dag, dag_removed = dags
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dagrun = dag.create_dagrun(
run_id=f"scheduled__{DEFAULT_DATE.isoformat()}",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=DEFAULT_DATE,
state=DagRunState.RUNNING,
session=session,
- **triggered_by_kwargs,
)
(ti,) = dagrun.task_instances
ti.try_number = 1
@@ -177,10 +173,11 @@ def tis(dags, session):
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=DEFAULT_DATE,
state=DagRunState.RUNNING,
session=session,
- **triggered_by_kwargs,
)
(ti_removed_dag,) = dagrun_removed.task_instances
ti_removed_dag.try_number = 1
diff --git a/tests/www/views/test_views_rendered.py b/tests/www/views/test_views_rendered.py
index 5f4b8063d0075..7514814dad5a8 100644
--- a/tests/www/views/test_views_rendered.py
+++ b/tests/www/views/test_views_rendered.py
@@ -32,7 +32,7 @@
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.compat import BashOperator, PythonOperator
from tests_common.test_utils.db import (
@@ -41,12 +41,8 @@
clear_rendered_ti_fields,
initial_db_init,
)
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import check_content_in_response, check_content_not_in_response
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
DEFAULT_DATE = timezone.datetime(2020, 3, 1)
pytestmark = pytest.mark.db_test
@@ -145,15 +141,15 @@ def _reset_db(dag, task1, task2, task3, task4, task_secret):
@pytest.fixture
def create_dag_run(dag, task1, task2, task3, task4, task_secret):
def _create_dag_run(*, logical_date, session):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag_run = dag.create_dagrun(
run_id="test",
state=DagRunState.RUNNING,
logical_date=logical_date,
data_interval=(logical_date, logical_date),
+ run_after=logical_date,
+ triggered_by=DagRunTriggeredByType.TEST,
run_type=DagRunType.SCHEDULED,
session=session,
- **triggered_by_kwargs,
)
ti1 = dag_run.get_task_instance(task1.task_id, session=session)
ti1.state = TaskInstanceState.SUCCESS
@@ -342,15 +338,15 @@ def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env,
url = f"task?task_id=task1&dag_id=testdag&logical_date={date}"
with create_session() as session:
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
state=DagRunState.RUNNING,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
run_type=DagRunType.SCHEDULED,
session=session,
- **triggered_by_kwargs,
)
resp = admin_client.get(url, follow_redirects=True)
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 7cb0de9024cfb..31af5e5ac3063 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -39,7 +39,7 @@
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from airflow.www.views import TaskInstanceModelView, _safe_parse_datetime
from providers.fab.tests.provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import (
create_user,
@@ -50,16 +50,12 @@
from tests_common.test_utils.compat import BashOperator
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs, clear_db_xcom
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.www import (
check_content_in_response,
check_content_not_in_response,
client_with_login,
)
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
pytestmark = pytest.mark.db_test
DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
@@ -79,24 +75,25 @@ def _reset_dagruns():
@pytest.fixture(autouse=True)
def _init_dagruns(app):
with time_machine.travel(DEFAULT_DATE, tick=False):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
app.dag_bag.get_dag("example_python_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
XCom.set(
key="return_value",
@@ -110,27 +107,30 @@ def _init_dagruns(app):
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
app.dag_bag.get_dag("latest_only").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
app.dag_bag.get_dag("example_task_group").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
yield
clear_db_runs()
@@ -395,15 +395,15 @@ def test_rendered_k8s_without_k8s(admin_client):
def test_tree_trigger_origin_tree_view(app, admin_client):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
url = "tree?dag_id=example_bash_operator"
@@ -414,15 +414,15 @@ def test_tree_trigger_origin_tree_view(app, admin_client):
def test_graph_trigger_origin_grid_view(app, admin_client):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
url = "/dags/example_bash_operator/graph"
@@ -433,15 +433,15 @@ def test_graph_trigger_origin_grid_view(app, admin_client):
def test_gantt_trigger_origin_grid_view(app, admin_client):
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ triggered_by=DagRunTriggeredByType.TEST,
start_date=timezone.utcnow(),
state=State.RUNNING,
- **triggered_by_kwargs,
)
url = "/dags/example_bash_operator/gantt"
@@ -853,7 +853,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker):
):
EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
EmptyOperator(task_id="task_3")
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
run1 = dag_maker.create_dagrun(
run_id="run_1",
state=DagRunState.SUCCESS,
@@ -861,7 +860,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker):
logical_date=dag_maker.dag.start_date,
start_date=dag_maker.dag.start_date,
session=session,
- **triggered_by_kwargs,
)
run2 = dag_maker.create_dagrun(
@@ -871,7 +869,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker):
logical_date=dag_maker.dag.start_date.add(days=1),
start_date=dag_maker.dag.start_date.add(days=1),
session=session,
- **triggered_by_kwargs,
)
for run in (run1, run2):
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 904b05b946a76..66c271113a780 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -931,6 +931,8 @@ def create_dagrun(self, *, logical_date=None, **kwargs):
if AIRFLOW_V_3_0_PLUS:
kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
kwargs["logical_date"] = logical_date
+ kwargs.setdefault("dag_version", None)
+ kwargs.setdefault("run_after", data_interval[-1])
else:
kwargs.pop("dag_version", None)
kwargs.pop("triggered_by", None)