From caecb02e78d44560166812321a8f75b4ebc385e2 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Fri, 10 Jan 2025 14:55:31 -0800 Subject: [PATCH 1/9] Add deadline to DAG model Example usage: ``` @task def hello(): log.info('hello world') with DAG( dag_id='dag_with_deadline', deadline=DeadlineAlert( trigger=DeadlineTrigger.DAGRUN_LOGICAL_DATE, interval=timedelta(hours=1), callback=hello_callback, ) ): hello() ``` --- airflow-core/docs/migrations-ref.rst | 4 +- .../src/airflow/dag_processing/collection.py | 3 + .../0069_3_1_0_add_deadline_to_dag.py | 50 ++++++++ airflow-core/src/airflow/models/__init__.py | 2 +- airflow-core/src/airflow/models/dag.py | 5 +- airflow-core/src/airflow/models/deadline.py | 88 +++++++++++++- .../src/airflow/serialization/enums.py | 1 + .../src/airflow/serialization/schema.json | 6 + .../serialization/serialized_objects.py | 5 + airflow-core/src/airflow/utils/db.py | 1 + .../tests/unit/models/test_deadline.py | 109 +++++++++++------- .../serialization/test_dag_serialization.py | 2 + task-sdk/src/airflow/sdk/definitions/dag.py | 20 ++-- 13 files changed, 238 insertions(+), 58 deletions(-) create mode 100644 airflow-core/src/airflow/migrations/versions/0069_3_1_0_add_deadline_to_dag.py diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 2f35cf2deecf4..2fb36af0e7518 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``29ce7909c52b`` (head) | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. | +| ``dfee8bd5d574`` (head) | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``29ce7909c52b`` | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``959e216a3abb`` | ``0e9519b56710`` | ``3.0.0`` | Rename ``is_active`` to ``is_stale`` column in ``dag`` | | | | | table. | diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index bfc2cb20662dd..9cf13b5269319 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -466,6 +466,9 @@ def update_dags( "core", "max_consecutive_failed_dag_runs_per_dag" ) + if dag.deadline is not None: + dm.deadline = dag.deadline + if hasattr(dag, "has_task_concurrency_limits"): dm.has_task_concurrency_limits = dag.has_task_concurrency_limits else: diff --git a/airflow-core/src/airflow/migrations/versions/0069_3_1_0_add_deadline_to_dag.py b/airflow-core/src/airflow/migrations/versions/0069_3_1_0_add_deadline_to_dag.py new file mode 100644 index 0000000000000..c82bc33cfb20e --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0069_3_1_0_add_deadline_to_dag.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add Deadline to Dag. + +Revision ID: dfee8bd5d574 +Revises: 29ce7909c52b +Create Date: 2024-12-18 19:10:26.962464 +""" + +from __future__ import annotations + +import sqlalchemy as sa +import sqlalchemy_jsonfield +from alembic import op + +from airflow.settings import json + +revision = "dfee8bd5d574" +down_revision = "29ce7909c52b" +branch_labels = None +depends_on = None +airflow_version = "3.1.0" + + +def upgrade(): + op.add_column( + "dag", + sa.Column("deadline", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + ) + + +def downgrade(): + op.drop_column("dag", "deadline") diff --git a/airflow-core/src/airflow/models/__init__.py b/airflow-core/src/airflow/models/__init__.py index 2f5c426c3ac39..be9541484d059 100644 --- a/airflow-core/src/airflow/models/__init__.py +++ b/airflow-core/src/airflow/models/__init__.py @@ -63,7 +63,6 @@ def import_all_models(): import airflow.models.dag_version import airflow.models.dagbundle import airflow.models.dagwarning - import airflow.models.deadline import airflow.models.errors import airflow.models.serialized_dag import airflow.models.taskinstancehistory @@ -101,6 +100,7 @@ def __getattr__(name): "DagTag": "airflow.models.dag", "DagWarning": "airflow.models.dagwarning", "DbCallbackRequest": "airflow.models.db_callback_request", + "Deadline": "airflow.models.deadline", "Log": "airflow.models.log", "MappedOperator": "airflow.models.mappedoperator", "Operator": "airflow.models.operator", diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 57c1050e3ce94..7ffb0d9874e6d 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -373,6 +373,7 @@ class DAG(TaskSDKDag, LoggingMixin): :param dagrun_timeout: Specify the duration a DagRun should be allowed to run before it times out or fails. Task instances that are running when a DagRun is timed out will be marked as skipped. :param sla_miss_callback: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1 + :param deadline: Optional Deadline Alert for the DAG. :param catchup: Perform scheduler catchup (or only run latest)? Defaults to False :param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function. @@ -1868,7 +1869,7 @@ class DagModel(Base): __tablename__ = "dag" """ - These items are stored in the database for state related information + These items are stored in the database for state related information. """ dag_id = Column(StringID(), primary_key=True) # A DAG can be paused from the UI / DB @@ -1903,6 +1904,8 @@ class DagModel(Base): timetable_description = Column(String(1000), nullable=True) # Asset expression based on asset triggers asset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + # DAG deadline information + deadline = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) # Tags for view filter tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag")) # Dag owner links for DAGs view diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 0c27cbcfc196e..1965fbacac1f3 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -16,22 +16,28 @@ # under the License. from __future__ import annotations -from datetime import datetime -from typing import TYPE_CHECKING +import logging +import sys +from datetime import datetime, timedelta +from enum import Enum +from typing import TYPE_CHECKING, Callable import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String +from sqlalchemy import Column, ForeignKey, Index, Integer, String from sqlalchemy_utils import UUIDType from airflow.models.base import Base, StringID from airflow.settings import json from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.sqlalchemy import UtcDateTime if TYPE_CHECKING: from sqlalchemy.orm import Session +log = logging.getLogger(__name__) + class Deadline(Base, LoggingMixin): """A Deadline is a 'need-by' date which triggers a callback if the provided time has passed.""" @@ -45,7 +51,7 @@ class Deadline(Base, LoggingMixin): dagrun_id = Column(Integer, ForeignKey("dag_run.id", ondelete="CASCADE")) # The time after which the Deadline has passed and the callback should be triggered. - deadline = Column(DateTime, nullable=False) + deadline = Column(UtcDateTime, nullable=False) # The Callback to be called when the Deadline has passed. callback = Column(String(500), nullable=False) # Serialized kwargs to pass to the callback. @@ -90,3 +96,77 @@ def _determine_resource() -> tuple[str, str]: def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION): """Add the provided deadline to the table.""" session.add(deadline) + + +class DeadlineTrigger(Enum): + """ + Store the calculation methods for the various Deadline Alert triggers. + + TODO: PLEASE NOTE This class is a placeholder and will be expanded in the next PR. + + ------ + Usage: + ------ + + Example use when defining a deadline in a DAG: + + DAG( + dag_id='dag_with_deadline', + deadline=DeadlineAlert( + trigger=DeadlineTrigger.DAGRUN_LOGICAL_DATE, + interval=timedelta(hours=1), + callback=hello_callback, + ) + ) + + To parse the deadline trigger later we will use something like: + + dag.deadline.trigger.evaluate_with(dag_id=dag.dag_id) + """ + + DAGRUN_LOGICAL_DATE = "dagrun_logical_date" + + +class DeadlineAlert(LoggingMixin): + """Store Deadline values needed to calculate the need-by timestamp and the callback information.""" + + def __init__( + self, + trigger: DeadlineTrigger, + interval: timedelta, + callback: Callable | str, + callback_kwargs: dict | None = None, + ): + super().__init__() + self.trigger = trigger + self.interval = interval + self.callback_kwargs = callback_kwargs + self.callback = self.get_callback_path(callback) + + @staticmethod + def get_callback_path(_callback: str | Callable) -> str: + if callable(_callback): + # Get the reference path to the callable in the form `airflow.models.deadline.get_from_db` + return f"{_callback.__module__}.{_callback.__qualname__}" + + # Check if the dotpath can resolve to a callable; store it or raise a ValueError + try: + _callback_module, _callback_name = _callback.rsplit(".", 1) + getattr(sys.modules[_callback_module], _callback_name) + return _callback + except (KeyError, AttributeError): + # KeyError if the path is not valid + # AttributeError if the provided value can't be rsplit + raise ValueError("callback is not a path to a callable") + + def serialize_deadline_alert(self): + from airflow.serialization.serialized_objects import BaseSerialization + + return BaseSerialization.serialize( + { + "trigger": self.trigger, + "interval": self.interval, + "callback": self.callback, + "callback_kwargs": self.callback_kwargs, + } + ) diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index ab7d59cb21682..1d04bee6f290d 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -75,3 +75,4 @@ class DagAttributeTypes(str, Enum): DAG_CALLBACK_REQUEST = "dag_callback_request" TASK_INSTANCE_KEY = "task_instance_key" TRIGGER = "trigger" + DEADLINE_ALERT = "deadline_alert" diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json index 0670acd588cbe..aa13f618f844d 100644 --- a/airflow-core/src/airflow/serialization/schema.json +++ b/airflow-core/src/airflow/serialization/schema.json @@ -187,6 +187,12 @@ }, "dag_display_name": { "type" : "string"}, "description": { "type" : "string"}, + "deadline": { + "anyOf": [ + { "$ref": "#/definitions/dict" }, + { "type": "null" } + ] + }, "_concurrency": { "type" : "number"}, "max_active_tasks": { "type" : "number"}, "max_active_runs": { "type" : "number"}, diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 83a6dbf5cf7ef..9abd509452bed 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -44,6 +44,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval +from airflow.models.deadline import DeadlineAlert from airflow.models.expandinput import ( create_expand_input, ) @@ -737,6 +738,8 @@ def serialize( ) elif isinstance(var, DAG): return cls._encode(SerializedDAG.serialize_dag(var), type_=DAT.DAG) + elif isinstance(var, DeadlineAlert): + return cls._encode(DeadlineAlert.serialize_deadline_alert(var), type_=DAT.DEADLINE_ALERT) elif isinstance(var, Resources): return var.to_dict() elif isinstance(var, MappedOperator): @@ -1692,6 +1695,8 @@ def serialize_dag(cls, dag: DAG) -> dict: serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)] serialized_dag["task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group) + serialized_dag["deadline"] = dag.deadline.serialize_deadline_alert() if dag.deadline else None + # Edge info in the JSON exactly matches our internal structure serialized_dag["edge_info"] = dag.edge_info serialized_dag["params"] = cls._serialize_params_dict(dag.params) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index c1ecef8f52f26..f774938629f97 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -93,6 +93,7 @@ class MappedClassProtocol(Protocol): "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", "3.0.0": "29ce7909c52b", + "3.1.0": "dfee8bd5d574", } diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 85e8c8e81b54f..b38d7ca22a129 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -17,58 +17,69 @@ from __future__ import annotations import json -from datetime import datetime +from typing import Callable import pytest +import time_machine from sqlalchemy import select from airflow.models import DagRun -from airflow.models.deadline import Deadline +from airflow.models.deadline import Deadline, DeadlineAlert from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.utils.state import DagRunState from tests_common.test_utils import db +from unit.models import DEFAULT_DATE DAG_ID = "dag_id_1" RUN_ID = 1 +TEST_CALLBACK_KWARGS = {"to": "the_boss@work.com"} +TEST_CALLBACK_PATH = f"{__name__}.test_callback" -def my_callback(): + +def test_callback(): """An empty Callable to use for the callback tests in this suite.""" pass -@pytest.mark.db_test -class TestDeadline: - def setup_method(self): - self._clean_db() - - def teardown_method(self): - self._clean_db() +def _clean_db(): + db.clear_db_dags() + db.clear_db_runs() + db.clear_db_deadline() - @staticmethod - def _clean_db(): - db.clear_db_dags() - db.clear_db_runs() - db.clear_db_deadline() - @pytest.fixture - def create_dagrun(self, dag_maker, session): - with dag_maker(DAG_ID): - EmptyOperator(task_id="TASK_ID") - dag_maker.create_dagrun() +@pytest.fixture +def dagrun(session, dag_maker): + with dag_maker(DAG_ID): + EmptyOperator(task_id="TASK_ID") + with time_machine.travel(DEFAULT_DATE): + dag_maker.create_dagrun(state=DagRunState.QUEUED, logical_date=DEFAULT_DATE) session.commit() assert session.query(DagRun).count() == 1 - return session.query(DagRun).one().id - def test_add_deadline(self, create_dagrun, session): + return session.query(DagRun).one() + + +@pytest.mark.db_test +class TestDeadline: + @staticmethod + def setup_method(): + _clean_db() + + @staticmethod + def teardown_method(): + _clean_db() + + def test_add_deadline(self, dagrun, session): assert session.query(Deadline).count() == 0 deadline_orm = Deadline( - deadline=datetime(2024, 12, 4, 16, 00, 0), - callback=my_callback.__module__, - callback_kwargs={"to": "the_boss@work.com"}, + deadline=DEFAULT_DATE, + callback=TEST_CALLBACK_PATH, + callback_kwargs=TEST_CALLBACK_KWARGS, dag_id=DAG_ID, - dagrun_id=create_dagrun, + dagrun_id=dagrun.id, ) Deadline.add_deadline(deadline_orm) @@ -84,24 +95,24 @@ def test_add_deadline(self, create_dagrun, session): def test_orm(self): deadline_orm = Deadline( - deadline=datetime(2024, 12, 4, 16, 00, 0), - callback=my_callback.__module__, - callback_kwargs={"to": "the_boss@work.com"}, + deadline=DEFAULT_DATE, + callback=TEST_CALLBACK_PATH, + callback_kwargs=TEST_CALLBACK_KWARGS, dag_id=DAG_ID, dagrun_id=RUN_ID, ) - assert deadline_orm.deadline == datetime(2024, 12, 4, 16, 00, 0) - assert deadline_orm.callback == my_callback.__module__ - assert deadline_orm.callback_kwargs == {"to": "the_boss@work.com"} + assert deadline_orm.deadline == DEFAULT_DATE + assert deadline_orm.callback == TEST_CALLBACK_PATH + assert deadline_orm.callback_kwargs == TEST_CALLBACK_KWARGS assert deadline_orm.dag_id == DAG_ID assert deadline_orm.dagrun_id == RUN_ID def test_repr_with_callback_kwargs(self): deadline_orm = Deadline( - deadline=datetime(2024, 12, 4, 16, 00, 0), - callback=my_callback.__module__, - callback_kwargs={"to": "the_boss@work.com"}, + deadline=DEFAULT_DATE, + callback=TEST_CALLBACK_PATH, + callback_kwargs=TEST_CALLBACK_KWARGS, dag_id=DAG_ID, dagrun_id=RUN_ID, ) @@ -109,13 +120,13 @@ def test_repr_with_callback_kwargs(self): assert ( repr(deadline_orm) == f"[DagRun Deadline] Dag: {deadline_orm.dag_id} Run: {deadline_orm.dagrun_id} needed by " - f"{deadline_orm.deadline} or run: {my_callback.__module__}({json.dumps(deadline_orm.callback_kwargs)})" + f"{deadline_orm.deadline} or run: {TEST_CALLBACK_PATH}({json.dumps(deadline_orm.callback_kwargs)})" ) def test_repr_without_callback_kwargs(self): deadline_orm = Deadline( - deadline=datetime(2024, 12, 4, 16, 00, 0), - callback=my_callback.__module__, + deadline=DEFAULT_DATE, + callback=TEST_CALLBACK_PATH, dag_id=DAG_ID, dagrun_id=RUN_ID, ) @@ -124,5 +135,25 @@ def test_repr_without_callback_kwargs(self): assert ( repr(deadline_orm) == f"[DagRun Deadline] Dag: {deadline_orm.dag_id} Run: {deadline_orm.dagrun_id} needed by " - f"{deadline_orm.deadline} or run: {my_callback.__module__}()" + f"{deadline_orm.deadline} or run: {TEST_CALLBACK_PATH}()" ) + + +class TestDeadlineAlert: + @pytest.mark.parametrize( + "callback_value, expect_success", + [ + pytest.param(test_callback, True, id="valid_callable"), + pytest.param(TEST_CALLBACK_PATH, True, id="valid_path"), + pytest.param("bad.path.to.some.callback", False, id="invalid_path"), + pytest.param(42, False, id="not_even_a_path"), + ], + ) + def test_get_callback_path(self, callback_value: Callable | str, expect_success: bool): + if expect_success: + path = DeadlineAlert.get_callback_path(callback_value) + + assert path == TEST_CALLBACK_PATH + else: + with pytest.raises(ValueError, match="callback is not a path to a callable"): + DeadlineAlert.get_callback_path(callback_value) diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 6600e8e7edd6f..1a3b55f0a892e 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -160,6 +160,7 @@ }, "is_paused_upon_creation": False, "dag_id": "simple_dag", + "deadline": None, "catchup": False, "disable_bundle_versioning": False, "doc_md": "### DAG Tutorial Documentation", @@ -3137,6 +3138,7 @@ def test_handle_v1_serdag(): }, "is_paused_upon_creation": False, "_dag_id": "simple_dag", + "deadline": None, "doc_md": "### DAG Tutorial Documentation", "fileloc": None, "_processor_dags_folder": ( diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index e938be3c33619..9706994ed52cb 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -23,7 +23,6 @@ import logging import os import sys -import warnings import weakref from collections import abc from collections.abc import Collection, Iterable, MutableSet @@ -51,6 +50,7 @@ ParamValidationError, TaskNotFound, ) +from airflow.models.deadline import DeadlineAlert from airflow.sdk.bases.operator import BaseOperator from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator from airflow.sdk.definitions._internal.node import validate_key @@ -107,7 +107,7 @@ "template_searchpath", "last_loaded", "schedule", - # TODO: Task-SDK: we should be hashing on timetable now, not scheulde! + # TODO: Task-SDK: we should be hashing on timetable now, not schedule! # "timetable", } ) @@ -406,7 +406,11 @@ def __rich_repr__(self): default=None, validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)), ) - sla_miss_callback: None = attrs.field(default=None) + deadline: DeadlineAlert | None = attrs.field( + default=None, + validator=attrs.validators.optional(attrs.validators.instance_of(DeadlineAlert)), + ) + catchup: bool = attrs.field( factory=_config_bool_factory("scheduler", "catchup_by_default"), ) @@ -547,15 +551,6 @@ def _has_on_success_callback(self) -> bool: def _has_on_failure_callback(self) -> bool: return self.on_failure_callback is not None - @sla_miss_callback.validator - def _validate_sla_miss_callback(self, _, value): - if value is not None: - warnings.warn( - "The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in >=3.1", - stacklevel=2, - ) - return value - def __repr__(self): return f"" @@ -1338,6 +1333,7 @@ def dag( catchup: bool = ..., on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None, + deadline: DeadlineAlert | None = None, doc_md: str | None = None, params: ParamsDict | dict[str, Any] | None = None, access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None, From eb97e7bd337c0a78f046efb097752d315cc7a690 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Thu, 1 May 2025 13:17:22 -0700 Subject: [PATCH 2/9] Add db migration for converting the `deadline` column, which will store a calculated datetime/timestamp, from DateTime to DateTimeUTC. --- airflow-core/docs/migrations-ref.rst | 5 +- .../0070_3_1_0_change_deadline_to_utc.py | 60 +++++++++++++++++++ airflow-core/src/airflow/utils/db.py | 2 +- 3 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 airflow-core/src/airflow/migrations/versions/0070_3_1_0_change_deadline_to_utc.py diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 2fb36af0e7518..1c510e9a60b3d 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``dfee8bd5d574`` (head) | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. | +| ``0242ac120002`` (head) | ``dfee8bd5d574`` | ``3.1.0`` | Change Deadline in the Dag table from DateTime to UTC | +| | | | DateTime. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``dfee8bd5d574`` | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``29ce7909c52b`` | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/migrations/versions/0070_3_1_0_change_deadline_to_utc.py b/airflow-core/src/airflow/migrations/versions/0070_3_1_0_change_deadline_to_utc.py new file mode 100644 index 0000000000000..b8cce104d2988 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0070_3_1_0_change_deadline_to_utc.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Change the Deadline column in the Deadline table from DateTime to UTC DateTime. + +Revision ID: 0242ac120002 +Revises: dfee8bd5d574 +Create Date: 2024-12-18 19:10:26.962464 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import TIMESTAMP + +revision = "0242ac120002" +down_revision = "dfee8bd5d574" +branch_labels = None +depends_on = None +airflow_version = "3.1.0" + + +def upgrade(): + """Apply change deadline column in the deadline table to UTC datetime.""" + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.alter_column( + "deadline", + existing_type=sa.DateTime(), + type_=TIMESTAMP(timezone=True), + existing_nullable=False, + ) + + +def downgrade(): + """Unapply change deadline column in the deadline table to UTC datetime.""" + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.alter_column( + "deadline", + existing_type=TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + ) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index f774938629f97..6c031d407975c 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -93,7 +93,7 @@ class MappedClassProtocol(Protocol): "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", "3.0.0": "29ce7909c52b", - "3.1.0": "dfee8bd5d574", + "3.1.0": "0242ac120002", } From 46d7e6fda0fa87749b762e49aaf2685271c660d8 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Fri, 2 May 2025 14:40:24 -0700 Subject: [PATCH 3/9] rename DeadlineTrigger (and related variable names) to DeadlineReference --- airflow-core/src/airflow/models/deadline.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 1965fbacac1f3..7397298da9423 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -98,7 +98,7 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION): session.add(deadline) -class DeadlineTrigger(Enum): +class DeadlineReference(Enum): """ Store the calculation methods for the various Deadline Alert triggers. @@ -113,15 +113,15 @@ class DeadlineTrigger(Enum): DAG( dag_id='dag_with_deadline', deadline=DeadlineAlert( - trigger=DeadlineTrigger.DAGRUN_LOGICAL_DATE, + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, interval=timedelta(hours=1), callback=hello_callback, ) ) - To parse the deadline trigger later we will use something like: + To parse the deadline reference later we will use something like: - dag.deadline.trigger.evaluate_with(dag_id=dag.dag_id) + dag.deadline.reference.evaluate_with(dag_id=dag.dag_id) """ DAGRUN_LOGICAL_DATE = "dagrun_logical_date" @@ -132,13 +132,13 @@ class DeadlineAlert(LoggingMixin): def __init__( self, - trigger: DeadlineTrigger, + reference: DeadlineReference, interval: timedelta, callback: Callable | str, callback_kwargs: dict | None = None, ): super().__init__() - self.trigger = trigger + self.reference = reference self.interval = interval self.callback_kwargs = callback_kwargs self.callback = self.get_callback_path(callback) @@ -164,7 +164,7 @@ def serialize_deadline_alert(self): return BaseSerialization.serialize( { - "trigger": self.trigger, + "reference": self.reference, "interval": self.interval, "callback": self.callback, "callback_kwargs": self.callback_kwargs, From 303ca5463f932ee4b859b7a739c48e164974d69f Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 6 May 2025 13:42:23 -0700 Subject: [PATCH 4/9] remove unused log import --- airflow-core/src/airflow/models/deadline.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 7397298da9423..6f900a8fc53b3 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import logging import sys from datetime import datetime, timedelta from enum import Enum @@ -36,8 +35,6 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session -log = logging.getLogger(__name__) - class Deadline(Base, LoggingMixin): """A Deadline is a 'need-by' date which triggers a callback if the provided time has passed.""" From b4b36705ab2063c65a6f0ec1d7148dfa97ec3dd3 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 6 May 2025 15:10:23 -0700 Subject: [PATCH 5/9] rebase and clear merge conflicts --- airflow-core/docs/migrations-ref.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 1c510e9a60b3d..436dae738990d 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,8 +39,8 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``0242ac120002`` (head) | ``dfee8bd5d574`` | ``3.1.0`` | Change Deadline in the Dag table from DateTime to UTC | -| | | | DateTime. | +| ``0242ac120002`` (head) | ``dfee8bd5d574`` | ``3.1.0`` | Change the Deadline column in the Deadline table from | +| | | | DateTime to UTC DateTime. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``dfee8bd5d574`` | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From 6ccf05e2fa3ea6bee168d4886986eeb1d93af1dd Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 14 May 2025 17:04:03 -0700 Subject: [PATCH 6/9] Improve callback validation and handling, improve docstrings --- airflow-core/src/airflow/models/dag.py | 11 ++++ airflow-core/src/airflow/models/deadline.py | 36 +++++++++---- .../src/airflow/utils/module_loading.py | 21 ++++++++ .../tests/unit/models/test_deadline.py | 51 ++++++++++++++----- .../tests/unit/utils/test_module_loading.py | 19 ++++++- 5 files changed, 115 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 7ffb0d9874e6d..54a0763953341 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -374,6 +374,17 @@ class DAG(TaskSDKDag, LoggingMixin): fails. Task instances that are running when a DagRun is timed out will be marked as skipped. :param sla_miss_callback: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1 :param deadline: Optional Deadline Alert for the DAG. + Specifies a time by which the DAG run should be complete, either in the form of a static datetime + or calculated relative to a reference timestamp. If the deadline passes before completion, the + provided callback is triggered. + + **Example**: To set the deadline for one hour after the DAG run starts you could use: + + DeadlineAlert( + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, + interval=timedelta(hours=1), + callback=my_callback + ) :param catchup: Perform scheduler catchup (or only run latest)? Defaults to False :param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function. diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 6f900a8fc53b3..a9bec76a77c51 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -import sys +import logging from datetime import datetime, timedelta from enum import Enum from typing import TYPE_CHECKING, Callable @@ -29,12 +29,15 @@ from airflow.models.base import Base, StringID from airflow.settings import json from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.module_loading import import_string, is_valid_dotpath from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime if TYPE_CHECKING: from sqlalchemy.orm import Session +logger = logging.getLogger(__name__) + class Deadline(Base, LoggingMixin): """A Deadline is a 'need-by' date which triggers a callback if the provided time has passed.""" @@ -146,15 +149,30 @@ def get_callback_path(_callback: str | Callable) -> str: # Get the reference path to the callable in the form `airflow.models.deadline.get_from_db` return f"{_callback.__module__}.{_callback.__qualname__}" - # Check if the dotpath can resolve to a callable; store it or raise a ValueError + if not isinstance(_callback, str) or not is_valid_dotpath(_callback.strip()): + raise ImportError(f"`{_callback}` doesn't look like a callback path.") + + stripped_callback = _callback.strip() + try: - _callback_module, _callback_name = _callback.rsplit(".", 1) - getattr(sys.modules[_callback_module], _callback_name) - return _callback - except (KeyError, AttributeError): - # KeyError if the path is not valid - # AttributeError if the provided value can't be rsplit - raise ValueError("callback is not a path to a callable") + # The provided callback is a string which appears to be a valid dotpath, attempt to import it. + callback = import_string(stripped_callback) + except ImportError as e: + # Logging here instead of failing because it is possible that the code for the callable + # exists somewhere other than on the DAG processor. We are making a best effort to validate, + # but can't rule out that it may be available at runtime even if it can not be imported here. + logger.debug( + "Callback %s is formatted like a callable dotpath, but could not be imported.\n%s", + stripped_callback, + e, + ) + return stripped_callback + + # If we get this far then the input is a string which can be imported, check if it is a callable. + if not callable(callback): + raise AttributeError(f"Provided callback {callback} is not callable.") + + return stripped_callback def serialize_deadline_alert(self): from airflow.serialization.serialized_objects import BaseSerialization diff --git a/airflow-core/src/airflow/utils/module_loading.py b/airflow-core/src/airflow/utils/module_loading.py index 36d935f109bac..42a078f3437d8 100644 --- a/airflow-core/src/airflow/utils/module_loading.py +++ b/airflow-core/src/airflow/utils/module_loading.py @@ -18,6 +18,7 @@ from __future__ import annotations import pkgutil +import re from importlib import import_module from typing import TYPE_CHECKING, Callable @@ -25,6 +26,26 @@ from types import ModuleType +def is_valid_dotpath(path: str) -> bool: + """ + Check if a string follows valid dotpath format (ie: 'package.subpackage.module'). + + :param path: String to check + """ + if not isinstance(path, str): + return False + + # Pattern explanation: + # ^ - Start of string + # [a-zA-Z_] - Must start with letter or underscore + # [a-zA-Z0-9_] - Following chars can be letters, numbers, or underscores + # (\.[a-zA-Z_][a-zA-Z0-9_]*)* - Can be followed by dots and valid identifiers + # $ - End of string + pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*$" + + return bool(re.match(pattern, path)) + + def import_string(dotted_path: str): """ Import a dotted module path and return the attribute/class designated by the last name in the path. diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index b38d7ca22a129..2f3275d311ebf 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -17,7 +17,7 @@ from __future__ import annotations import json -from typing import Callable +import logging import pytest import time_machine @@ -36,6 +36,7 @@ TEST_CALLBACK_KWARGS = {"to": "the_boss@work.com"} TEST_CALLBACK_PATH = f"{__name__}.test_callback" +UNIMPORTABLE_DOT_PATH = "valid.but.nonexistent.path" def test_callback(): @@ -141,19 +142,43 @@ def test_repr_without_callback_kwargs(self): class TestDeadlineAlert: @pytest.mark.parametrize( - "callback_value, expect_success", + "callback_value, expected_path", [ - pytest.param(test_callback, True, id="valid_callable"), - pytest.param(TEST_CALLBACK_PATH, True, id="valid_path"), - pytest.param("bad.path.to.some.callback", False, id="invalid_path"), - pytest.param(42, False, id="not_even_a_path"), + pytest.param(test_callback, TEST_CALLBACK_PATH, id="valid_callable"), + pytest.param(TEST_CALLBACK_PATH, TEST_CALLBACK_PATH, id="valid_path_string"), + pytest.param(lambda x: x, None, id="lambda_function"), + pytest.param(TEST_CALLBACK_PATH + " ", TEST_CALLBACK_PATH, id="path_with_whitespace"), + pytest.param(UNIMPORTABLE_DOT_PATH, UNIMPORTABLE_DOT_PATH, id="valid_format_not_importable"), ], ) - def test_get_callback_path(self, callback_value: Callable | str, expect_success: bool): - if expect_success: - path = DeadlineAlert.get_callback_path(callback_value) - - assert path == TEST_CALLBACK_PATH + def test_get_callback_path_happy_cases(self, callback_value, expected_path): + path = DeadlineAlert.get_callback_path(callback_value) + if expected_path is None: + assert path.endswith("") else: - with pytest.raises(ValueError, match="callback is not a path to a callable"): - DeadlineAlert.get_callback_path(callback_value) + assert path == expected_path + + @pytest.mark.parametrize( + "callback_value, error_type", + [ + pytest.param(42, ImportError, id="not_a_string"), + pytest.param("", ImportError, id="empty_string"), + pytest.param("os.path", AttributeError, id="non_callable_module"), + ], + ) + def test_get_callback_path_error_cases(self, callback_value, error_type): + expected_message = "" + if error_type is ImportError: + expected_message = "doesn't look like a callback path." + elif error_type is AttributeError: + expected_message = "is not callable." + + with pytest.raises(error_type, match=expected_message): + DeadlineAlert.get_callback_path(callback_value) + + def test_log_unimportable_but_properly_formatted_callback(self, caplog): + with caplog.at_level(logging.DEBUG): + path = DeadlineAlert.get_callback_path(UNIMPORTABLE_DOT_PATH) + + assert "could not be imported" in caplog.text + assert path == UNIMPORTABLE_DOT_PATH diff --git a/airflow-core/tests/unit/utils/test_module_loading.py b/airflow-core/tests/unit/utils/test_module_loading.py index 1f92a004b8fa8..5eb9c392cdce9 100644 --- a/airflow-core/tests/unit/utils/test_module_loading.py +++ b/airflow-core/tests/unit/utils/test_module_loading.py @@ -19,7 +19,7 @@ import pytest -from airflow.utils.module_loading import import_string +from airflow.utils.module_loading import import_string, is_valid_dotpath class TestModuleImport: @@ -33,3 +33,20 @@ def test_import_string(self): msg = 'Module "airflow.utils" does not define a "nonexistent" attribute' with pytest.raises(ImportError, match=msg): import_string("airflow.utils.nonexistent") + + @pytest.mark.parametrize( + "path, expected", + [ + pytest.param("valid_path", True, id="module_no_dots"), + pytest.param("valid.dot.path", True, id="standard_dotpath"), + pytest.param("package.sub_package.module", True, id="dotpath_with_underscores"), + pytest.param("MyPackage.MyClass", True, id="mixed_case_path"), + pytest.param("invalid..path", False, id="consecutive_dots_fails"), + pytest.param(".invalid.path", False, id="leading_dot_fails"), + pytest.param("invalid.path.", False, id="trailing_dot_fails"), + pytest.param("1invalid.path", False, id="leading_number_fails"), + pytest.param(42, False, id="not_a_string") + ], + ) + def test_is_valid_dotpath(self, path, expected): + assert is_valid_dotpath(path) == expected From 49ed9434b50b8ad01100d5c3b9ce31e923abf1ab Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 14 May 2025 17:34:15 -0700 Subject: [PATCH 7/9] static checks --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/img/airflow_erd.svg | 428 +++++++++--------- .../tests/unit/utils/test_module_loading.py | 2 +- 3 files changed, 218 insertions(+), 214 deletions(-) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index da3765115e31d..aa5eabbb53352 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -066cb891884eea1ee0496b5c507d4a52c20d0440387f9ec8bacb1d616a26e40e \ No newline at end of file +4cda55eb221ee0340749c8dd41af7603220d7b300e2e808d733239a86e0c2837 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index 879c9f17f903e..71e5d37feb505 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + dag_priority_parsing_request @@ -365,9 +365,9 @@ asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 @@ -605,111 +605,111 @@ asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 @@ -876,9 +876,9 @@ trigger--task_instance - + 0..N -{0,1} +{0,1} @@ -1371,108 +1371,112 @@ dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -bundle_name - - [VARCHAR(250)] - -bundle_version - - [VARCHAR(200)] - -dag_display_name - - [VARCHAR(2000)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_paused - - [BOOLEAN] - -is_stale - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -relative_fileloc - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +bundle_name + + [VARCHAR(250)] + +bundle_version + + [VARCHAR(200)] + +dag_display_name + + [VARCHAR(2000)] + +deadline + + [JSON] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_paused + + [BOOLEAN] + +is_stale + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +relative_fileloc + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] @@ -1484,23 +1488,23 @@ dag--dag_schedule_asset_alias_reference - + 0..N -1 +1 dag--dag_schedule_asset_reference - + 0..N -1 +1 dag--task_outlet_asset_reference - + 0..N -1 +1 @@ -1579,9 +1583,9 @@ dag--dag_schedule_asset_name_reference - + 0..N -1 +1 @@ -1969,26 +1973,26 @@ dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 - + dag_run--task_instance 0..N 1 - + dag_run--task_instance 0..N 1 - + dag_run--deadline 0..N @@ -2029,7 +2033,7 @@ NOT NULL - + dag_run--backfill_dag_run 0..N @@ -2065,7 +2069,7 @@ [VARCHAR(128)] - + dag_run--dag_run_note 1 diff --git a/airflow-core/tests/unit/utils/test_module_loading.py b/airflow-core/tests/unit/utils/test_module_loading.py index 5eb9c392cdce9..2b0659bc841c5 100644 --- a/airflow-core/tests/unit/utils/test_module_loading.py +++ b/airflow-core/tests/unit/utils/test_module_loading.py @@ -45,7 +45,7 @@ def test_import_string(self): pytest.param(".invalid.path", False, id="leading_dot_fails"), pytest.param("invalid.path.", False, id="trailing_dot_fails"), pytest.param("1invalid.path", False, id="leading_number_fails"), - pytest.param(42, False, id="not_a_string") + pytest.param(42, False, id="not_a_string"), ], ) def test_is_valid_dotpath(self, path, expected): From 35cddbb7bf4f8146683a1246dfa936d42d075e1f Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Thu, 15 May 2025 12:18:12 -0700 Subject: [PATCH 8/9] solve the riddles of the shpynx --- airflow-core/src/airflow/models/dag.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 54a0763953341..34f96c5258006 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -378,13 +378,14 @@ class DAG(TaskSDKDag, LoggingMixin): or calculated relative to a reference timestamp. If the deadline passes before completion, the provided callback is triggered. - **Example**: To set the deadline for one hour after the DAG run starts you could use: + **Example**: To set the deadline for one hour after the DAG run starts you could use :: + + DeadlineAlert( + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, + interval=timedelta(hours=1), + callback=my_callback, + ) - DeadlineAlert( - reference=DeadlineReference.DAGRUN_LOGICAL_DATE, - interval=timedelta(hours=1), - callback=my_callback - ) :param catchup: Perform scheduler catchup (or only run latest)? Defaults to False :param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function. From deac1584ec9a77e4af64ae9fb0e998afe7adaeeb Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Thu, 15 May 2025 12:19:05 -0700 Subject: [PATCH 9/9] remove unused LoggingMixin and rephrase an error message --- airflow-core/src/airflow/models/deadline.py | 8 +++----- airflow-core/tests/unit/models/test_deadline.py | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index a9bec76a77c51..b1d16f544ef6f 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -28,7 +28,6 @@ from airflow.models.base import Base, StringID from airflow.settings import json -from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string, is_valid_dotpath from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime @@ -39,7 +38,7 @@ logger = logging.getLogger(__name__) -class Deadline(Base, LoggingMixin): +class Deadline(Base): """A Deadline is a 'need-by' date which triggers a callback if the provided time has passed.""" __tablename__ = "deadline" @@ -127,7 +126,7 @@ class DeadlineReference(Enum): DAGRUN_LOGICAL_DATE = "dagrun_logical_date" -class DeadlineAlert(LoggingMixin): +class DeadlineAlert: """Store Deadline values needed to calculate the need-by timestamp and the callback information.""" def __init__( @@ -137,7 +136,6 @@ def __init__( callback: Callable | str, callback_kwargs: dict | None = None, ): - super().__init__() self.reference = reference self.interval = interval self.callback_kwargs = callback_kwargs @@ -150,7 +148,7 @@ def get_callback_path(_callback: str | Callable) -> str: return f"{_callback.__module__}.{_callback.__qualname__}" if not isinstance(_callback, str) or not is_valid_dotpath(_callback.strip()): - raise ImportError(f"`{_callback}` doesn't look like a callback path.") + raise ImportError(f"`{_callback}` doesn't look like a valid dot path.") stripped_callback = _callback.strip() diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 2f3275d311ebf..52d419a0cd0f9 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -169,7 +169,7 @@ def test_get_callback_path_happy_cases(self, callback_value, expected_path): def test_get_callback_path_error_cases(self, callback_value, error_type): expected_message = "" if error_type is ImportError: - expected_message = "doesn't look like a callback path." + expected_message = "doesn't look like a valid dot path." elif error_type is AttributeError: expected_message = "is not callable."