Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def _trigger_dag(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
Expand Down
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def trigger_dag_run(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class DagRun(StrictBaseModel):
logical_date: UtcDateTime
data_interval_start: UtcDateTime | None
data_interval_end: UtcDateTime | None
run_after: UtcDateTime
start_date: UtcDateTime
end_date: UtcDateTime | None
run_type: DagRunType
Expand Down
1 change: 1 addition & 0 deletions airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def ti_run(
DR.dag_id,
DR.data_interval_start,
DR.data_interval_end,
DR.run_after,
DR.start_date,
DR.end_date,
DR.run_type,
Expand Down
8 changes: 6 additions & 2 deletions airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,26 @@ def _get_dag_run(
dag_run_logical_date = pendulum.instance(timezone.utcnow())

if create_if_necessary == "memory":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = DagRun(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
data_interval=data_interval,
run_after=data_interval.end,
triggered_by=DagRunTriggeredByType.CLI,
state=DagRunState.RUNNING,
)
return dag_run, True
elif create_if_necessary == "db":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = dag.create_dagrun(
run_id=_generate_temporary_run_id(),
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
data_interval=data_interval,
run_after=data_interval.end,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
Expand Down
2 changes: 2 additions & 0 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
),
logical_date=dag_model.next_dagrun,
data_interval=data_interval,
run_after=dag_model.next_dagrun_create_after,
run_type=DagRunType.SCHEDULED,
triggered_by=DagRunTriggeredByType.TIMETABLE,
dag_version=latest_dag_version,
Expand Down Expand Up @@ -1395,6 +1396,7 @@ def _create_dag_runs_asset_triggered(
),
logical_date=logical_date,
data_interval=data_interval,
run_after=max(logical_dates.values()),
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
dag_version=latest_dag_version,
Expand Down
62 changes: 62 additions & 0 deletions airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add DagRun run_after.

Revision ID: 6a9e7a527a88
Revises: 33b04e4bfa19
Create Date: 2025-01-16 10:07:34.940948
"""

from __future__ import annotations

import alembic.op as op
import sqlalchemy as sa

from airflow.utils.sqlalchemy import UtcDateTime

# Revision identifiers, used by Alembic.
revision = "6a9e7a527a88"
down_revision = "33b04e4bfa19"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Add DagRun run_after."""
op.add_column("dag_run", sa.Column("run_after", UtcDateTime, nullable=True))

# run_after must be set to after the data interval ends.
# In very old runs where the data interval is not available, we'll just use
# the logical date. This is wrong (one interval too early), but good enough
# since those super old runs should have their data interval ended a long
# time ago anyway, and setting
op.execute("update dag_run set run_after = coalesce(data_interval_end, logical_date)")

with op.batch_alter_table("dag_run") as batch_op:
batch_op.alter_column("run_after", existing_type=UtcDateTime, nullable=False)
batch_op.create_index("idx_dag_run_run_after", ["run_after"], unique=False)


def downgrade():
"""Remove DagRun run_after."""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.drop_index("idx_dag_run_run_after")
batch_op.drop_column("run_after")
1 change: 1 addition & 0 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def _create_backfill_dag_run(
),
logical_date=info.logical_date,
data_interval=info.data_interval,
run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
Expand Down
1 change: 1 addition & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ def run(
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
data_interval=info.data_interval,
run_after=info.run_after,
triggered_by=DagRunTriggeredByType.TEST,
state=DagRunState.RUNNING,
)
Expand Down
11 changes: 9 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def _create_orm_dagrun(
run_id: str,
logical_date: datetime | None,
data_interval: DataInterval | None,
run_after: datetime,
start_date: datetime | None,
external_trigger: bool,
conf: Any,
Expand All @@ -266,6 +267,7 @@ def _create_orm_dagrun(
run_id=run_id,
logical_date=logical_date,
start_date=start_date,
run_after=run_after,
external_trigger=external_trigger,
conf=conf,
state=state,
Expand Down Expand Up @@ -1634,11 +1636,12 @@ def add_logger_if_needed(ti: TaskInstance):
dag=scheduler_dag,
start_date=logical_date,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
session=session,
conf=run_conf,
triggered_by=DagRunTriggeredByType.TEST,
data_interval=data_interval,
)

tasks = self.task_dict
Expand Down Expand Up @@ -1721,6 +1724,7 @@ def create_dagrun(
run_id: str,
logical_date: datetime,
data_interval: tuple[datetime, datetime],
run_after: datetime,
conf: dict | None = None,
run_type: DagRunType,
triggered_by: DagRunTriggeredByType,
Expand Down Expand Up @@ -1791,6 +1795,8 @@ def create_dagrun(
dag=self,
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=timezone.coerce_datetime(run_after),
start_date=timezone.coerce_datetime(start_date),
external_trigger=external_trigger,
conf=conf,
Expand All @@ -1799,7 +1805,6 @@ def create_dagrun(
dag_version=dag_version,
creating_job_id=creating_job_id,
backfill_id=backfill_id,
data_interval=data_interval,
triggered_by=triggered_by,
session=session,
)
Expand Down Expand Up @@ -2439,6 +2444,7 @@ def _get_or_create_dagrun(
run_id: str,
logical_date: datetime,
data_interval: tuple[datetime, datetime],
run_after: datetime,
conf: dict | None,
triggered_by: DagRunTriggeredByType,
start_date: datetime,
Expand Down Expand Up @@ -2468,6 +2474,7 @@ def _get_or_create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=run_after,
conf=conf,
run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
Expand Down
11 changes: 11 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class TISchedulingDecision(NamedTuple):
finished_tis: list[TI]


def _default_run_after(ctx):
params = ctx.get_current_parameters()
return params["data_interval_end"] or params["logical_date"] or timezone.utcnow()


def _creator_note(val):
"""Creator the ``note`` association proxy."""
if isinstance(val, str):
Expand Down Expand Up @@ -145,6 +150,8 @@ class DagRun(Base, LoggingMixin):
# These two must be either both NULL or both datetime.
data_interval_start = Column(UtcDateTime)
data_interval_end = Column(UtcDateTime)
# Earliest time when this DagRun can start running.
run_after = Column(UtcDateTime, default=_default_run_after, nullable=False)
# When a scheduler last attempted to schedule TIs for this DagRun
last_scheduling_decision = Column(UtcDateTime)
# Foreign key to LogTemplate. DagRun rows created prior to this column's
Expand Down Expand Up @@ -180,6 +187,7 @@ class DagRun(Base, LoggingMixin):
Index("dag_id_state", dag_id, _state),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
Index("idx_dag_run_dag_id", dag_id),
Index("idx_dag_run_run_after", run_after),
Index(
"idx_dag_run_running_dags",
"state",
Expand Down Expand Up @@ -229,8 +237,10 @@ def __init__(
self,
dag_id: str | None = None,
run_id: str | None = None,
*,
queued_at: datetime | None | ArgNotSet = NOTSET,
logical_date: datetime | None = None,
run_after: datetime | None = None,
start_date: datetime | None = None,
external_trigger: bool | None = None,
conf: Any | None = None,
Expand All @@ -253,6 +263,7 @@ def __init__(
self.dag_id = dag_id
self.run_id = run_id
self.logical_date = logical_date
self.run_after = run_after
self.start_date = start_date
self.external_trigger = external_trigger
self.conf = conf or {}
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "33b04e4bfa19",
"3.0.0": "6a9e7a527a88",
}


Expand Down
1 change: 1 addition & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.UI,
Expand Down
1 change: 1 addition & 0 deletions dev/perf/scheduler_dag_execution_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def create_dag_runs(dag, num_runs, session):
run_id=f"{id_prefix}{logical_date.isoformat()}",
logical_date=logical_date,
data_interval=(logical_date, logical_date),
run_after=logical_date,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.TEST,
external_trigger=False,
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0f041321ecbb5d059f7a9c85fd5cfba745732eea205dc2e4f5e3b6dd76a9468d
232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412
Loading
Loading