Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
066cb891884eea1ee0496b5c507d4a52c20d0440387f9ec8bacb1d616a26e40e
4cda55eb221ee0340749c8dd41af7603220d7b300e2e808d733239a86e0c2837
428 changes: 216 additions & 212 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 6 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ Here's the list of all the Database Migrations that are executed when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``29ce7909c52b`` (head) | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. |
| ``0242ac120002`` (head) | ``dfee8bd5d574`` | ``3.1.0`` | Change the Deadline column in the Deadline table from |
| | | | DateTime to UTC DateTime. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``dfee8bd5d574`` | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``29ce7909c52b`` | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``959e216a3abb`` | ``0e9519b56710`` | ``3.0.0`` | Rename ``is_active`` to ``is_stale`` column in ``dag`` |
| | | | table. |
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ def update_dags(
"core", "max_consecutive_failed_dag_runs_per_dag"
)

if dag.deadline is not None:
dm.deadline = dag.deadline

if hasattr(dag, "has_task_concurrency_limits"):
dm.has_task_concurrency_limits = dag.has_task_concurrency_limits
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add Deadline to Dag.

Revision ID: dfee8bd5d574
Revises: 29ce7909c52b
Create Date: 2024-12-18 19:10:26.962464
"""

from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op

from airflow.settings import json

# Alembic revision identifiers: this migration ("dfee8bd5d574") applies on top
# of "29ce7909c52b"; no branch labels or cross-revision dependencies.
revision = "dfee8bd5d574"
down_revision = "29ce7909c52b"
branch_labels = None
depends_on = None
# First Airflow release that ships this migration.
airflow_version = "3.1.0"


def upgrade():
    """Apply "Add Deadline to Dag": add a nullable JSON ``deadline`` column to the ``dag`` table."""
    op.add_column(
        "dag",
        # Nullable because existing DAG rows (and DAGs without a deadline) have no value.
        sa.Column("deadline", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
    )


def downgrade():
    """Unapply "Add Deadline to Dag": drop the ``deadline`` column from the ``dag`` table."""
    op.drop_column("dag", "deadline")
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change the Deadline column in the Deadline table from DateTime to UTC DateTime.

Revision ID: 0242ac120002
Revises: dfee8bd5d574
Create Date: 2024-12-18 19:10:26.962464
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# Alembic revision identifiers: this migration ("0242ac120002") applies on top
# of "dfee8bd5d574"; no branch labels or cross-revision dependencies.
revision = "0242ac120002"
down_revision = "dfee8bd5d574"
branch_labels = None
depends_on = None
# First Airflow release that ships this migration.
airflow_version = "3.1.0"


def upgrade():
    """Apply change deadline column in the deadline table to UTC datetime."""
    # batch_alter_table keeps the change portable across backends (e.g. SQLite).
    conversion = {
        "existing_type": sa.DateTime(),
        "type_": TIMESTAMP(timezone=True),
        "existing_nullable": False,
    }
    with op.batch_alter_table("deadline", schema=None) as batch:
        batch.alter_column("deadline", **conversion)


def downgrade():
    """Unapply change deadline column in the deadline table to UTC datetime."""
    # Inverse of upgrade(): revert the timezone-aware TIMESTAMP back to naive DateTime.
    conversion = {
        "existing_type": TIMESTAMP(timezone=True),
        "type_": sa.DateTime(),
        "existing_nullable": False,
    }
    with op.batch_alter_table("deadline", schema=None) as batch:
        batch.alter_column("deadline", **conversion)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def import_all_models():
import airflow.models.dag_version
import airflow.models.dagbundle
import airflow.models.dagwarning
import airflow.models.deadline
import airflow.models.errors
import airflow.models.serialized_dag
import airflow.models.taskinstancehistory
Expand Down Expand Up @@ -101,6 +100,7 @@ def __getattr__(name):
"DagTag": "airflow.models.dag",
"DagWarning": "airflow.models.dagwarning",
"DbCallbackRequest": "airflow.models.db_callback_request",
"Deadline": "airflow.models.deadline",
"Log": "airflow.models.log",
"MappedOperator": "airflow.models.mappedoperator",
"Operator": "airflow.models.operator",
Expand Down
17 changes: 16 additions & 1 deletion airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,19 @@ class DAG(TaskSDKDag, LoggingMixin):
:param dagrun_timeout: Specify the duration a DagRun should be allowed to run before it times out or
fails. Task instances that are running when a DagRun is timed out will be marked as skipped.
:param sla_miss_callback: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1
:param deadline: Optional Deadline Alert for the DAG.
Specifies a time by which the DAG run should be complete, either in the form of a static datetime
or calculated relative to a reference timestamp. If the deadline passes before completion, the
provided callback is triggered.

**Example**: To set the deadline for one hour after the DAG run starts you could use ::

DeadlineAlert(
reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
interval=timedelta(hours=1),
callback=my_callback,
)

:param catchup: Perform scheduler catchup (or only run latest)? Defaults to False
:param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails.
A context dictionary is passed as a single parameter to this function.
Expand Down Expand Up @@ -1868,7 +1881,7 @@ class DagModel(Base):

__tablename__ = "dag"
"""
These items are stored in the database for state related information
These items are stored in the database for state related information.
"""
dag_id = Column(StringID(), primary_key=True)
# A DAG can be paused from the UI / DB
Expand Down Expand Up @@ -1903,6 +1916,8 @@ class DagModel(Base):
timetable_description = Column(String(1000), nullable=True)
# Asset expression based on asset triggers
asset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
# DAG deadline information
deadline = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
# Tags for view filter
tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
# Dag owner links for DAGs view
Expand Down
105 changes: 99 additions & 6 deletions airflow-core/src/airflow/models/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, Callable

import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String
from sqlalchemy import Column, ForeignKey, Index, Integer, String
from sqlalchemy_utils import UUIDType

from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, is_valid_dotpath
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)

class Deadline(Base, LoggingMixin):

class Deadline(Base):
"""A Deadline is a 'need-by' date which triggers a callback if the provided time has passed."""

__tablename__ = "deadline"
Expand All @@ -45,7 +50,7 @@ class Deadline(Base, LoggingMixin):
dagrun_id = Column(Integer, ForeignKey("dag_run.id", ondelete="CASCADE"))

# The time after which the Deadline has passed and the callback should be triggered.
deadline = Column(DateTime, nullable=False)
deadline = Column(UtcDateTime, nullable=False)
# The Callback to be called when the Deadline has passed.
callback = Column(String(500), nullable=False)
# Serialized kwargs to pass to the callback.
Expand Down Expand Up @@ -90,3 +95,91 @@ def _determine_resource() -> tuple[str, str]:
def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
"""Add the provided deadline to the table."""
session.add(deadline)


class DeadlineReference(Enum):
    """
    Enumerate the reference timestamps a Deadline Alert can be calculated from.

    TODO: PLEASE NOTE This class is a placeholder and will be expanded in the next PR.

    ------
    Usage:
    ------

    A DAG declares its deadline relative to one of these references::

        DAG(
            dag_id='dag_with_deadline',
            deadline=DeadlineAlert(
                reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
                interval=timedelta(hours=1),
                callback=hello_callback,
            )
        )

    Later, the reference will be resolved with something like::

        dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
    """

    # The logical date of the DagRun the deadline is attached to.
    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"


class DeadlineAlert:
    """Store Deadline values needed to calculate the need-by timestamp and the callback information."""

    def __init__(
        self,
        reference: DeadlineReference,
        interval: timedelta,
        callback: Callable | str,
        callback_kwargs: dict | None = None,
    ):
        # reference + interval together define the need-by timestamp;
        # the callback is normalized to a dot-path string for storage.
        self.reference = reference
        self.interval = interval
        self.callback_kwargs = callback_kwargs
        self.callback = self.get_callback_path(callback)

    @staticmethod
    def get_callback_path(_callback: str | Callable) -> str:
        """Return the dot-path string for the given callback, validating it on a best-effort basis."""
        if callable(_callback):
            # Get the reference path to the callable in the form `airflow.models.deadline.get_from_db`
            return f"{_callback.__module__}.{_callback.__qualname__}"

        if not isinstance(_callback, str):
            raise ImportError(f"`{_callback}` doesn't look like a valid dot path.")

        dotpath = _callback.strip()
        if not is_valid_dotpath(dotpath):
            raise ImportError(f"`{_callback}` doesn't look like a valid dot path.")

        try:
            # The provided callback is a string which appears to be a valid dotpath, attempt to import it.
            imported = import_string(dotpath)
        except ImportError as e:
            # Logging here instead of failing because it is possible that the code for the callable
            # exists somewhere other than on the DAG processor. We are making a best effort to validate,
            # but can't rule out that it may be available at runtime even if it can not be imported here.
            logger.debug(
                "Callback %s is formatted like a callable dotpath, but could not be imported.\n%s",
                dotpath,
                e,
            )
            return dotpath

        # If we get this far then the input is a string which can be imported, check if it is a callable.
        if not callable(imported):
            raise AttributeError(f"Provided callback {imported} is not callable.")

        return dotpath

    def serialize_deadline_alert(self):
        """Return this alert serialized with Airflow's BaseSerialization machinery."""
        from airflow.serialization.serialized_objects import BaseSerialization

        payload = {
            "reference": self.reference,
            "interval": self.interval,
            "callback": self.callback,
            "callback_kwargs": self.callback_kwargs,
        }
        return BaseSerialization.serialize(payload)
