From a1c6b209bb318dc7873f1bd8059148ce15205012 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 18 Feb 2025 10:57:55 +0800 Subject: [PATCH] Introduce DeltaTriggerTimetable This completes the matrix of trigger vs data interval, and cron vs delta expression timetables. The goal is to replace 'schedule' expressions where currently the data interval timetables are used to use trigger timetables instead, e.g. 'schedule=timedelta(days=1)'. Trigger timetables do not have data intervals (by default), and also have a more intuitive logical date representation. This is a lot more intuitive not only for Airflow newcomers, but also existing users that do not care about data intervals, but use DAGs as a workflow that simply runs periodically. Don't do timetable equality Fix doc refs --- airflow/config_templates/config.yml | 21 +- airflow/timetables/_delta.py | 56 ++++ airflow/timetables/interval.py | 30 +-- airflow/timetables/trigger.py | 172 ++++++++---- .../authoring-and-scheduling/timetable.rst | 104 ++++++-- task_sdk/src/airflow/sdk/definitions/dag.py | 9 +- tests/timetables/test_interval_timetable.py | 25 +- tests/timetables/test_trigger_timetable.py | 249 +++++++++++++++++- 8 files changed, 542 insertions(+), 124 deletions(-) create mode 100644 airflow/timetables/_delta.py diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 13557c06458ac..ec5de5fbb954e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2447,7 +2447,26 @@ scheduler: type: boolean example: ~ default: "True" - see_also: ":ref:`Differences between the two cron timetables`" + see_also: ':ref:`Differences between "trigger" and "data interval" timetables`' + create_delta_data_intervals: + description: | + Whether to create DAG runs that span an interval or one single point in time when a timedelta or + relativedelta is provided to ``schedule`` argument of a DAG. + + * ``True``: **DeltaDataIntervalTimetable** is used, which is suitable for DAGs with well-defined data + interval. You get contiguous intervals from the end of the previous interval up to the scheduled + datetime. + * ``False``: **DeltaTriggerTimetable** is used, which is suitable for DAGs that simply want to say + e.g. "run this every day" and do not care about the data interval. + + Notably, for **DeltaTriggerTimetable**, the logical date is the same as the time the DAG Run will + try to schedule, while for **DeltaDataIntervalTimetable**, the logical date is the beginning of + the data interval, but the DAG Run will try to schedule at the end of the data interval. + version_added: 2.11.0 + type: boolean + example: ~ + default: "True" + see_also: ':ref:`Differences between "trigger" and "data interval" timetables`' enable_tracemalloc: description: | Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow will start diff --git a/airflow/timetables/_delta.py b/airflow/timetables/_delta.py new file mode 100644 index 0000000000000..7203cd406310f --- /dev/null +++ b/airflow/timetables/_delta.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING + +from airflow.exceptions import AirflowTimetableInvalid +from airflow.utils.timezone import convert_to_utc + +if TYPE_CHECKING: + from dateutil.relativedelta import relativedelta + from pendulum import DateTime + + +class DeltaMixin: + """Mixin to provide interface to work with timedelta and relativedelta.""" + + def __init__(self, delta: datetime.timedelta | relativedelta) -> None: + self._delta = delta + + @property + def summary(self) -> str: + return str(self._delta) + + def validate(self) -> None: + now = datetime.datetime.now() + if (now + self._delta) <= now: + raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}") + + def _get_next(self, current: DateTime) -> DateTime: + return convert_to_utc(current + self._delta) + + def _get_prev(self, current: DateTime) -> DateTime: + return convert_to_utc(current - self._delta) + + def _align_to_next(self, current: DateTime) -> DateTime: + return current + + def _align_to_prev(self, current: DateTime) -> DateTime: + return current diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index ef4b5d0afc437..81d53e3537813 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -22,10 +22,10 @@ from dateutil.relativedelta import relativedelta from pendulum import DateTime -from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables._cron import CronMixin +from airflow.timetables._delta import DeltaMixin from airflow.timetables.base import DagRunInfo, DataInterval, Timetable -from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow +from airflow.utils.timezone import coerce_datetime, utcnow if TYPE_CHECKING: from airflow.timetables.base import TimeRestriction @@ -173,7 +173,7 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: return DataInterval(start=self._get_prev(end), end=end) -class DeltaDataIntervalTimetable(_DataIntervalTimetable): +class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable): """ Timetable that schedules data intervals with a time delta. @@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable): instance. """ - def __init__(self, delta: Delta) -> None: - self._delta = delta - @classmethod def deserialize(cls, data: dict[str, Any]) -> Timetable: from airflow.serialization.serialized_objects import decode_relativedelta @@ -204,10 +201,6 @@ def __eq__(self, other: Any) -> bool: return NotImplemented return self._delta == other._delta - @property - def summary(self) -> str: - return str(self._delta) - def serialize(self) -> dict[str, Any]: from airflow.serialization.serialized_objects import encode_relativedelta @@ -218,23 +211,6 @@ def serialize(self) -> dict[str, Any]: delta = encode_relativedelta(self._delta) return {"delta": delta} - def validate(self) -> None: - now = datetime.datetime.now() - if (now + self._delta) <= now: - raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}") - - def _get_next(self, current: DateTime) -> DateTime: - return convert_to_utc(current + self._delta) - - def _get_prev(self, current: DateTime) -> DateTime: - return convert_to_utc(current - self._delta) - - def _align_to_next(self, current: DateTime) -> DateTime: - return current - - def _align_to_prev(self, current: DateTime) -> DateTime: - return current - @staticmethod def _relativedelta_in_seconds(delta: relativedelta) -> int: return ( diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py index 23981e1ae3821..6aec327488667 100644 --- a/airflow/timetables/trigger.py +++ b/airflow/timetables/trigger.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any from airflow.timetables._cron import CronMixin +from airflow.timetables._delta import DeltaMixin from airflow.timetables.base import DagRunInfo, DataInterval, Timetable from airflow.utils.timezone import coerce_datetime, utcnow @@ -61,7 +62,122 @@ def _deserialize_run_immediately(value: bool | float) -> bool | datetime.timedel return value -class CronTriggerTimetable(CronMixin, Timetable): +class _TriggerTimetable(Timetable): + _interval: datetime.timedelta | relativedelta + + def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: + return DataInterval( + coerce_datetime(run_after - self._interval), + run_after, + ) + + def _calc_first_run(self) -> DateTime: + """ + If no start_time is set, determine the start. + + If True, always prefer past run, if False, never. If None, if within 10% of next run, + if timedelta, if within that timedelta from past run. + """ + raise NotImplementedError() + + def _align_to_next(self, current: DateTime) -> DateTime: + raise NotImplementedError() + + def _align_to_prev(self, current: DateTime) -> DateTime: + raise NotImplementedError() + + def _get_next(self, current: DateTime) -> DateTime: + raise NotImplementedError() + + def _get_prev(self, current: DateTime) -> DateTime: + raise NotImplementedError() + + def next_dagrun_info( + self, + *, + last_automated_data_interval: DataInterval | None, + restriction: TimeRestriction, + ) -> DagRunInfo | None: + if restriction.catchup: + if last_automated_data_interval is not None: + next_start_time = self._get_next(last_automated_data_interval.end) + elif restriction.earliest is None: + next_start_time = self._calc_first_run() + else: + next_start_time = self._align_to_next(restriction.earliest) + else: + start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] + if last_automated_data_interval is not None: + start_time_candidates.append(self._get_next(last_automated_data_interval.end)) + elif restriction.earliest is None: + # Run immediately has no effect if there is restriction on earliest + start_time_candidates.append(self._calc_first_run()) + if restriction.earliest is not None: + start_time_candidates.append(self._align_to_next(restriction.earliest)) + next_start_time = max(start_time_candidates) + if restriction.latest is not None and restriction.latest < next_start_time: + return None + return DagRunInfo.interval( + coerce_datetime(next_start_time - self._interval), + next_start_time, + ) + + +class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable): + """ + Timetable that triggers DAG runs according to a cron expression. + + This is different from ``DeltaDataIntervalTimetable``, where the delta value + specifies the *data interval* of a DAG run. With this timetable, the data + intervals are specified independently. Also for the same reason, this + timetable kicks off a DAG run immediately at the start of the period, + instead of needing to wait for one data interval to pass. + + :param delta: How much time to wait between each run. + :param interval: The data interval of each run. Default is 0. + + *run_immediately* controls, if no *start_time* is given to the DAG, when + the first run of the DAG should be scheduled. It has no effect if there + already exist runs for this DAG. + + * If *True*, always run immediately the most recent possible DAG run. + * If *False*, wait to run until the next scheduled time in the future. + * If passed a ``timedelta``, will run the most recent possible DAG run + if that run's ``data_interval_end`` is within timedelta of now. + * If *None*, the timedelta is calculated as 10% of the time between the + most recent past scheduled time and the next scheduled time. E.g. if + running every hour, this would run the previous time if less than 6 + minutes had past since the previous run time, otherwise it would wait + until the next hour. + """ + + def __init__( + self, + delta: datetime.timedelta | relativedelta, + *, + interval: datetime.timedelta | relativedelta = datetime.timedelta(), + ) -> None: + super().__init__(delta) + self._interval = interval + + @classmethod + def deserialize(cls, data: dict[str, Any]) -> Timetable: + return cls( + _deserialize_interval(data["delta"]), + interval=_deserialize_interval(data["interval"]), + ) + + def serialize(self) -> dict[str, Any]: + return { + "delta": _serialize_interval(self._delta), + "interval": _serialize_interval(self._interval), + } + + def _calc_first_run(self) -> DateTime: + return self._align_to_prev(coerce_datetime(utcnow())) + + +class CronTriggerTimetable(CronMixin, _TriggerTimetable): """ Timetable that triggers DAG runs according to a cron expression. @@ -103,7 +219,7 @@ def __init__( ) -> None: super().__init__(cron, timezone) self._interval = interval - self.run_immediately = run_immediately + self._run_immediately = run_immediately @classmethod def deserialize(cls, data: dict[str, Any]) -> Timetable: @@ -123,50 +239,10 @@ def serialize(self) -> dict[str, Any]: "expression": self._expression, "timezone": encode_timezone(self._timezone), "interval": _serialize_interval(self._interval), - "run_immediately": _serialize_run_immediately(self.run_immediately), + "run_immediately": _serialize_run_immediately(self._run_immediately), } - def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: - return DataInterval( - # pendulum.Datetime ± timedelta should return pendulum.Datetime - # however mypy decide that output would be datetime.datetime - run_after - self._interval, # type: ignore[arg-type] - run_after, - ) - - def next_dagrun_info( - self, - *, - last_automated_data_interval: DataInterval | None, - restriction: TimeRestriction, - ) -> DagRunInfo | None: - if restriction.catchup: - if last_automated_data_interval is not None: - next_start_time = self._get_next(last_automated_data_interval.end) - elif restriction.earliest is None: - next_start_time = self._calc_first_run() - else: - next_start_time = self._align_to_next(restriction.earliest) - else: - start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] - if last_automated_data_interval is not None: - start_time_candidates.append(self._get_next(last_automated_data_interval.end)) - elif restriction.earliest is None: - # Run immediately has no effect if there is restriction on earliest - start_time_candidates.append(self._calc_first_run()) - if restriction.earliest is not None: - start_time_candidates.append(self._align_to_next(restriction.earliest)) - next_start_time = max(start_time_candidates) - if restriction.latest is not None and restriction.latest < next_start_time: - return None - return DagRunInfo.interval( - # pendulum.Datetime ± timedelta should return pendulum.Datetime - # however mypy decide that output would be datetime.datetime - next_start_time - self._interval, # type: ignore[arg-type] - next_start_time, - ) - - def _calc_first_run(self): + def _calc_first_run(self) -> DateTime: """ If no start_time is set, determine the start. @@ -176,13 +252,13 @@ def _calc_first_run(self): now = coerce_datetime(utcnow()) past_run_time = self._align_to_prev(now) next_run_time = self._align_to_next(now) - if self.run_immediately is True: # not truthy, actually set to True + if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true. return past_run_time gap_between_runs = next_run_time - past_run_time gap_to_past = now - past_run_time - if isinstance(self.run_immediately, datetime.timedelta): - buffer_between_runs = self.run_immediately + if isinstance(self._run_immediately, datetime.timedelta): + buffer_between_runs = self._run_immediately else: buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5)) if gap_to_past <= buffer_between_runs: @@ -238,7 +314,7 @@ def serialize(self) -> dict[str, Any]: "expressions": [t._expression for t in self._timetables], "timezone": encode_timezone(timetable._timezone), "interval": _serialize_interval(timetable._interval), - "run_immediately": _serialize_run_immediately(timetable.run_immediately), + "run_immediately": _serialize_run_immediately(timetable._run_immediately), } @property diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index 79d5e50777008..79201da648829 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -64,6 +64,53 @@ Built-in Timetables Airflow comes with several common timetables built-in to cover the most common use cases. Additional timetables may be available in plugins. +.. _DeltaTriggerTimetable: + +DeltaTriggerTimetable +^^^^^^^^^^^^^^^^^^^^^ + +A timetable that accepts a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta``, and runs +the DAG once a delta passes. + +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ + +.. code-block:: python + + from datetime import timedelta + + from airflow.timetables.trigger import DeltaTriggerTimetable + + + @dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...) # Once every week. + def example_dag(): + pass + +You can also provide a static data interval to the timetable. The optional ``interval`` argument also +should be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta``. When using these +arguments, a triggered DAG run's data interval spans the specified duration, and *ends* with the trigger time. + +.. code-block:: python + + from datetime import UTC, datetime, timedelta + + from dateutil.relativedelta import relativedelta, FR + + from airflow.timetables.trigger import DeltaTriggerTimetable + + + @dag( + # Runs every Friday at 18:00 to cover the work week. + schedule=DeltaTriggerTimetable( + relativedelta(weekday=FR(), hour=18), + interval=timedelta(days=4, hours=9), + ), + start_date=datetime(2025, 1, 3, 18, tzinfo=UTC), + ..., + ) + def example_dag(): + pass + + .. _CronTriggerTimetable: CronTriggerTimetable @@ -71,7 +118,7 @@ CronTriggerTimetable A timetable that accepts a cron expression, and triggers DAG runs according to it. -.. seealso:: `Differences between the two cron timetables`_ +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ .. code-block:: python @@ -171,7 +218,7 @@ CronDataIntervalTimetable A timetable that accepts a cron expression, creates data intervals according to the interval between each cron trigger points, and triggers a DAG run at the end of each data interval. -.. seealso:: `Differences between the two cron timetables`_ +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ .. seealso:: `Differences between the cron and delta data interval timetables`_ Select this timetable by providing a valid cron expression as a string to the ``schedule`` @@ -248,37 +295,39 @@ Here's an example of a DAG using ``AssetOrTimeSchedule``: Timetables comparisons ---------------------- -.. _Differences between the two cron timetables: +.. _Differences between "trigger" and "data interval" timetables: + +Differences between "trigger" and "data interval" timetables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Differences between the two cron timetables -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Airflow has two sets of timetables for cron and delta schedules: -Airflow has two timetables `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ that accept a cron expression. +* CronTriggerTimetable_ and CronDataIntervalTimetable_ both accept a cron expression. +* DeltaTriggerTimetable_ and DeltaDataIntervalTimetable_ both accept a timedelta or relativedelta. -However, there are differences between the two: -- `CronTriggerTimetable`_ does not address *Data Interval*, while `CronDataIntervalTimetable`_ does. -- The timestamp in the ``run_id``, the ``logical_date`` for `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ are defined differently based on how they handle the data interval, as described in :ref:`timetables_run_id_logical_date`. +- A trigger timetable (CronTriggerTimetable_ or DeltaTriggerTimetable_) does not address the concept of *data interval*, while a "data interval" one (CronDataIntervalTimetable_ or DeltaDataIntervalTimetable_) does. +- The timestamp in the ``run_id``, the ``logical_date`` of the two timetable kinds are defined differently based on how they handle the data interval, as described in :ref:`timetables_run_id_logical_date`. Whether taking care of *Data Interval* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -`CronTriggerTimetable`_ *does not* include *data interval*. This means that the value of ``data_interval_start`` and -``data_interval_end`` (and the legacy ``execution_date``) are the same; the time when a DAG run is triggered. +A trigger timetable *does not* include *data interval*. This means that the value of ``data_interval_start`` +and ``data_interval_end`` (and the legacy ``execution_date``) are the same; the time when a DAG run is +triggered. -However, `CronDataIntervalTimetable`_ *does* include *data interval*. This means the value of -``data_interval_start`` and ``data_interval_end`` (and legacy ``execution_date``) are different. ``data_interval_start`` is the time when a -DAG run is triggered and ``data_interval_end`` is the end of the interval. +For a data interval timetable, the value of ``data_interval_start`` and ``data_interval_end`` (and legacy +``execution_date``) are different. ``data_interval_start`` is the time when a DAG run is triggered and +``data_interval_end`` is the end of the interval. *Catchup* behavior ^^^^^^^^^^^^^^^^^^ -Whether you're using `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_, there is no difference when ``catchup`` is ``True``. - You might want to use ``False`` for ``catchup`` for certain scenarios, to prevent running unnecessary DAGs: - If you create a new DAG with a start date in the past, and don't want to run DAGs for the past. If ``catchup`` is ``True``, Airflow runs all DAGs that would have run in that time interval. - If you pause an existing DAG, and then restart it at a later date, and don't want to If ``catchup`` is ``True``, -In these scenarios, the ``logical_date`` in the ``run_id`` are based on how `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_ handle the data interval. +In these scenarios, the ``logical_date`` in the ``run_id`` are based on how how the timetable handles the data +interval. See :ref:`dag-catchup` for more information about how DAG runs are triggered when using ``catchup``. @@ -287,30 +336,29 @@ See :ref:`dag-catchup` for more information about how DAG runs are triggered whe The time when a DAG run is triggered ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at the same time. However, the timestamp for the ``run_id`` is different for each. - -- `CronTriggerTimetable`_ has a ``run_id`` timestamp, the ``logical_date``, showing when DAG run is able to start. -- `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at the same time. However, the timestamp for the ``run_id`` (``logical_date``) is different for each. +Both trigger and data interval timetables trigger DAG runs at the same time. However, the timestamp for the +``run_id`` is different for each. This is because ``run_id`` is based on ``logical_date``. For example, suppose there is a cron expression ``@daily`` or ``0 0 * * *``, which is scheduled to run at 12AM every day. If you enable DAGs using the two timetables at 3PM on January 31st, -- `CronTriggerTimetable`_ triggers a new DAG run at 12AM on February 1st. The ``run_id`` timestamp is midnight, on February 1st. -- `CronDataIntervalTimetable`_ immediately triggers a new DAG run, because a DAG run for the daily time interval beginning at 12AM on January 31st did not occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is the beginning of the data interval. +- `CronTriggerTimetable`_ creates a new DAG run at 12AM on February 1st. The ``run_id`` timestamp is midnight, on February 1st. +- `CronDataIntervalTimetable`_ immediately creates a new DAG run, because a DAG run for the daily time interval beginning at 12AM on January 31st did not occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is the beginning of the data interval. -This is another example showing the difference in the case of skipping DAG runs. +The following is another example showing the difference in the case of skipping DAG runs: Suppose there are two running DAGs with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the DAGs at 3PM on January 31st and re-enable them at 3PM on February 2nd, - `CronTriggerTimetable`_ skips the DAG runs that were supposed to trigger on February 1st and 2nd. The next DAG run will be triggered at 12AM on February 3rd. - `CronDataIntervalTimetable`_ skips the DAG runs that were supposed to trigger on February 1st only. A DAG run for February 2nd is immediately triggered after you re-enable the DAG. -In these examples, you see how `CronTriggerTimetable`_ triggers DAG runs is more intuitive and more similar to what -people expect cron to behave than how `CronDataIntervalTimetable`_ does. +In these examples, you see how a trigger timetable creates DAG runs more intuitively and similar to what +people expect a workflow to behave, while a data interval timetable is designed heavily around the data +interval it processes, and does not reflect a workflow's own properties. .. _Differences between the cron and delta data interval timetables: -Differences between the cron and delta data interval timetables: -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Differences between the cron and delta data interval timetables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Choosing between `DeltaDataIntervalTimetable`_ and `CronDataIntervalTimetable`_ depends on your use case. If you enable a DAG at 01:05 on February 1st, the following table summarizes the DAG runs created and the diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py index ce37ec62ed8eb..d716daa0f4697 100644 --- a/task_sdk/src/airflow/sdk/definitions/dag.py +++ b/task_sdk/src/airflow/sdk/definitions/dag.py @@ -115,7 +115,7 @@ def _create_timetable(interval: ScheduleInterval, timezone: Timezone | FixedTime """Create a Timetable instance from a plain ``schedule`` value.""" from airflow.configuration import conf as airflow_conf from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable - from airflow.timetables.trigger import CronTriggerTimetable + from airflow.timetables.trigger import CronTriggerTimetable, DeltaTriggerTimetable if interval is None: return NullTimetable() @@ -124,12 +124,13 @@ def _create_timetable(interval: ScheduleInterval, timezone: Timezone | FixedTime if interval == "@continuous": return ContinuousTimetable() if isinstance(interval, (timedelta, relativedelta)): - return DeltaDataIntervalTimetable(interval) + if airflow_conf.getboolean("scheduler", "create_cron_data_intervals"): + return DeltaDataIntervalTimetable(interval) + return DeltaTriggerTimetable(interval) if isinstance(interval, str): if airflow_conf.getboolean("scheduler", "create_cron_data_intervals"): return CronDataIntervalTimetable(interval, timezone) - else: - return CronTriggerTimetable(interval, timezone=timezone) + return CronTriggerTimetable(interval, timezone=timezone) raise ValueError(f"{interval!r} is not a valid schedule.") diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 75060767f602f..4ca79f70452ab 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -181,9 +181,28 @@ def test_validate_failure(timetable: Timetable, error_message: str) -> None: assert str(ctx.value) == error_message -def test_cron_interval_timezone_from_string(): - timetable = CronDataIntervalTimetable("@hourly", "UTC") - assert timetable.serialize()["timezone"] == "UTC" +def test_cron_interval_serialize(): + data = HOURLY_CRON_TIMETABLE.serialize() + assert data == {"expression": "0 * * * *", "timezone": "UTC"} + tt = CronDataIntervalTimetable.deserialize(data) + assert isinstance(tt, CronDataIntervalTimetable) + assert tt._expression == HOURLY_CRON_TIMETABLE._expression + assert tt._timezone == HOURLY_CRON_TIMETABLE._timezone + + +@pytest.mark.parametrize( + "timetable, expected_data", + [ + (HOURLY_RELATIVEDELTA_TIMETABLE, {"delta": {"hours": 1}}), + (HOURLY_TIMEDELTA_TIMETABLE, {"delta": 3600.0}), + ], +) +def test_delta_interval_serialize(timetable, expected_data): + data = timetable.serialize() + assert data == expected_data + tt = DeltaDataIntervalTimetable.deserialize(data) + assert isinstance(tt, DeltaDataIntervalTimetable) + assert tt._delta == timetable._delta @pytest.mark.parametrize( diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index 03010118b1d66..baf96b691f477 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -25,8 +25,12 @@ import time_machine from airflow.exceptions import AirflowTimetableInvalid -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction -from airflow.timetables.trigger import CronTriggerTimetable, MultipleCronTriggerTimetable +from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.timetables.trigger import ( + CronTriggerTimetable, + DeltaTriggerTimetable, + MultipleCronTriggerTimetable, +) from airflow.utils.timezone import utc START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) @@ -39,6 +43,9 @@ HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc, run_immediately=True) +HOURLY_TIMEDELTA_TIMETABLE = DeltaTriggerTimetable(datetime.timedelta(hours=1)) +HOURLY_RELATIVEDELTA_TIMETABLE = DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(hours=1)) + DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -80,6 +87,47 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( assert next_info == DagRunInfo.exact(next_start_time) +@pytest.mark.parametrize( + "last_automated_data_interval, next_start_time", + [ + pytest.param( + None, + CURRENT_TIME, + id="first-run", + ), + pytest.param( + DataInterval.exact(YESTERDAY + DELTA_FROM_MIDNIGHT), + CURRENT_TIME + DELTA_FROM_MIDNIGHT, + id="before-now", + ), + pytest.param( + DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT), + CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT, + id="after-now", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(DeltaTriggerTimetable(datetime.timedelta(days=1)), id="timedelta"), + pytest.param(DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=1)), id="relativedelta"), + ], +) +@time_machine.travel(CURRENT_TIME) +def test_daily_delta_trigger_no_catchup_first_starts_at_next_schedule( + last_automated_data_interval: DataInterval | None, + next_start_time: pendulum.DateTime, + timetable: Timetable, +) -> None: + """If ``catchup=False`` and start_date is a day before""" + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), + ) + assert next_info == DagRunInfo.exact(next_start_time) + + @pytest.mark.parametrize( "current_time, earliest, expected", [ @@ -128,6 +176,62 @@ def test_hourly_cron_trigger_no_catchup_next_info( assert next_info == expected +@pytest.mark.parametrize( + "current_time, earliest, expected", + [ + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + id="current_time_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + id="current_time_not_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), + id="current_time_miss_one_interval_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc)), + id="current_time_miss_one_interval_not_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc)), + id="future_start_date", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_hourly_delta_trigger_no_catchup_next_info( + current_time: pendulum.DateTime, + earliest: pendulum.DateTime, + expected: DagRunInfo, + timetable: Timetable, +) -> None: + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=PREV_DATA_INTERVAL_EXACT, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=False), + ) + assert next_info == expected + + @pytest.mark.parametrize( "last_automated_data_interval, earliest, expected", [ @@ -169,6 +273,55 @@ def test_hourly_cron_trigger_catchup_next_info( assert next_info == expected +@pytest.mark.parametrize( + "last_automated_data_interval, earliest, expected", + [ + pytest.param( + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), + id="last_automated_on_boundary", + ), + pytest.param( + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc)), + id="last_automated_not_on_boundary", + ), + pytest.param( + None, + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + id="no_last_automated_with_earliest_on_boundary", + ), + pytest.param( + None, + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + id="no_last_automated_with_earliest_not_on_boundary", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_hourly_delta_trigger_catchup_next_info( + last_automated_data_interval: DataInterval | None, + earliest: pendulum.DateTime | None, + expected: DagRunInfo | None, + timetable: Timetable, +) -> None: + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=True), + ) + assert next_info == expected + + def test_cron_trigger_next_info_with_interval(): # Runs every Monday on 16:30, covering the day before the run. timetable = CronTriggerTimetable( @@ -190,32 +343,60 @@ def test_cron_trigger_next_info_with_interval(): ) -def test_validate_success() -> None: - HOURLY_CRON_TRIGGER_TIMETABLE.validate() - +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_CRON_TRIGGER_TIMETABLE, id="cron"), + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_validate_success(timetable: Timetable) -> None: + timetable.validate() -def test_validate_failure() -> None: - timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc) +@pytest.mark.parametrize( + "timetable, message", + [ + pytest.param( + CronTriggerTimetable("0 0 1 13 0", timezone=utc), + "[0 0 1 13 0] is not acceptable, out of range", + id="cron", + ), + pytest.param( + DeltaTriggerTimetable(datetime.timedelta(days=-1)), + "schedule interval must be positive, not datetime.timedelta(days=-1)", + id="timedelta", + ), + pytest.param( + DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=-1)), + "schedule interval must be positive, not relativedelta(days=-1)", + id="relativedelta", + ), + ], +) +def test_validate_failure(timetable: Timetable, message: str) -> None: with pytest.raises(AirflowTimetableInvalid) as ctx: timetable.validate() - assert str(ctx.value) == "[0 0 1 13 0] is not acceptable, out of range" + assert str(ctx.value) == message @pytest.mark.parametrize( "timetable, data", [ - ( + pytest.param( HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", "run_immediately": True, "timezone": "UTC", "interval": 0.0}, + id="hourly", ), - ( + pytest.param( CronTriggerTimetable( "0 0 1 12 *", timezone=utc, run_immediately=False, interval=datetime.timedelta(hours=2) ), {"expression": "0 0 1 12 *", "run_immediately": False, "timezone": "UTC", "interval": 7200.0}, + id="interval", ), - ( + pytest.param( CronTriggerTimetable( "0 0 1 12 0", timezone="Asia/Taipei", @@ -228,10 +409,11 @@ def test_validate_failure() -> None: "timezone": "Asia/Taipei", "interval": {"weekday": [0]}, }, + id="non-utc-interval", ), ], ) -def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.Any]) -> None: +def test_cron_trigger_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.Any]) -> None: assert timetable.serialize() == data tt = CronTriggerTimetable.deserialize(data) @@ -239,6 +421,47 @@ def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.A assert tt._expression == timetable._expression assert tt._timezone == timetable._timezone assert tt._interval == timetable._interval + assert tt._run_immediately == timetable._run_immediately + + +@pytest.mark.parametrize( + "timetable, data", + [ + pytest.param( + HOURLY_TIMEDELTA_TIMETABLE, + {"delta": 3600.0, "interval": 0.0}, + id="timedelta", + ), + pytest.param( + DeltaTriggerTimetable( + datetime.timedelta(hours=3), + interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), + ), + {"delta": 10800.0, "interval": {"weekday": [0]}}, + id="timedelta-interval", + ), + pytest.param( + HOURLY_RELATIVEDELTA_TIMETABLE, + {"delta": {"hours": 1}, "interval": 0.0}, + id="relativedelta", + ), + pytest.param( + DeltaTriggerTimetable( + dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), + interval=datetime.timedelta(days=7), + ), + {"delta": {"weekday": [0]}, "interval": 604800.0}, + id="relativedelta-interval", + ), + ], +) +def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: dict[str, typing.Any]) -> None: + assert timetable.serialize() == data + + tt = DeltaTriggerTimetable.deserialize(data) + assert isinstance(tt, DeltaTriggerTimetable) + assert tt._delta == timetable._delta + assert tt._interval == timetable._interval JUST_AFTER = pendulum.datetime(year=2024, month=8, day=15, hour=3, minute=5) @@ -342,4 +565,4 @@ def test_multi_serialization(): assert tt._timetables[1]._expression == "*/2 * * * *" assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc assert tt._timetables[0]._interval == tt._timetables[1]._interval == datetime.timedelta(minutes=10) - assert tt._timetables[0].run_immediately == tt._timetables[1].run_immediately is False + assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is False