From 4a73e29c682dfe8c12445f2e95ab3205871880ec Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Jan 2025 12:47:50 +0800 Subject: [PATCH 1/5] Add run_after column to DagRun model --- .../0058_3_0_0_add_dagrun_run_after.py | 62 + airflow/models/dagrun.py | 6 + airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 1537 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- 6 files changed, 844 insertions(+), 769 deletions(-) create mode 100644 airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py diff --git a/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py b/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py new file mode 100644 index 0000000000000..7a0a7e59c7d74 --- /dev/null +++ b/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add DagRun run_after. 
+ +Revision ID: 6a9e7a527a88 +Revises: 33b04e4bfa19 +Create Date: 2025-01-16 10:07:34.940948 +""" + +from __future__ import annotations + +import alembic.op as op +import sqlalchemy as sa + +from airflow.utils.sqlalchemy import UtcDateTime + +# Revision identifiers, used by Alembic. +revision = "6a9e7a527a88" +down_revision = "33b04e4bfa19" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Add DagRun run_after.""" + op.add_column("dag_run", sa.Column("run_after", UtcDateTime, nullable=True)) + + # run_after must be set to after the data interval ends. + # In very old runs where the data interval is not available, we'll just use + # the logical date. This is wrong (one interval too early), but good enough + # since those super old runs should have their data interval ended a long + # time ago anyway, and setting + op.execute("update dag_run set run_after = coalesce(data_interval_end, logical_date)") + + with op.batch_alter_table("dag_run") as batch_op: + batch_op.alter_column("run_after", existing_type=UtcDateTime, nullable=False) + batch_op.create_index("idx_dag_run_run_after", ["run_after"], unique=False) + + +def downgrade(): + """Remove DagRun run_after.""" + with op.batch_alter_table("dag_run") as batch_op: + batch_op.drop_index("idx_dag_run_run_after") + batch_op.drop_column("run_after") diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 727746b9b0333..f3876033dd46f 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -145,6 +145,8 @@ class DagRun(Base, LoggingMixin): # These two must be either both NULL or both datetime. data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) + # Earliest time when this DagRun can start running. + run_after = Column(UtcDateTime, nullable=False) # When a scheduler last attempted to schedule TIs for this DagRun last_scheduling_decision = Column(UtcDateTime) # Foreign key to LogTemplate. 
DagRun rows created prior to this column's @@ -180,6 +182,7 @@ class DagRun(Base, LoggingMixin): Index("dag_id_state", dag_id, _state), UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), Index("idx_dag_run_dag_id", dag_id), + Index("idx_dag_run_run_after", run_after), Index( "idx_dag_run_running_dags", "state", @@ -229,8 +232,10 @@ def __init__( self, dag_id: str | None = None, run_id: str | None = None, + *, queued_at: datetime | None | ArgNotSet = NOTSET, logical_date: datetime | None = None, + run_after: datetime | None = None, start_date: datetime | None = None, external_trigger: bool | None = None, conf: Any | None = None, @@ -253,6 +258,7 @@ def __init__( self.dag_id = dag_id self.run_id = run_id self.logical_date = logical_date + self.run_after = run_after self.start_date = start_date self.external_trigger = external_trigger self.conf = conf or {} diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 0dc4dbb1b61f4..0c518e6108e5f 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "33b04e4bfa19", + "3.0.0": "6a9e7a527a88", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index f2bf38552164b..e3606c3476353 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -0f041321ecbb5d059f7a9c85fd5cfba745732eea205dc2e4f5e3b6dd76a9468d \ No newline at end of file +232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index d20fa37b8ea66..7efa5c1512485 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -649,24 +649,24 @@ dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - 
-event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N + +0..N 1 @@ -709,695 +709,695 @@ task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + 
[VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N + +0..N {0,1} task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + 
+rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note - 
-dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - 
[VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 @@ -1792,115 +1792,120 @@ dag_version--task_instance - -0..N + +0..N {0,1} dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -bundle_version - - [VARCHAR(250)] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - 
-log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_after + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N + +0..N {0,1} @@ -2000,121 +2005,121 @@ dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--deadline - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - 
- [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 @@ -2145,9 +2150,9 @@ log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} @@ -2211,16 +2216,16 @@ backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 633c2338f4064..c39bcfb3a55f7 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``33b04e4bfa19`` (head) | ``8ea135928435`` | ``3.0.0`` | add new task_instance field scheduled_dttm. | +| ``6a9e7a527a88`` (head) | ``33b04e4bfa19`` | ``3.0.0`` | Add DagRun run_after. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``33b04e4bfa19`` | ``8ea135928435`` | ``3.0.0`` | add new task_instance field scheduled_dttm. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``8ea135928435`` | ``e39a26ac59f6`` | ``3.0.0`` | Add relative fileloc column. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From eba318b5aa4d7ac71f36b36b72cb2f335bf57d60 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Jan 2025 15:16:44 +0800 Subject: [PATCH 2/5] Set run_after when creating DAG run Depending on the run type, the value is set to: * BACKFILL_JOB: DagRunInfo.run_after * SCHEDULED: DagModel.next_run_create_after (this is the same as the backfill variant, only calculated pre-emptively and stored in db) * MANUAL: data_interval.end * ASSET_TRIGGERED: Creation time of the last triggering asset event --- airflow/api/common/trigger_dag.py | 1 + airflow/api_connexion/endpoints/dag_run_endpoint.py | 1 + airflow/api_fastapi/core_api/routes/public/dag_run.py | 1 + .../execution_api/datamodels/taskinstance.py | 1 + .../execution_api/routes/task_instances.py | 1 + airflow/cli/commands/remote_commands/task_command.py | 8 ++++++-- airflow/jobs/scheduler_job_runner.py | 2 ++ airflow/models/backfill.py | 1 + airflow/models/baseoperator.py | 1 + airflow/models/dag.py | 11 +++++++++-- airflow/www/views.py | 1 + dev/perf/scheduler_dag_execution_timing.py | 1 + tests/api_fastapi/common/test_exceptions.py | 6 +++--- .../execution_api/routes/test_task_instances.py | 1 + 14 files changed, 30 insertions(+), 7 deletions(-) diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index d04a790c02f88..08ee1726ccc22 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -102,6 +102,7 @@ def _trigger_dag( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, conf=run_conf, run_type=DagRunType.MANUAL, triggered_by=triggered_by, diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 374a0205c56d7..8243a30d54d24 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -354,6 +354,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, conf=post_body.get("conf"), run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 9f5bc1a5d484d..80db78d87f512 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -377,6 +377,7 @@ def trigger_dag_run( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, conf=body.conf, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.REST_API, diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 675a76d431392..c3ec79c9ddd11 100644 --- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -219,6 +219,7 @@ class DagRun(StrictBaseModel): logical_date: UtcDateTime data_interval_start: UtcDateTime | None data_interval_end: UtcDateTime | None + run_after: UtcDateTime start_date: UtcDateTime end_date: UtcDateTime | None run_type: DagRunType diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py index 155f96f861ab5..3cc70dac9c5b5 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -144,6 +144,7 @@ def ti_run( DR.dag_id, DR.data_interval_start, DR.data_interval_end, + DR.run_after, DR.start_date, DR.end_date, DR.run_type, diff --git a/airflow/cli/commands/remote_commands/task_command.py 
b/airflow/cli/commands/remote_commands/task_command.py index 011f629d13f56..ccae29ededd09 100644 --- a/airflow/cli/commands/remote_commands/task_command.py +++ b/airflow/cli/commands/remote_commands/task_command.py @@ -171,22 +171,26 @@ def _get_dag_run( dag_run_logical_date = pendulum.instance(timezone.utcnow()) if create_if_necessary == "memory": + data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date) dag_run = DagRun( dag_id=dag.dag_id, run_id=logical_date_or_run_id, run_type=DagRunType.MANUAL, external_trigger=True, logical_date=dag_run_logical_date, - data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), + data_interval=data_interval, + run_after=data_interval.end, triggered_by=DagRunTriggeredByType.CLI, state=DagRunState.RUNNING, ) return dag_run, True elif create_if_necessary == "db": + data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date) dag_run = dag.create_dagrun( run_id=_generate_temporary_run_id(), logical_date=dag_run_logical_date, - data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), + data_interval=data_interval, + run_after=data_interval.end, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.CLI, dag_version=None, diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index dd7c6f1f65550..1e98dbf2aba29 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1284,6 +1284,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - ), logical_date=dag_model.next_dagrun, data_interval=data_interval, + run_after=dag_model.next_dagrun_create_after, run_type=DagRunType.SCHEDULED, triggered_by=DagRunTriggeredByType.TIMETABLE, dag_version=latest_dag_version, @@ -1395,6 +1396,7 @@ def _create_dag_runs_asset_triggered( ), logical_date=logical_date, data_interval=data_interval, + 
run_after=max(logical_dates.values()), run_type=DagRunType.ASSET_TRIGGERED, triggered_by=DagRunTriggeredByType.ASSET, dag_version=latest_dag_version, diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 1564c4feb59a5..a2347e76fdc6e 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -296,6 +296,7 @@ def _create_backfill_dag_run( ), logical_date=info.logical_date, data_interval=info.data_interval, + run_after=info.run_after, conf=dag_run_conf, run_type=DagRunType.BACKFILL_JOB, triggered_by=DagRunTriggeredByType.BACKFILL, diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index e84994c6d04e7..425cbaca68880 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -632,6 +632,7 @@ def run( run_type=DagRunType.MANUAL, logical_date=info.logical_date, data_interval=info.data_interval, + run_after=info.run_after, triggered_by=DagRunTriggeredByType.TEST, state=DagRunState.RUNNING, ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a0f6e901dc3e9..8fae1604ed761 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -250,6 +250,7 @@ def _create_orm_dagrun( run_id: str, logical_date: datetime | None, data_interval: DataInterval | None, + run_after: datetime, start_date: datetime | None, external_trigger: bool, conf: Any, @@ -266,6 +267,7 @@ def _create_orm_dagrun( run_id=run_id, logical_date=logical_date, start_date=start_date, + run_after=run_after, external_trigger=external_trigger, conf=conf, state=state, @@ -1634,11 +1636,12 @@ def add_logger_if_needed(ti: TaskInstance): dag=scheduler_dag, start_date=logical_date, logical_date=logical_date, + data_interval=data_interval, + run_after=data_interval.end, run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date), session=session, conf=run_conf, triggered_by=DagRunTriggeredByType.TEST, - data_interval=data_interval, ) tasks = self.task_dict @@ -1721,6 +1724,7 @@ def create_dagrun( run_id: str, 
logical_date: datetime, data_interval: tuple[datetime, datetime], + run_after: datetime, conf: dict | None = None, run_type: DagRunType, triggered_by: DagRunTriggeredByType, @@ -1791,6 +1795,8 @@ def create_dagrun( dag=self, run_id=run_id, logical_date=logical_date, + data_interval=data_interval, + run_after=timezone.coerce_datetime(run_after), start_date=timezone.coerce_datetime(start_date), external_trigger=external_trigger, conf=conf, @@ -1799,7 +1805,6 @@ def create_dagrun( dag_version=dag_version, creating_job_id=creating_job_id, backfill_id=backfill_id, - data_interval=data_interval, triggered_by=triggered_by, session=session, ) @@ -2439,6 +2444,7 @@ def _get_or_create_dagrun( run_id: str, logical_date: datetime, data_interval: tuple[datetime, datetime], + run_after: datetime, conf: dict | None, triggered_by: DagRunTriggeredByType, start_date: datetime, @@ -2468,6 +2474,7 @@ def _get_or_create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=run_after, conf=conf, run_type=DagRunType.MANUAL, state=DagRunState.RUNNING, diff --git a/airflow/www/views.py b/airflow/www/views.py index 88e504c75822f..4ce5c6564619f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2237,6 +2237,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, conf=run_conf, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.UI, diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index 558b4eba4980c..4e056b3faf1e9 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -173,6 +173,7 @@ def create_dag_runs(dag, num_runs, session): run_id=f"{id_prefix}{logical_date.isoformat()}", logical_date=logical_date, data_interval=(logical_date, logical_date), + run_after=logical_date, run_type=DagRunType.MANUAL, 
triggered_by=DagRunTriggeredByType.TEST, external_trigger=False, diff --git a/tests/api_fastapi/common/test_exceptions.py b/tests/api_fastapi/common/test_exceptions.py index 6751aff20c725..d75e3bbe23bdf 100644 --- a/tests/api_fastapi/common/test_exceptions.py +++ b/tests/api_fastapi/common/test_exceptions.py @@ -186,7 +186,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?)", "orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id", }, ), @@ -194,7 +194,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, 
dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s)", "orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")", }, ), @@ -202,7 +202,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe status_code=status.HTTP_409_CONFLICT, detail={ "reason": "Unique constraint violation", - "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING dag_run.id", + "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, 
last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id, bundle_version) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(run_after)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(dag_version_id)s, %(bundle_version)s) RETURNING dag_run.id", "orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n', }, ), diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py b/tests/api_fastapi/execution_api/routes/test_task_instances.py index 8a7b21699f25e..c1a131e273605 100644 --- a/tests/api_fastapi/execution_api/routes/test_task_instances.py +++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py @@ -99,6 +99,7 @@ def test_ti_run_state_to_running(self, client, session, create_task_instance, ti "logical_date": instant_str, "data_interval_start": instant.subtract(days=1).to_iso8601_string(), "data_interval_end": instant_str, + "run_after": instant_str, "start_date": instant_str, "end_date": None, "external_trigger": False, From 5d4059409e261a0c336776d509759efeb93bfb7c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Jan 2025 18:05:46 +0800 Subject: [PATCH 3/5] Pass run_after to create_dagrun in tests --- .../common/sql/operators/test_sql.py | 7 ++ .../fab/www/views/test_views_acl.py | 13 ++- .../openlineage/plugins/test_execution.py | 2 + .../openlineage/plugins/test_listener.py | 3 + .../openlineage/plugins/test_utils.py | 1 + .../standard/decorators/test_python.py | 10 ++- .../log_handlers/test_log_handlers.py | 1 + .../endpoints/test_extra_link_endpoint.py | 13 ++- .../routes/public/test_extra_links.py | 
5 +- .../remote_commands/test_task_command.py | 66 ++++----------- tests/dag_processing/test_processor.py | 1 + tests/jobs/test_local_task_job.py | 55 ++++++------ tests/jobs/test_scheduler_job.py | 38 ++++----- tests/models/test_cleartasks.py | 10 +-- tests/models/test_dag.py | 84 +++++++++---------- tests/models/test_dagrun.py | 18 ++-- tests/models/test_taskinstance.py | 29 +++---- tests/operators/test_trigger_dagrun.py | 4 + tests/sensors/test_external_task_sensor.py | 1 + tests/task/test_standard_task_runner.py | 26 +++--- tests/ti_deps/deps/test_prev_dagrun_dep.py | 13 ++- tests/utils/test_sqlalchemy.py | 15 ++-- tests/utils/test_state.py | 10 +-- tests/www/views/test_views_dagrun.py | 18 ++-- tests/www/views/test_views_decorators.py | 13 ++- tests/www/views/test_views_extra_links.py | 10 +-- tests/www/views/test_views_log.py | 13 ++- tests/www/views/test_views_rendered.py | 14 ++-- tests/www/views/test_views_tasks.py | 37 ++++---- tests_common/pytest_plugin.py | 2 + 30 files changed, 229 insertions(+), 303 deletions(-) diff --git a/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py b/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py index b0eedb52c47c2..0a6db5665deb3 100644 --- a/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py +++ b/providers/common/sql/tests/provider_tests/common/sql/operators/test_sql.py @@ -1163,6 +1163,7 @@ def test_branch_single_value_with_dag_run(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1213,6 +1214,7 @@ def test_branch_true_with_dag_run(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1263,6 +1265,7 @@ def test_branch_false_with_dag_run(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: 
dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1314,6 +1317,7 @@ def test_branch_list_with_dag_run(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1362,6 +1366,7 @@ def test_invalid_query_result_with_dag_run(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1401,6 +1406,7 @@ def test_with_skip_in_branch_downstream_dependencies(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -1449,6 +1455,7 @@ def test_with_skip_in_branch_downstream_dependencies2(self, mock_get_db_hook): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: diff --git a/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py b/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py index 290afe7c737cf..3ffc5918baa7d 100644 --- a/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py +++ b/providers/fab/tests/provider_tests/fab/www/views/test_views_acl.py @@ -28,22 +28,18 @@ from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import State -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from airflow.www.views import FILTER_STATUS_COOKIE from provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import create_user_scope from tests_common.test_utils.db import clear_db_runs from tests_common.test_utils.permissions import _resource_name -from 
tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import ( check_content_in_response, check_content_not_in_response, client_with_login, ) -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test NEXT_YEAR = datetime.datetime.now().year + 1 @@ -148,7 +144,6 @@ def _reset_dagruns(): @pytest.fixture(autouse=True) def _init_dagruns(acl_app, _reset_dagruns): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} acl_app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id=DEFAULT_RUN_ID, run_type=DagRunType.SCHEDULED, @@ -156,7 +151,8 @@ def _init_dagruns(acl_app, _reset_dagruns): data_interval=(DEFAULT_DATE, DEFAULT_DATE), start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) acl_app.dag_bag.get_dag("example_python_operator").create_dagrun( run_id=DEFAULT_RUN_ID, @@ -165,7 +161,8 @@ def _init_dagruns(acl_app, _reset_dagruns): start_date=timezone.utcnow(), data_interval=(DEFAULT_DATE, DEFAULT_DATE), state=State.RUNNING, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) yield clear_db_runs() diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py index 6eb84a5d4012d..79ee2ae42179e 100644 --- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py +++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_execution.py @@ -101,6 +101,7 @@ def setup_job(self, task_name, run_id): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: @@ -207,6 +208,7 @@ def test_success_overtime_kills_tasks(self): if AIRFLOW_V_3_0_PLUS: 
dagrun_kwargs = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py index 25d9c24e6e519..f8da7452831a0 100644 --- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_listener.py @@ -93,6 +93,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock): dagrun_kwargs = { "dag_version": None, "logical_date": date, + "run_after": date, "triggered_by": types.DagRunTriggeredByType.TEST, } else: @@ -179,6 +180,7 @@ def sample_callable(**kwargs): dagrun_kwargs: dict = { "dag_version": None, "logical_date": date, + "run_after": date, "triggered_by": types.DagRunTriggeredByType.TEST, } else: @@ -710,6 +712,7 @@ def simple_callable(**kwargs): dagrun_kwargs = { "dag_version": None, "logical_date": date, + "run_after": date, "triggered_by": types.DagRunTriggeredByType.TEST, } else: diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py index 36b8057cf01f6..59fbc8605ad6e 100644 --- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py +++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_utils.py @@ -105,6 +105,7 @@ def test_get_dagrun_start_end(dag_maker): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs = { "logical_date": data_interval.start, + "run_after": data_interval.end, "triggered_by": DagRunTriggeredByType.TEST, } else: diff --git a/providers/standard/tests/provider_tests/standard/decorators/test_python.py b/providers/standard/tests/provider_tests/standard/decorators/test_python.py index 64914e3d149f8..84a58faae72e2 100644 --- 
a/providers/standard/tests/provider_tests/standard/decorators/test_python.py +++ b/providers/standard/tests/provider_tests/standard/decorators/test_python.py @@ -444,8 +444,9 @@ def return_dict(number: int): with self.dag_non_serialized: ret = return_dict(test_number) - triggered_by_kwargs = ( + v3_kwargs = ( { + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, "logical_date": DEFAULT_DATE, } @@ -462,7 +463,7 @@ def return_dict(number: int): data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval( run_after=DEFAULT_DATE ), - **triggered_by_kwargs, + **v3_kwargs, ) ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) @@ -517,8 +518,9 @@ def add_num(number: int, num2: int = 2): bigger_number = add_2(test_number) ret = add_num(bigger_number, XComArg(bigger_number.operator)) - triggered_by_kwargs = ( + v3_kwargs = ( { + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, "logical_date": DEFAULT_DATE, } @@ -535,7 +537,7 @@ def add_num(number: int, num2: int = 2): data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval( run_after=DEFAULT_DATE ), - **triggered_by_kwargs, + **v3_kwargs, ) bigger_number.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) diff --git a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py index d89fbdf6edb15..cc63b8b15b310 100644 --- a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py +++ b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py @@ -129,6 +129,7 @@ def task_callable(ti): if AIRFLOW_V_3_0_PLUS: dagrun_kwargs: dict = { "logical_date": DEFAULT_DATE, + "run_after": DEFAULT_DATE, "triggered_by": DagRunTriggeredByType.TEST, } else: diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py index e4aa7895ac662..d9b6e1d45ec0c 100644 --- 
a/tests/api_connexion/endpoints/test_extra_link_endpoint.py +++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py @@ -30,17 +30,13 @@ from airflow.timetables.base import DataInterval from airflow.utils import timezone from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.api_connexion_utils import create_user, delete_user from tests_common.test_utils.compat import BaseOperatorLink from tests_common.test_utils.db import clear_db_runs, clear_db_xcom from tests_common.test_utils.mock_operators import CustomOperator from tests_common.test_utils.mock_plugins import mock_plugin_manager -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -79,15 +75,16 @@ def setup_attrs(self, configured_app, session) -> None: self.app.dag_bag.dags = {self.dag.dag_id: self.dag} self.app.dag_bag.sync_to_db("dags-folder", None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + data_interval = DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)) self.dag.create_dagrun( run_id="TEST_DAG_RUN_ID", logical_date=self.default_time, run_type=DagRunType.MANUAL, state=DagRunState.SUCCESS, session=session, - data_interval=DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)), - **triggered_by_kwargs, + data_interval=data_interval, + run_after=data_interval.end, + triggered_by=DagRunTriggeredByType.TEST, ) session.flush() diff --git a/tests/api_fastapi/core_api/routes/public/test_extra_links.py b/tests/api_fastapi/core_api/routes/public/test_extra_links.py index 89aef555bbe20..b3afd174db455 100644 --- a/tests/api_fastapi/core_api/routes/public/test_extra_links.py +++ b/tests/api_fastapi/core_api/routes/public/test_extra_links.py @@ -74,15 
+74,14 @@ def setup(self, test_client, session=None) -> None: test_client.app.state.dag_bag = dag_bag dag_bag.sync_to_db("dags-folder", None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} - self.dag.create_dagrun( run_id=self.dag_run_id, logical_date=self.default_time, run_type=DagRunType.MANUAL, state=DagRunState.SUCCESS, data_interval=(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)), - **triggered_by_kwargs, + run_after=timezone.datetime(2020, 1, 2), + triggered_by=DagRunTriggeredByType.TEST, ) def teardown_method(self) -> None: diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py index 2d6f699c1e788..166f152b9eaee 100644 --- a/tests/cli/commands/remote_commands/test_task_command.py +++ b/tests/cli/commands/remote_commands/test_task_command.py @@ -50,14 +50,10 @@ from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import State, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_pools, clear_db_runs, parse_and_sync_to_db -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -104,21 +100,15 @@ def setup_class(cls): cls.dagbag = DagBag(read_dags_from_db=True) cls.dag = cls.dagbag.get_dag(cls.dag_id) data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE) - v3_kwargs = ( - { - "dag_version": None, - "triggered_by": DagRunTriggeredByType.TEST, - } - if AIRFLOW_V_3_0_PLUS - else {} - ) cls.dag_run = cls.dag.create_dagrun( state=State.RUNNING, run_id=cls.run_id, run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, data_interval=data_interval, - **v3_kwargs, + 
run_after=DEFAULT_DATE, + dag_version=None, + triggered_by=DagRunTriggeredByType.TEST, ) @classmethod @@ -175,22 +165,16 @@ def test_cli_test_different_path(self, session, tmp_path): logical_date = pendulum.now("UTC") data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - v3_kwargs = ( - { - "dag_version": None, - "triggered_by": DagRunTriggeredByType.TEST, - } - if AIRFLOW_V_3_0_PLUS - else {} - ) dag.create_dagrun( state=State.RUNNING, run_id="abc123", run_type=DagRunType.MANUAL, logical_date=logical_date, data_interval=data_interval, + run_after=logical_date, + dag_version=None, + triggered_by=DagRunTriggeredByType.TEST, session=session, - **v3_kwargs, ) session.commit() @@ -647,22 +631,16 @@ def test_task_states_for_dag_run(self): default_date2 = timezone.datetime(2016, 1, 9) dag2.clear() data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2) - v3_kwargs = ( - { - "dag_version": None, - "triggered_by": DagRunTriggeredByType.CLI, - } - if AIRFLOW_V_3_0_PLUS - else {} - ) dagrun = dag2.create_dagrun( run_id="test", state=State.RUNNING, logical_date=default_date2, data_interval=data_interval, + run_after=default_date2, run_type=DagRunType.MANUAL, external_trigger=True, - **v3_kwargs, + dag_version=None, + triggered_by=DagRunTriggeredByType.CLI, ) ti2 = TaskInstance(task2, run_id=dagrun.run_id) ti2.set_state(State.SUCCESS) @@ -736,22 +714,16 @@ def setup_method(self) -> None: dag = DagBag().get_dag(self.dag_id) data_interval = dag.timetable.infer_manual_data_interval(run_after=self.logical_date) - v3_kwargs = ( - { - "dag_version": None, - "triggered_by": DagRunTriggeredByType.TEST, - } - if AIRFLOW_V_3_0_PLUS - else {} - ) self.dr = dag.create_dagrun( run_id=self.run_id, logical_date=self.logical_date, data_interval=data_interval, + run_after=self.logical_date, start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL, - **v3_kwargs, + dag_version=None, + 
triggered_by=DagRunTriggeredByType.TEST, ) self.tis = self.dr.get_task_instances() assert len(self.tis) == 1 @@ -1049,22 +1021,16 @@ def test_context_with_run(): dag = DagBag().get_dag(dag_id) data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - v3_kwargs = ( - { - "dag_version": None, - "triggered_by": DagRunTriggeredByType.TEST, - } - if AIRFLOW_V_3_0_PLUS - else {} - ) dag.create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=logical_date, start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL, - **v3_kwargs, + dag_version=None, + triggered_by=DagRunTriggeredByType.TEST, ) with conf_vars({("core", "dags_folder"): dag_path}): task_command.task_run(parser.parse_args(task_args)) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 2ba95f15049ed..9844186be1df5 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -104,6 +104,7 @@ def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, data_interval=dag.infer_automated_data_interval(DEFAULT_DATE), + run_after=DEFAULT_DATE, triggered_by=DagRunTriggeredByType.TEST, session=session, ) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index d229abfe34f89..c86599d555c54 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -49,16 +49,12 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timeout import timeout -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils import db from tests_common.test_utils.asserts import assert_queries_count from tests_common.test_utils.config import conf_vars from tests_common.test_utils.mock_executor import 
MockExecutor -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -307,7 +303,6 @@ def test_heartbeat_failed_fast(self): dag = self.dagbag.get_dag(dag_id) task = dag.get_task(task_id) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test_heartbeat_failed_fast_run", run_type=DagRunType.MANUAL, @@ -316,7 +311,8 @@ def test_heartbeat_failed_fast(self): start_date=DEFAULT_DATE, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = dr.task_instances[0] @@ -346,7 +342,6 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session): """ dag = get_test_dag("test_mark_state") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test", state=State.RUNNING, @@ -354,7 +349,8 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="test_mark_success_no_kill") @@ -379,8 +375,6 @@ def test_localtaskjob_double_trigger(self): session = settings.Session() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag.clear() dr = dag.create_dagrun( run_id="test", @@ -390,7 +384,8 @@ def test_localtaskjob_double_trigger(self): start_date=DEFAULT_DATE, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = 
dr.get_task_instance(task_id=task.task_id, session=session) @@ -485,7 +480,6 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag): """ dag = get_test_dag("test_mark_state") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -494,7 +488,8 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="test_mark_failure_externally") ti = dr.get_task_instance(task.task_id) @@ -521,7 +516,6 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag): dag = get_test_dag("test_mark_state") dag.dagrun_timeout = datetime.timedelta(microseconds=1) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -531,7 +525,8 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="test_mark_skipped_externally") ti = dr.get_task_instance(task.task_id) @@ -557,7 +552,6 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file)) dag = get_test_dag("test_on_failure_callback") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with 
create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -566,7 +560,8 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="test_on_failure_callback_task") ti = TaskInstance(task=task, run_id=dr.run_id) @@ -594,7 +589,6 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag): """ dag = get_test_dag("test_mark_state") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -603,7 +597,8 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="test_mark_success_no_kill") @@ -631,7 +626,6 @@ def test_success_listeners_executed(self, caplog, get_test_dag): dag = get_test_dag("test_mark_state") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -640,7 +634,8 @@ def test_success_listeners_executed(self, caplog, get_test_dag): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="sleep_execution") @@ -671,7 +666,6 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag): dag = get_test_dag("test_mark_state") data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -680,7 +674,8 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag): run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="sleep_execution") @@ -711,7 +706,6 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl dag = get_test_dag("test_mark_state") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( run_id="test", @@ -720,7 +714,8 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="slow_execution") @@ -755,7 +750,6 @@ def test_process_os_signal_calls_on_failure_callback( monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file)) dag = get_test_dag("test_on_failure_callback") data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dag.create_dagrun( run_id="test", @@ -764,7 +758,8 @@ def test_process_os_signal_calls_on_failure_callback( run_type=DagRunType.SCHEDULED, session=session, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) task = dag.get_task(task_id="bash_sleep") dag_run = dag.get_last_dagrun() @@ 
-1016,14 +1011,14 @@ def task_function(): logger.addHandler(tmpfile_handler) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( run_type=DagRunType.MANUAL, state=State.RUNNING, run_id=run_id, logical_date=logical_date, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_LOGICAL_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TaskInstance(task=task, run_id=dag_run.run_id) ti.refresh_from_db() diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 62441d646cd2a..faf8fa73284af 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -69,7 +69,7 @@ from airflow.utils import timezone from airflow.utils.session import create_session, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests.listeners import dag_listener from tests.listeners.test_listeners import get_listener_manager @@ -90,10 +90,6 @@ ) from tests_common.test_utils.mock_executor import MockExecutor from tests_common.test_utils.mock_operators import CustomOperator -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -133,8 +129,6 @@ def _loader_mock(mock_executors): @pytest.fixture def create_dagrun(session): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - def _create_dagrun( dag: DAG, *, @@ -153,10 +147,11 @@ def _create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, run_type=run_type, state=state, start_date=start_date, - **triggered_by_kwargs, + 
triggered_by=DagRunTriggeredByType.TEST, ) return _create_dagrun @@ -2870,13 +2865,15 @@ def test_scheduler_start_date(self, testing_dag_bundle): # because it would take the most recent run and start from there # That behavior still exists, but now it will only do so if after the # start date + data_interval_end = DEFAULT_DATE + timedelta(days=1) dag.create_dagrun( state="success", triggered_by=DagRunTriggeredByType.TIMETABLE, run_id="abc123", logical_date=DEFAULT_DATE, run_type=DagRunType.BACKFILL_JOB, - data_interval=DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)), + data_interval=DataInterval(DEFAULT_DATE, data_interval_end), + run_after=data_interval_end, ) # one task "ran" assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1 @@ -3061,7 +3058,6 @@ def test_scheduler_keeps_scheduling_pool_full(self, dag_maker, mock_executor): def _create_dagruns(dag: DAG): next_info = dag.next_dagrun_info(None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} assert next_info is not None for i in range(30): yield dag.create_dagrun( @@ -3069,8 +3065,9 @@ def _create_dagruns(dag: DAG): run_type=DagRunType.SCHEDULED, logical_date=next_info.logical_date, data_interval=next_info.data_interval, + run_after=next_info.run_after, state=DagRunState.RUNNING, - **triggered_by_kwargs, # type: ignore + triggered_by=DagRunTriggeredByType.TEST, ) next_info = dag.next_dagrun_info(next_info.data_interval) if next_info is None: @@ -4072,7 +4069,6 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak # Trigger the Dag externally data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test", state=DagRunState.RUNNING, @@ -4081,7 +4077,8 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak 
session=session, external_trigger=True, data_interval=data_interval, - **triggered_by_kwargs, + run_after=DEFAULT_LOGICAL_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) assert dr is not None # Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file @@ -4158,7 +4155,6 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session): ) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_version = DagVersion.get_latest_version(dag.dag_id) run1 = dag.create_dagrun( run_id="test1", @@ -4168,22 +4164,25 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session): start_date=timezone.utcnow() - timedelta(seconds=2), session=session, data_interval=data_interval, + run_after=DEFAULT_LOGICAL_DATE, dag_version=dag_version, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) run1_ti = run1.get_task_instance(task1.task_id, session) run1_ti.state = State.RUNNING + logical_date_2 = DEFAULT_DATE + timedelta(seconds=10) run2 = dag.create_dagrun( run_id="test2", run_type=DagRunType.SCHEDULED, - logical_date=DEFAULT_DATE + timedelta(seconds=10), + logical_date=logical_date_2, state=State.QUEUED, session=session, data_interval=data_interval, + run_after=logical_date_2, dag_version=dag_version, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) scheduler_job = Job(executor=self.null_exec) @@ -5720,15 +5719,16 @@ def test_mapped_dag(self, dag_id, session, testing_dag_bundle): logical_date = timezone.coerce_datetime(timezone.utcnow() - datetime.timedelta(days=2)) data_interval = dag.infer_automated_data_interval(logical_date) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id=f"{dag_id}_1", run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), state=State.RUNNING, session=session, + 
logical_date=logical_date, data_interval=data_interval, - **triggered_by_kwargs, + run_after=data_interval.end, + triggered_by=DagRunTriggeredByType.TEST, ) executor = SequentialExecutor() diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index 092db38ac0b20..a469e84406ec0 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -33,14 +33,10 @@ from airflow.providers.standard.sensors.python import PythonSensor from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests.models import DEFAULT_DATE from tests_common.test_utils import db -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -642,7 +638,6 @@ def test_dags_clear(self): ) task = EmptyOperator(task_id=f"test_task_clear_{i}", owner="test", dag=dag) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id=f"scheduled_{i}", logical_date=DEFAULT_DATE, @@ -650,7 +645,8 @@ def test_dags_clear(self): run_type=DagRunType.SCHEDULED, session=session, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = dr.task_instances[0] ti.task = task diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 78fffd45fd098..fde7e9eafa62a 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -87,7 +87,7 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.timezone import datetime as datetime_tz from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.types import DagRunType +from airflow.utils.types import 
DagRunTriggeredByType, DagRunType from airflow.utils.weight_rule import WeightRule from tests.models import DEFAULT_DATE @@ -106,13 +106,8 @@ from tests_common.test_utils.mapping import expand_mapped_task from tests_common.test_utils.mock_plugins import mock_plugin_manager from tests_common.test_utils.timetables import cron_timetable, delta_timetable -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType if TYPE_CHECKING: - from pendulum import DateTime from sqlalchemy.orm import Session pytestmark = pytest.mark.db_test @@ -150,13 +145,15 @@ def test_dags_bundle(configure_testing_dag_bundle): def _create_dagrun( dag: DAG, *, - logical_date: DateTime, - data_interval: DataInterval, + logical_date: datetime.datetime, + data_interval: tuple[datetime.datetime, datetime.datetime], run_type: DagRunType, state: DagRunState = DagRunState.RUNNING, start_date: datetime.datetime | None = None, ) -> DagRun: - triggered_by_kwargs: dict = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + logical_date = timezone.coerce_datetime(logical_date) + if not isinstance(data_interval, DataInterval): + data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) run_id = dag.timetable.generate_run_id( run_type=run_type, logical_date=logical_date, # type: ignore @@ -166,10 +163,11 @@ def _create_dagrun( run_id=run_id, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, run_type=run_type, state=state, start_date=start_date, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) @@ -429,7 +427,6 @@ def test_get_task_instances_before(self): EmptyOperator(task_id=test_task_id, dag=test_dag) session = settings.Session() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED): dagrun = test_dag.create_dagrun( @@ 
-438,11 +435,11 @@ def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED): run_id=f"test_{delta_h}", logical_date=None, data_interval=None, + run_after=None, session=session, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) - dagrun.start_date = BASE_DATE + timedelta(hours=delta_h) - dagrun.logical_date = BASE_DATE + timedelta(hours=delta_h) + dagrun.start_date = dagrun.run_after = dagrun.logical_date = BASE_DATE + timedelta(hours=delta_h) return dagrun dr1 = dag_run_before(delta_h=-1, type=DagRunType.MANUAL) # H19 @@ -635,14 +632,14 @@ def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self): # Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none dag = DAG("dag_with_none_schedule_and_empty_start_date", schedule=None) dag.add_task(BaseOperator(task_id="task_without_start_date")) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun = dag.create_dagrun( run_id="test", state=State.RUNNING, run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) assert dagrun is not None @@ -832,7 +829,6 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state): assert model.next_dagrun == DEFAULT_DATE assert model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=1) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test", state=state, @@ -840,7 +836,8 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state): run_type=DagRunType.SCHEDULED, session=session, data_interval=(model.next_dagrun, model.next_dagrun), - **triggered_by_kwargs, + run_after=model.next_dagrun_create_after, + triggered_by=DagRunTriggeredByType.TEST, ) assert dr is not None DAG.bulk_write_to_db("testing", None, [dag]) @@ 
-1046,7 +1043,6 @@ def test_existing_dag_is_paused_config(self): def test_existing_dag_is_paused_after_limit(self, testing_dag_bundle): def add_failed_dag_run(dag, id, logical_date): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_v = DagVersion.get_latest_version(dag_id=dag.dag_id) dr = dag.create_dagrun( run_type=DagRunType.MANUAL, @@ -1054,8 +1050,9 @@ def add_failed_dag_run(dag, id, logical_date): logical_date=logical_date, state=State.FAILED, data_interval=(logical_date, logical_date), + run_after=logical_date, dag_version=dag_v, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) ti_op1 = dr.get_task_instance(task_id=op1.task_id, session=session) ti_op1.set_state(state=TaskInstanceState.FAILED, session=session) @@ -1117,7 +1114,6 @@ def test_schedule_dag_no_previous_runs(self): dag_id = "test_schedule_dag_no_previous_runs" dag = DAG(dag_id=dag_id, schedule=None) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( run_id="test", @@ -1125,7 +1121,8 @@ def test_schedule_dag_no_previous_runs(self): logical_date=TEST_DATE, state=State.RUNNING, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) assert dag_run is not None assert dag.dag_id == dag_run.dag_id @@ -1156,7 +1153,6 @@ def test_dag_handle_callback_crash(self, mock_stats): ) when = TEST_DATE dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=when)) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dag_run = dag.create_dagrun( @@ -1166,7 +1162,8 @@ def test_dag_handle_callback_crash(self, mock_stats): run_type=DagRunType.MANUAL, session=session, data_interval=(when, when), - 
**triggered_by_kwargs, + run_after=when, + triggered_by=DagRunTriggeredByType.TEST, ) # should not raise any exception @@ -1197,7 +1194,6 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session): task_removed = EmptyOperator(task_id="removed_task") with create_session() as session: - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( run_id="test", state=State.RUNNING, @@ -1205,7 +1201,8 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session): run_type=DagRunType.MANUAL, session=session, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) dag._remove_task(task_removed.task_id) tis = dag_run.get_task_instances(session=session) @@ -1421,15 +1418,15 @@ def test_description_from_timetable(self, timetable, expected_description): def test_create_dagrun_job_id_is_set(self): job_id = 42 dag = DAG(dag_id="test_create_dagrun_job_id_is_set", schedule=None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test_create_dagrun_job_id_is_set", logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, run_type=DagRunType.MANUAL, state=State.NONE, creating_job_id=job_id, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) assert dr.creating_job_id == job_id @@ -1547,6 +1544,7 @@ def consumer(value): logical_date=DEFAULT_DATE, session=session, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, triggered_by=DagRunTriggeredByType.TEST, ) # Get the (de)serialized MappedOperator @@ -1728,7 +1726,6 @@ def test_clear_dag( _ = EmptyOperator(task_id=task_id, dag=dag) session = settings.Session() # type: ignore - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun_1 = dag.create_dagrun( 
run_id="backfill", run_type=DagRunType.BACKFILL_JOB, @@ -1736,7 +1733,8 @@ def test_clear_dag( start_date=DEFAULT_DATE, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, # type: ignore + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) session.merge(dagrun_1) @@ -2043,7 +2041,6 @@ def test_validate_executor_field(self): def test_validate_params_on_trigger_dag(self): dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"): dag.create_dagrun( run_id="test_dagrun_missing_param", @@ -2051,7 +2048,8 @@ def test_validate_params_on_trigger_dag(self): state=State.RUNNING, logical_date=TEST_DATE, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) @@ -2065,7 +2063,8 @@ def test_validate_params_on_trigger_dag(self): logical_date=TEST_DATE, conf={"param1": None}, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) @@ -2076,7 +2075,8 @@ def test_validate_params_on_trigger_dag(self): logical_date=TEST_DATE, conf={"param1": "hello"}, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) def test_dag_owner_links(self): @@ -2505,7 +2505,6 @@ def teardown_method(self) -> None: @pytest.mark.parametrize("tasks_count", [3, 12]) def test_count_number_queries(self, tasks_count): dag = DAG("test_dagrun_query_count", schedule=None, start_date=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {} for i in range(tasks_count): EmptyOperator(task_id=f"dummy_task_{i}", owner="test", dag=dag) with assert_queries_count(4): @@ -2515,7 +2514,8 @@ def test_count_number_queries(self, tasks_count): state=State.RUNNING, logical_date=TEST_DATE, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, + run_after=TEST_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) @@ -2584,15 +2584,15 @@ def return_num(num): dag = xcom_pass_to_op() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test", run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=self.DEFAULT_DATE, data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE), + run_after=self.DEFAULT_DATE, state=State.RUNNING, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) self.operator.run(start_date=self.DEFAULT_DATE, end_date=self.DEFAULT_DATE) @@ -2615,16 +2615,16 @@ def return_num(num): dag = xcom_pass_to_op() new_value = 52 - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test", run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=self.DEFAULT_DATE, data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE), + run_after=self.DEFAULT_DATE, state=State.RUNNING, conf={"value": new_value}, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) self.operator.run(start_date=self.DEFAULT_DATE, end_date=self.DEFAULT_DATE) @@ -3122,7 +3122,6 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None: dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily") run_id = run_id_type.generate_run_id(DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with pytest.raises(ValueError) as ctx: 
dag.create_dagrun( @@ -3130,8 +3129,9 @@ def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagR run_id=run_id, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, state=DagRunState.QUEUED, - **triggered_by_kwargs, # type: ignore + triggered_by=DagRunTriggeredByType.TEST, ) assert str(ctx.value) == ( f"A manual DAG run cannot use ID {run_id!r} since it is reserved for {run_id_type.value} runs" diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index b5afa6301f4a6..12bccca74b815 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -46,16 +46,12 @@ from airflow.utils import timezone from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests.models import DEFAULT_DATE as _DEFAULT_DATE from tests_common.test_utils import db from tests_common.test_utils.config import conf_vars from tests_common.test_utils.mock_operators import MockOperator -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -103,7 +99,6 @@ def create_dag_run( ): now = timezone.utcnow() logical_date = pendulum.instance(logical_date or now) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} if is_backfill: run_type = DagRunType.BACKFILL_JOB data_interval = dag.infer_automated_data_interval(logical_date) @@ -119,10 +114,11 @@ def create_dag_run( run_type=run_type, logical_date=logical_date, data_interval=data_interval, + run_after=data_interval.end, start_date=now, state=state, external_trigger=False, - **triggered_by_kwargs, # type: ignore + triggered_by=DagRunTriggeredByType.TEST, ) if task_states is not None: @@ -839,7 +835,6 @@ 
def test_next_dagruns_to_examine_only_unpaused(self, session, state): ) session.add(orm_dag) session.flush() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, @@ -850,9 +845,10 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state): state=state, logical_date=DEFAULT_DATE, data_interval=dag.infer_automated_data_interval(DEFAULT_DATE), + run_after=DEFAULT_DATE, start_date=DEFAULT_DATE if state == DagRunState.RUNNING else None, session=session, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) if state == DagRunState.RUNNING: @@ -917,7 +913,6 @@ def test_emit_scheduling_delay(self, session, schedule, expected): orm_dag = DagModel(**orm_dag_kwargs) session.add(orm_dag) session.flush() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, @@ -928,9 +923,10 @@ def test_emit_scheduling_delay(self, session, schedule, expected): state=DagRunState.SUCCESS, logical_date=dag.start_date, data_interval=dag.infer_automated_data_interval(dag.start_date), + run_after=dag.start_date, start_date=dag.start_date, + triggered_by=DagRunTriggeredByType.TEST, session=session, - **triggered_by_kwargs, ) ti = dag_run.get_task_instance(dag_task.task_id, session) ti.set_state(TaskInstanceState.SUCCESS, session) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 628945ebf2aaa..ab43b61cb95e9 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -96,7 +96,7 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.task_group import TaskGroup from airflow.utils.task_instance_session import set_current_task_instance_session -from airflow.utils.types import DagRunType +from 
airflow.utils.types import DagRunTriggeredByType, DagRunType from airflow.utils.xcom import XCOM_RETURN_KEY from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER @@ -104,10 +104,6 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_connections, clear_db_runs from tests_common.test_utils.mock_operators import MockOperator -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = [pytest.mark.db_test] @@ -1745,14 +1741,14 @@ def test_xcom_pull_different_logical_date(self, create_task_instance): assert ti.xcom_pull(task_ids="test_xcom", key=key) == value ti.run() exec_date += datetime.timedelta(days=1) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = ti.task.dag.create_dagrun( run_id="test2", run_type=DagRunType.MANUAL, logical_date=exec_date, data_interval=(exec_date, exec_date), + run_after=exec_date, state=None, - **triggered_by_kwargs, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TI(task=ti.task, run_id=dr.run_id) ti.run() @@ -2027,7 +2023,6 @@ def test_get_num_running_task_instances(self, create_task_instance): ) logical_date = DEFAULT_DATE + datetime.timedelta(days=1) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = ti1.task.dag.create_dagrun( logical_date=logical_date, run_type=DagRunType.MANUAL, @@ -2035,7 +2030,8 @@ def test_get_num_running_task_instances(self, create_task_instance): run_id="2", session=session, data_interval=(logical_date, logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) assert ti1 in session ti2 = dr.task_instances[0] @@ -3210,7 +3206,6 @@ def test_get_previous_start_date_none(self, dag_maker): day_1 = DEFAULT_DATE day_2 = DEFAULT_DATE + datetime.timedelta(days=1) - triggered_by_kwargs = {"triggered_by": 
DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} # Create a DagRun for day_1 and day_2. Calling ti_2.get_previous_start_date() # should return the start_date of ti_1 (which is None because ti_1 was not run). @@ -3219,7 +3214,6 @@ def test_get_previous_start_date_none(self, dag_maker): logical_date=day_1, state=State.RUNNING, run_type=DagRunType.MANUAL, - **triggered_by_kwargs, ) dagrun_2 = dag_maker.create_dagrun( @@ -3227,7 +3221,6 @@ def test_get_previous_start_date_none(self, dag_maker): state=State.RUNNING, run_type=DagRunType.MANUAL, data_interval=(day_1, day_2), - **triggered_by_kwargs, ) ti_1 = dagrun_1.get_task_instance(task.task_id) @@ -3254,7 +3247,6 @@ def test_context_triggering_asset_events(self, create_dummy_dag, session): session.commit() logical_date = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} # it's easier to fake a manual run here dag, task1 = create_dummy_dag( dag_id="test_triggering_asset_events", @@ -3271,7 +3263,8 @@ def test_context_triggering_asset_events(self, create_dummy_dag, session): state=DagRunState.RUNNING, session=session, data_interval=(logical_date, logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) ds1_event = AssetEvent(asset_id=1) ds2_event_1 = AssetEvent(asset_id=2) @@ -3523,7 +3516,6 @@ def test_handle_failure(self, create_dummy_dag, session=None): session=session, ) logical_date = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test2", run_type=DagRunType.MANUAL, @@ -3532,7 +3524,8 @@ def test_handle_failure(self, create_dummy_dag, session=None): start_date=logical_date - datetime.timedelta(hours=1), session=session, data_interval=(logical_date, logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) 
dr.set_state(DagRunState.FAILED) ti1 = dr.get_task_instance(task1.task_id, session=session) @@ -3672,7 +3665,6 @@ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None): fail_fast=True, ) logical_date = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test_ff", run_type=DagRunType.MANUAL, @@ -3680,7 +3672,8 @@ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None): state=DagRunState.RUNNING, session=session, data_interval=(logical_date, logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) dr.set_state(DagRunState.SUCCESS) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 74d8371e5d78e..d4ff1796a3442 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -192,6 +192,8 @@ def test_trigger_dagrun_twice(self, dag_maker): dag_run = DagRun( dag_id=TRIGGERED_DAG_ID, logical_date=utc_now, + data_interval=(utc_now, utc_now), + run_after=utc_now, state=State.SUCCESS, run_type="manual", run_id=run_id, @@ -230,6 +232,8 @@ def test_trigger_dagrun_with_scheduled_dag_run(self, dag_maker): dag_run = DagRun( dag_id=TRIGGERED_DAG_ID, logical_date=utc_now, + data_interval=(utc_now, utc_now), + run_after=utc_now, state=State.SUCCESS, run_type="scheduled", run_id=run_id, diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 8a9b810928c70..fc1b8a1315da2 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -1244,6 +1244,7 @@ def run_tasks( ), logical_date=logical_date, data_interval=data_interval, + run_after=logical_date, run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.TEST, dag_version=None, diff --git a/tests/task/test_standard_task_runner.py b/tests/task/test_standard_task_runner.py 
index 51c5f9f8fa3e8..e4560ba3d1d70 100644 --- a/tests/task/test_standard_task_runner.py +++ b/tests/task/test_standard_task_runner.py @@ -40,15 +40,11 @@ from airflow.utils.platform import getuser from airflow.utils.state import State from airflow.utils.timeout import timeout -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests.listeners import xcom_listener from tests.listeners.file_write_listener import FileWriteListener from tests_common.test_utils.db import clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"] DEFAULT_DATE = timezone.datetime(2016, 1, 1) @@ -187,7 +183,6 @@ def test_notifies_about_start_and_stop(self, tmp_path, session): dag = dagbag.dags.get("test_example_bash_operator") task = dag.get_task("runme_1") current_time = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", logical_date=current_time, @@ -196,7 +191,8 @@ def test_notifies_about_start_and_stop(self, tmp_path, session): state=State.RUNNING, start_date=current_time, session=session, - **triggered_by_kwargs, + run_after=current_time, + triggered_by=DagRunTriggeredByType.TEST, ) session.commit() ti = TaskInstance(task=task, run_id="test") @@ -235,7 +231,6 @@ def test_notifies_about_fail(self, tmp_path): dag = dagbag.dags.get("test_failing_bash_operator") task = dag.get_task("failing_task") current_time = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", logical_date=current_time, @@ -243,7 +238,8 @@ def test_notifies_about_fail(self, tmp_path): run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=current_time, - **triggered_by_kwargs, + 
run_after=current_time, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TaskInstance(task=task, run_id="test") job = Job(dag_id=ti.dag_id) @@ -285,7 +281,6 @@ def test_ol_does_not_block_xcoms(self, tmp_path): dag = dagbag.dags.get("test_dag_xcom_openlineage") task = dag.get_task("push_and_pull") current_time = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", logical_date=current_time, @@ -293,7 +288,8 @@ def test_ol_does_not_block_xcoms(self, tmp_path): run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=current_time, - **triggered_by_kwargs, + run_after=current_time, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TaskInstance(task=task, run_id="test") @@ -417,7 +413,6 @@ def test_on_kill(self): ) dag = dagbag.dags.get("test_on_kill") task = dag.get_task("task1") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", logical_date=DEFAULT_DATE, @@ -425,7 +420,8 @@ def test_on_kill(self): run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TaskInstance(task=task, run_id="test") job = Job(dag_id=ti.dag_id) @@ -473,7 +469,6 @@ def test_parsing_context(self): ) dag = dagbag.dags.get("test_parsing_context") task = dag.get_task("task1") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", @@ -482,7 +477,8 @@ def test_parsing_context(self): run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) ti = TaskInstance(task=task, run_id="test") job = Job(dag_id=ti.dag_id) diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py index 
c0607f9322265..c1912897b321e 100644 --- a/tests/ti_deps/deps/test_prev_dagrun_dep.py +++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py @@ -28,13 +28,9 @@ from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.timezone import convert_to_utc, datetime -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.db import clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -51,7 +47,6 @@ def test_first_task_run_of_new_task(self): ignore_first_depends_on_past set to True. """ dag = DAG("test_dag", schedule=timedelta(days=1), start_date=START_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} old_task = BaseOperator( task_id="test_task", dag=dag, @@ -66,7 +61,8 @@ def test_first_task_run_of_new_task(self): logical_date=old_task.start_date, run_type=DagRunType.SCHEDULED, data_interval=(old_task.start_date, old_task.start_date), - **triggered_by_kwargs, + run_after=old_task.start_date, + triggered_by=DagRunTriggeredByType.TEST, ) new_task = BaseOperator( @@ -85,7 +81,8 @@ def test_first_task_run_of_new_task(self): logical_date=logical_date, run_type=DagRunType.SCHEDULED, data_interval=(logical_date, logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) ti = dr.get_task_instance(new_task.task_id) diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py index 5b30595e4a305..c7400b1748536 100644 --- a/tests/utils/test_sqlalchemy.py +++ b/tests/utils/test_sqlalchemy.py @@ -42,12 +42,7 @@ ) from airflow.utils.state import State from airflow.utils.timezone import utcnow -from airflow.utils.types import DagRunType - -from 
tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType +from airflow.utils.types import DagRunTriggeredByType, DagRunType pytestmark = pytest.mark.db_test @@ -79,7 +74,6 @@ def test_utc_transformations(self): dag = DAG(dag_id=dag_id, schedule=datetime.timedelta(days=1), start_date=start_date) dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} run = dag.create_dagrun( run_id=iso_date, run_type=DagRunType.MANUAL, @@ -88,7 +82,8 @@ def test_utc_transformations(self): start_date=start_date, session=self.session, data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) assert logical_date == run.logical_date @@ -112,7 +107,6 @@ def test_process_bind_param_naive(self): start_date = datetime.datetime.now() dag = DAG(dag_id=dag_id, start_date=start_date, schedule=datetime.timedelta(days=1)) dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with pytest.raises((ValueError, StatementError)): dag.create_dagrun( @@ -123,7 +117,8 @@ def test_process_bind_param_naive(self): start_date=start_date, session=self.session, data_interval=dag.timetable.infer_manual_data_interval(run_after=start_date), - **triggered_by_kwargs, + run_after=start_date, + triggered_by=DagRunTriggeredByType.TEST, ) dag.clear() diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py index e99bc0bd08387..de393f38c3516 100644 --- a/tests/utils/test_state.py +++ b/tests/utils/test_state.py @@ -24,13 +24,9 @@ from airflow.models.dagrun import DagRun from airflow.utils.session import create_session from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests.models import 
DEFAULT_DATE -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -42,7 +38,6 @@ def test_dagrun_state_enum_escape(): """ with create_session() as session: dag = DAG(dag_id="test_dagrun_state_enum_escape", schedule=timedelta(days=1), start_date=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id=dag.timetable.generate_run_id( run_type=DagRunType.SCHEDULED, @@ -55,7 +50,8 @@ def test_dagrun_state_enum_escape(): start_date=DEFAULT_DATE, data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), session=session, - **triggered_by_kwargs, + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) query = session.query( diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index b6bb3f3751d82..d258445041d48 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -23,7 +23,7 @@ from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from airflow.www.views import DagRunModelView from providers.fab.tests.provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import ( create_user, @@ -32,16 +32,12 @@ ) from tests.www.views.test_views_tasks import _get_appbuilder_pk_string -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import ( check_content_in_response, check_content_not_in_response, client_with_login, ) -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test @@ -141,7 +137,6 @@ def test_create_dagrun_permission_denied(session, 
client_dr_without_dag_run_crea def running_dag_run(session): dag = DagBag().get_dag("example_bash_operator") logical_date = timezone.datetime(2016, 1, 9) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( state="running", logical_date=logical_date, @@ -149,7 +144,8 @@ def running_dag_run(session): run_id="test_dag_runs_action", run_type=DagRunType.MANUAL, session=session, - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) session.add(dr) tis = [ @@ -165,7 +161,6 @@ def running_dag_run(session): def completed_dag_run_with_missing_task(session): dag = DagBag().get_dag("example_bash_operator") logical_date = timezone.datetime(2016, 1, 9) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( state="success", logical_date=logical_date, @@ -173,7 +168,8 @@ def completed_dag_run_with_missing_task(session): run_id="test_dag_runs_action", run_type=DagRunType.MANUAL, session=session, - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) session.add(dr) tis = [ @@ -321,7 +317,6 @@ def dag_run_with_all_done_task(session): dag.sync_to_db() logical_date = timezone.datetime(2016, 1, 9) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( state="running", logical_date=logical_date, @@ -329,7 +324,8 @@ def dag_run_with_all_done_task(session): run_id="test_dagrun_failed", run_type=DagRunType.MANUAL, session=session, - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) # Create task instances in various states to test the ALL_DONE trigger rule diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py index 02be0ebd3681c..54e13641f9bb9 100644 --- a/tests/www/views/test_views_decorators.py +++ 
b/tests/www/views/test_views_decorators.py @@ -24,19 +24,15 @@ from airflow.models import DagBag, Variable from airflow.utils import timezone from airflow.utils.state import State -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.db import clear_db_runs, clear_db_variables, parse_and_sync_to_db -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import ( _check_last_log, _check_last_log_masked_variable, check_content_in_response, ) -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test EXAMPLE_DAG_DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) @@ -60,7 +56,6 @@ def xcom_dag(dagbag): @pytest.fixture(autouse=True) def dagruns(bash_dag, xcom_dag): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} bash_dagrun = bash_dag.create_dagrun( run_id="test_bash", run_type=DagRunType.SCHEDULED, @@ -68,7 +63,8 @@ def dagruns(bash_dag, xcom_dag): data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE), start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, + run_after=EXAMPLE_DAG_DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) xcom_dagrun = xcom_dag.create_dagrun( @@ -78,7 +74,8 @@ def dagruns(bash_dag, xcom_dag): data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE), start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, + run_after=EXAMPLE_DAG_DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, ) yield bash_dagrun, xcom_dagrun diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py index 8ae2c63b818e0..fcf119ba207a5 100644 --- a/tests/www/views/test_views_extra_links.py +++ b/tests/www/views/test_views_extra_links.py @@ -27,7 +27,7 @@ from airflow.models.dag import DAG from 
airflow.utils import timezone from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.compat import BaseOperatorLink from tests_common.test_utils.db import clear_db_runs @@ -36,10 +36,6 @@ EmptyExtraLinkTestOperator, EmptyNoExtraLinkTestOperator, ) -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -88,7 +84,6 @@ def dag(): @pytest.fixture(scope="module") def create_dag_run(dag): def _create_dag_run(*, logical_date, session): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} return dag.create_dagrun( run_id=f"manual__{logical_date.isoformat()}", state=DagRunState.RUNNING, @@ -96,7 +91,8 @@ def _create_dag_run(*, logical_date, session): data_interval=(logical_date, logical_date), run_type=DagRunType.MANUAL, session=session, - **triggered_by_kwargs, + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, ) return _create_dag_run diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index ae264d3f63829..037c97d026404 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -39,18 +39,14 @@ from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from airflow.www.app import create_app from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs from tests_common.test_utils.decorators import dont_initialize_flask_app_submodules -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from 
tests_common.test_utils.www import client_with_login -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test DAG_ID = "dag_for_testing_log_view" @@ -158,16 +154,16 @@ def dags(log_app, create_dummy_dag, testing_dag_bundle, session): @pytest.fixture(autouse=True) def tis(dags, session): dag, dag_removed = dags - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun = dag.create_dagrun( run_id=f"scheduled__{DEFAULT_DATE.isoformat()}", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=DEFAULT_DATE, state=DagRunState.RUNNING, session=session, - **triggered_by_kwargs, ) (ti,) = dagrun.task_instances ti.try_number = 1 @@ -177,10 +173,11 @@ def tis(dags, session): run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=DEFAULT_DATE, state=DagRunState.RUNNING, session=session, - **triggered_by_kwargs, ) (ti_removed_dag,) = dagrun_removed.task_instances ti_removed_dag.try_number = 1 diff --git a/tests/www/views/test_views_rendered.py b/tests/www/views/test_views_rendered.py index 5f4b8063d0075..7514814dad5a8 100644 --- a/tests/www/views/test_views_rendered.py +++ b/tests/www/views/test_views_rendered.py @@ -32,7 +32,7 @@ from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.compat import BashOperator, PythonOperator from tests_common.test_utils.db import ( @@ -41,12 +41,8 @@ clear_rendered_ti_fields, initial_db_init, ) -from tests_common.test_utils.version_compat import 
AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import check_content_in_response, check_content_not_in_response -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - DEFAULT_DATE = timezone.datetime(2020, 3, 1) pytestmark = pytest.mark.db_test @@ -145,15 +141,15 @@ def _reset_db(dag, task1, task2, task3, task4, task_secret): @pytest.fixture def create_dag_run(dag, task1, task2, task3, task4, task_secret): def _create_dag_run(*, logical_date, session): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( run_id="test", state=DagRunState.RUNNING, logical_date=logical_date, data_interval=(logical_date, logical_date), + run_after=logical_date, + triggered_by=DagRunTriggeredByType.TEST, run_type=DagRunType.SCHEDULED, session=session, - **triggered_by_kwargs, ) ti1 = dag_run.get_task_instance(task1.task_id, session=session) ti1.state = TaskInstanceState.SUCCESS @@ -342,15 +338,15 @@ def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, url = f"task?task_id=task1&dag_id=testdag&logical_date={date}" with create_session() as session: - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", state=DagRunState.RUNNING, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, run_type=DagRunType.SCHEDULED, session=session, - **triggered_by_kwargs, ) resp = admin_client.get(url, follow_redirects=True) diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 7cb0de9024cfb..31af5e5ac3063 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -39,7 +39,7 @@ from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State 
-from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from airflow.www.views import TaskInstanceModelView, _safe_parse_datetime from providers.fab.tests.provider_tests.fab.auth_manager.api_endpoints.api_connexion_utils import ( create_user, @@ -50,16 +50,12 @@ from tests_common.test_utils.compat import BashOperator from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_runs, clear_db_xcom -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import ( check_content_in_response, check_content_not_in_response, client_with_login, ) -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) @@ -79,24 +75,25 @@ def _reset_dagruns(): @pytest.fixture(autouse=True) def _init_dagruns(app): with time_machine.travel(DEFAULT_DATE, tick=False): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id=DEFAULT_DAGRUN, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) app.dag_bag.get_dag("example_python_operator").create_dagrun( run_id=DEFAULT_DAGRUN, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) XCom.set( key="return_value", @@ -110,27 +107,30 @@ def _init_dagruns(app): run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + 
run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) app.dag_bag.get_dag("latest_only").create_dagrun( run_id=DEFAULT_DAGRUN, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) app.dag_bag.get_dag("example_task_group").create_dagrun( run_id=DEFAULT_DAGRUN, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) yield clear_db_runs() @@ -395,15 +395,15 @@ def test_rendered_k8s_without_k8s(admin_client): def test_tree_trigger_origin_tree_view(app, admin_client): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) url = "tree?dag_id=example_bash_operator" @@ -414,15 +414,15 @@ def test_tree_trigger_origin_tree_view(app, admin_client): def test_graph_trigger_origin_grid_view(app, admin_client): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) url = 
"/dags/example_bash_operator/graph" @@ -433,15 +433,15 @@ def test_graph_trigger_origin_grid_view(app, admin_client): def test_gantt_trigger_origin_grid_view(app, admin_client): - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, + triggered_by=DagRunTriggeredByType.TEST, start_date=timezone.utcnow(), state=State.RUNNING, - **triggered_by_kwargs, ) url = "/dags/example_bash_operator/gantt" @@ -853,7 +853,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker): ): EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") EmptyOperator(task_id="task_3") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} run1 = dag_maker.create_dagrun( run_id="run_1", state=DagRunState.SUCCESS, @@ -861,7 +860,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker): logical_date=dag_maker.dag.start_date, start_date=dag_maker.dag.start_date, session=session, - **triggered_by_kwargs, ) run2 = dag_maker.create_dagrun( @@ -871,7 +869,6 @@ def test_task_instance_clear_downstream(session, admin_client, dag_maker): logical_date=dag_maker.dag.start_date.add(days=1), start_date=dag_maker.dag.start_date.add(days=1), session=session, - **triggered_by_kwargs, ) for run in (run1, run2): diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 904b05b946a76..66c271113a780 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -931,6 +931,8 @@ def create_dagrun(self, *, logical_date=None, **kwargs): if AIRFLOW_V_3_0_PLUS: kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST) kwargs["logical_date"] = logical_date + kwargs.setdefault("dag_version", None) + kwargs.setdefault("run_after", 
data_interval[-1]) else: kwargs.pop("dag_version", None) kwargs.pop("triggered_by", None) From 51bfef9f1ad67c9ab1019fb6ae5e90c223ee87a6 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Jan 2025 19:05:00 +0800 Subject: [PATCH 4/5] Pass run_after to DagRun in tests --- .../apache/kylin/operators/test_kylin_cube.py | 2 ++ .../spark/operators/test_spark_submit.py | 2 ++ .../api_endpoints/test_dag_run_endpoint.py | 24 ++++++++++++------- .../test_task_instance_endpoint.py | 2 ++ .../api_endpoints/test_xcom_endpoint.py | 4 ++++ .../redis/log/test_redis_task_handler.py | 9 ++++++- .../cncf/kubernetes/operators/test_pod.py | 23 ++++++++++++++---- tests/jobs/test_triggerer_job.py | 5 +++- tests/models/test_xcom.py | 2 ++ 9 files changed, 58 insertions(+), 15 deletions(-) diff --git a/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py b/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py index c56dbdb33b520..47e53cbc5f373 100644 --- a/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py +++ b/providers/apache/kylin/tests/provider_tests/apache/kylin/operators/test_kylin_cube.py @@ -176,6 +176,8 @@ def test_render_template(self, session): dag_id=self.dag.dag_id, run_id="kylin_test", logical_date=DEFAULT_DATE, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, run_type=DagRunType.MANUAL, state=state.DagRunState.RUNNING, ) diff --git a/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py index 2670340086bb7..94344d540681d 100644 --- a/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py +++ b/providers/apache/spark/tests/provider_tests/apache/spark/operators/test_spark_submit.py @@ -200,6 +200,8 @@ def test_render_template(self, session): dag_id=self.dag.dag_id, run_id="spark_test", 
logical_date=DEFAULT_DATE, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_after=DEFAULT_DATE, run_type=DagRunType.MANUAL, state="running", ) diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py index 1a297b008cf56..02057310907f3 100644 --- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py +++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py @@ -142,18 +142,26 @@ def teardown_method(self) -> None: def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commit=True, idx_start=1): dag_runs = [] dags = [] - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + + def _v3_kwargs(date): + if AIRFLOW_V_3_0_PLUS: + return { + "data_interval": (date, date), + "logical_date": date, + "run_after": date, + "triggered_by": DagRunTriggeredByType.TEST, + } + return {"execution_date": date} for i in range(idx_start, idx_start + 2): dagrun_model = DagRun( dag_id="TEST_DAG_ID", run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, - logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1), start_date=timezone.parse(self.default_time), external_trigger=True, state=state, - **triggered_by_kwargs, + **_v3_kwargs(timezone.parse(self.default_time) + timedelta(days=i - 1)), ) dagrun_model.updated_at = timezone.parse(self.default_time) dag_runs.append(dagrun_model) @@ -166,10 +174,10 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi dag_id=f"TEST_DAG_ID_{i}", run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, - logical_date=timezone.parse(self.default_time_2), start_date=timezone.parse(self.default_time), external_trigger=True, state=state, + **_v3_kwargs(timezone.parse(self.default_time_2)), ) ) if commit: @@ -203,8 +211,8 @@ def 
test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "external_trigger": True, "start_date": self.default_time, "conf": {}, - "data_interval_end": None, - "data_interval_start": None, + "data_interval_end": self.default_time, + "data_interval_start": self.default_time, "last_scheduling_decision": None, "run_type": "manual", "note": None, @@ -219,8 +227,8 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "external_trigger": True, "start_date": self.default_time, "conf": {}, - "data_interval_end": None, - "data_interval_start": None, + "data_interval_end": self.default_time_2, + "data_interval_start": self.default_time_2, "last_scheduling_decision": None, "run_type": "manual", "note": None, diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py index 94e219ebcf0a2..9e75a2cf38368 100644 --- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py +++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py @@ -166,6 +166,8 @@ def create_task_instances( run_id=run_id, dag_id=dag_id, logical_date=logical_date, + data_interval=(logical_date, logical_date), + run_after=logical_date, run_type=DagRunType.MANUAL, state=dag_run_state, ) diff --git a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py index 6c3dd08b9e845..2d5177256b273 100644 --- a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py +++ b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py @@ -180,6 +180,8 @@ def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, mapped_ti= dag_id=dag_id, run_id=run_id, 
logical_date=logical_date, + data_interval=(logical_date, logical_date), + run_after=logical_date, start_date=logical_date, run_type=DagRunType.MANUAL, ) @@ -226,6 +228,7 @@ def _create_invalid_xcom_entries(self, logical_date): dag_id="invalid_dag", run_id="invalid_run_id", logical_date=logical_date + timedelta(days=1), + run_after=logical_date, start_date=logical_date, run_type=DagRunType.MANUAL, ) @@ -243,6 +246,7 @@ def _create_invalid_xcom_entries(self, logical_date): dag_id="invalid_dag", run_id="not_this_run_id", logical_date=logical_date, + run_after=logical_date, start_date=logical_date, run_type=DagRunType.MANUAL, ) diff --git a/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py b/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py index 2dde6fa402b14..32d80bf07a980 100644 --- a/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py +++ b/providers/redis/tests/provider_tests/redis/log/test_redis_task_handler.py @@ -42,7 +42,14 @@ def ti(self): dag = DAG(dag_id="dag_for_testing_redis_task_handler", schedule=None, start_date=date) task = EmptyOperator(task_id="task_for_testing_redis_log_handler", dag=dag) if AIRFLOW_V_3_0_PLUS: - dag_run = DagRun(dag_id=dag.dag_id, logical_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=dag.dag_id, + logical_date=date, + data_interval=(date, date), + run_after=date, + run_id="test", + run_type="scheduled", + ) else: dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") diff --git a/providers/tests/cncf/kubernetes/operators/test_pod.py b/providers/tests/cncf/kubernetes/operators/test_pod.py index 7e3806e8110f4..0aacc4b782fd0 100644 --- a/providers/tests/cncf/kubernetes/operators/test_pod.py +++ b/providers/tests/cncf/kubernetes/operators/test_pod.py @@ -55,6 +55,7 @@ from airflow.utils.types import DagRunType from tests_common.test_utils import db +from tests_common.test_utils.version_compat 
import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -96,11 +97,23 @@ def create_context(task, persist_to_db=False, map_index=None): else: dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now()) dag.add_task(task) - dag_run = DagRun( - run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), - run_type=DagRunType.MANUAL, - dag_id=dag.dag_id, - ) + now = timezone.utcnow() + if AIRFLOW_V_3_0_PLUS: + dag_run = DagRun( + run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), + run_type=DagRunType.MANUAL, + dag_id=dag.dag_id, + logical_date=now, + data_interval=(now, now), + run_after=now, + ) + else: + dag_run = DagRun( + run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE), + run_type=DagRunType.MANUAL, + dag_id=dag.dag_id, + execution_date=now, + ) task_instance = TaskInstance(task=task, run_id=dag_run.run_id) task_instance.dag_run = dag_run if map_index is not None: diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index e839b1908bcf8..c66ddce9ccd20 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -93,10 +93,13 @@ def session(): def create_trigger_in_db(session, trigger, operator=None): dag_model = DagModel(dag_id="test_dag") dag = DAG(dag_id=dag_model.dag_id, schedule="@daily", start_date=pendulum.datetime(2023, 1, 1)) + date = pendulum.datetime(2023, 1, 1) run = DagRun( dag_id=dag_model.dag_id, run_id="test_run", - logical_date=pendulum.datetime(2023, 1, 1), + logical_date=date, + data_interval=(date, date), + run_after=date, run_type=DagRunType.MANUAL, ) trigger_orm = Trigger.from_object(trigger) diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index 1e62b414de2da..36fecea2314ce 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -64,6 +64,8 @@ def func(*, dag_id, task_id, logical_date): run_type=DagRunType.SCHEDULED, run_id=run_id, logical_date=logical_date, + data_interval=(logical_date, logical_date), + 
run_after=logical_date, ) session.add(run) ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id) From 8e15fd0fa3a86da30ac9021fee31c5435814d6c9 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sun, 19 Jan 2025 18:46:44 +0800 Subject: [PATCH 5/5] Set default run_after to now --- airflow/models/dagrun.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index f3876033dd46f..22757c972e953 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -107,6 +107,11 @@ class TISchedulingDecision(NamedTuple): finished_tis: list[TI] +def _default_run_after(ctx): + params = ctx.get_current_parameters() + return params["data_interval_end"] or params["logical_date"] or timezone.utcnow() + + def _creator_note(val): """Creator the ``note`` association proxy.""" if isinstance(val, str): @@ -146,7 +151,7 @@ class DagRun(Base, LoggingMixin): data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) # Earliest time when this DagRun can start running. - run_after = Column(UtcDateTime, nullable=False) + run_after = Column(UtcDateTime, default=_default_run_after, nullable=False) # When a scheduler last attempted to schedule TIs for this DagRun last_scheduling_decision = Column(UtcDateTime) # Foreign key to LogTemplate. DagRun rows created prior to this column's