1 change: 1 addition & 0 deletions airflow-core/src/airflow/serialization/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ class DagAttributeTypes(str, Enum):
DAG_CALLBACK_REQUEST = "dag_callback_request"
TASK_INSTANCE_KEY = "task_instance_key"
TRIGGER = "trigger"
DEADLINE_ALERT = "deadline_alert"
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@
},
"dag_display_name": { "type" : "string"},
"description": { "type" : "string"},
"deadline": {
"anyOf": [
{ "$ref": "#/definitions/dict" },
{ "type": "null" }
]
},
"_concurrency": { "type" : "number"},
"max_active_tasks": { "type" : "number"},
"max_active_runs": { "type" : "number"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
from airflow.models.dag import DAG, _get_model_data_interval
from airflow.models.deadline import DeadlineAlert
from airflow.models.expandinput import (
create_expand_input,
)
Expand Down Expand Up @@ -737,6 +738,8 @@ def serialize(
)
elif isinstance(var, DAG):
return cls._encode(SerializedDAG.serialize_dag(var), type_=DAT.DAG)
elif isinstance(var, DeadlineAlert):
return cls._encode(DeadlineAlert.serialize_deadline_alert(var), type_=DAT.DEADLINE_ALERT)
elif isinstance(var, Resources):
return var.to_dict()
elif isinstance(var, MappedOperator):
Expand Down Expand Up @@ -1692,6 +1695,8 @@ def serialize_dag(cls, dag: DAG) -> dict:
serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
serialized_dag["task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)

serialized_dag["deadline"] = dag.deadline.serialize_deadline_alert() if dag.deadline else None

# Edge info in the JSON exactly matches our internal structure
serialized_dag["edge_info"] = dag.edge_info
serialized_dag["params"] = cls._serialize_params_dict(dag.params)
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class MappedClassProtocol(Protocol):
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "29ce7909c52b",
"3.1.0": "0242ac120002",
}


Expand Down
Loading