diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 04b4e27843f22..d90a931db2374 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2661,7 +2661,26 @@ scheduler:
       type: boolean
       example: ~
       default: "True"
-      see_also: ":ref:`Differences between the two cron timetables`"
+      see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
+    create_delta_data_intervals:
+      description: |
+        Whether to create DAG runs that span an interval or one single point in time when a timedelta or
+        relativedelta is provided to the ``schedule`` argument of a DAG.
+
+        * ``True``: **DeltaDataIntervalTimetable** is used, which is suitable for DAGs with a well-defined
+          data interval. You get contiguous intervals from the end of the previous interval up to the
+          scheduled datetime.
+        * ``False``: **DeltaTriggerTimetable** is used, which is suitable for DAGs that simply want to say
+          e.g. "run this every day" and do not care about the data interval.
+
+        Notably, for **DeltaTriggerTimetable**, the logical date is the same as the time at which the DAG
+        run is scheduled to start, while for **DeltaDataIntervalTimetable**, the logical date is the
+        beginning of the data interval and the DAG run is scheduled to start at the end of that interval.
+      version_added: 2.11.0
+      type: boolean
+      example: ~
+      default: "True"
+      see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
 triggerer:
   description: ~
   options:
diff --git a/airflow/timetables/_delta.py b/airflow/timetables/_delta.py
new file mode 100644
index 0000000000000..7203cd406310f
--- /dev/null
+++ b/airflow/timetables/_delta.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
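For context on the new ``[scheduler] create_delta_data_intervals`` option above, the two timetables it selects between can be compared directly. This is a minimal sketch, not part of this change set; the expected first-run behaviour of ``DeltaDataIntervalTimetable`` noted in the comments is an assumption based on the option description.

.. code-block:: python

    import datetime

    import pendulum

    from airflow.timetables.base import TimeRestriction
    from airflow.timetables.interval import DeltaDataIntervalTimetable
    from airflow.timetables.trigger import DeltaTriggerTimetable

    start = pendulum.datetime(2025, 1, 1, tz="UTC")
    restriction = TimeRestriction(earliest=start, latest=None, catchup=True)

    # create_delta_data_intervals = True (default): the first run should cover the
    # interval [Jan 1, Jan 2) and be scheduled at the *end* of it.
    di_info = DeltaDataIntervalTimetable(datetime.timedelta(days=1)).next_dagrun_info(
        last_automated_data_interval=None,
        restriction=restriction,
    )

    # create_delta_data_intervals = False: the first run should be scheduled at Jan 1
    # itself, and its logical date equals that trigger time.
    tr_info = DeltaTriggerTimetable(datetime.timedelta(days=1)).next_dagrun_info(
        last_automated_data_interval=None,
        restriction=restriction,
    )

    # Expected (assumption): di_info.run_after == Jan 2 and di_info.logical_date == Jan 1,
    # while tr_info.run_after == tr_info.logical_date == Jan 1.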
+ +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING + +from airflow.exceptions import AirflowTimetableInvalid +from airflow.utils.timezone import convert_to_utc + +if TYPE_CHECKING: + from dateutil.relativedelta import relativedelta + from pendulum import DateTime + + +class DeltaMixin: + """Mixin to provide interface to work with timedelta and relativedelta.""" + + def __init__(self, delta: datetime.timedelta | relativedelta) -> None: + self._delta = delta + + @property + def summary(self) -> str: + return str(self._delta) + + def validate(self) -> None: + now = datetime.datetime.now() + if (now + self._delta) <= now: + raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}") + + def _get_next(self, current: DateTime) -> DateTime: + return convert_to_utc(current + self._delta) + + def _get_prev(self, current: DateTime) -> DateTime: + return convert_to_utc(current - self._delta) + + def _align_to_next(self, current: DateTime) -> DateTime: + return current + + def _align_to_prev(self, current: DateTime) -> DateTime: + return current diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index ef4b5d0afc437..81d53e3537813 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -22,10 +22,10 @@ from dateutil.relativedelta import relativedelta from pendulum import DateTime -from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables._cron import CronMixin +from airflow.timetables._delta import DeltaMixin from airflow.timetables.base import DagRunInfo, DataInterval, Timetable -from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow +from airflow.utils.timezone import coerce_datetime, utcnow if TYPE_CHECKING: from airflow.timetables.base import TimeRestriction @@ -173,7 +173,7 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: return DataInterval(start=self._get_prev(end), end=end) -class DeltaDataIntervalTimetable(_DataIntervalTimetable): +class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable): """ Timetable that schedules data intervals with a time delta. @@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable): instance. 
""" - def __init__(self, delta: Delta) -> None: - self._delta = delta - @classmethod def deserialize(cls, data: dict[str, Any]) -> Timetable: from airflow.serialization.serialized_objects import decode_relativedelta @@ -204,10 +201,6 @@ def __eq__(self, other: Any) -> bool: return NotImplemented return self._delta == other._delta - @property - def summary(self) -> str: - return str(self._delta) - def serialize(self) -> dict[str, Any]: from airflow.serialization.serialized_objects import encode_relativedelta @@ -218,23 +211,6 @@ def serialize(self) -> dict[str, Any]: delta = encode_relativedelta(self._delta) return {"delta": delta} - def validate(self) -> None: - now = datetime.datetime.now() - if (now + self._delta) <= now: - raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}") - - def _get_next(self, current: DateTime) -> DateTime: - return convert_to_utc(current + self._delta) - - def _get_prev(self, current: DateTime) -> DateTime: - return convert_to_utc(current - self._delta) - - def _align_to_next(self, current: DateTime) -> DateTime: - return current - - def _align_to_prev(self, current: DateTime) -> DateTime: - return current - @staticmethod def _relativedelta_in_seconds(delta: relativedelta) -> int: return ( diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py index a4666946fa7be..8dad7dfecd190 100644 --- a/airflow/timetables/trigger.py +++ b/airflow/timetables/trigger.py @@ -20,8 +20,9 @@ from typing import TYPE_CHECKING, Any from airflow.timetables._cron import CronMixin +from airflow.timetables._delta import DeltaMixin from airflow.timetables.base import DagRunInfo, DataInterval, Timetable -from airflow.utils import timezone +from airflow.utils.timezone import coerce_datetime, utcnow if TYPE_CHECKING: from dateutil.relativedelta import relativedelta @@ -31,60 +32,43 @@ from airflow.timetables.base import TimeRestriction -class CronTriggerTimetable(CronMixin, Timetable): - """ - Timetable that triggers DAG runs according to a cron expression. - - This is different from ``CronDataIntervalTimetable``, where the cron - expression specifies the *data interval* of a DAG run. With this timetable, - the data intervals are specified independently from the cron expression. - Also for the same reason, this timetable kicks off a DAG run immediately at - the start of the period (similar to POSIX cron), instead of needing to wait - for one data interval to pass. +def _serialize_interval(interval: datetime.timedelta | relativedelta) -> float | dict: + from airflow.serialization.serialized_objects import encode_relativedelta - Don't pass ``@once`` in here; use ``OnceTimetable`` instead. 
-    """
+    if isinstance(interval, datetime.timedelta):
+        return interval.total_seconds()
+    return encode_relativedelta(interval)
 
-    def __init__(
-        self,
-        cron: str,
-        *,
-        timezone: str | Timezone | FixedTimezone,
-        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
-    ) -> None:
-        super().__init__(cron, timezone)
-        self._interval = interval
-
-    @classmethod
-    def deserialize(cls, data: dict[str, Any]) -> Timetable:
-        from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone
+
+def _deserialize_interval(value: int | dict) -> datetime.timedelta | relativedelta:
+    from airflow.serialization.serialized_objects import decode_relativedelta
 
-        interval: datetime.timedelta | relativedelta
-        if isinstance(data["interval"], dict):
-            interval = decode_relativedelta(data["interval"])
-        else:
-            interval = datetime.timedelta(seconds=data["interval"])
-        return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)
+    if isinstance(value, dict):
+        return decode_relativedelta(value)
+    return datetime.timedelta(seconds=value)
 
-    def serialize(self) -> dict[str, Any]:
-        from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone
 
-        interval: float | dict[str, Any]
-        if isinstance(self._interval, datetime.timedelta):
-            interval = self._interval.total_seconds()
-        else:
-            interval = encode_relativedelta(self._interval)
-        timezone = encode_timezone(self._timezone)
-        return {"expression": self._expression, "timezone": timezone, "interval": interval}
+class _TriggerTimetable(Timetable):
+    _interval: datetime.timedelta | relativedelta
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
         return DataInterval(
-            # pendulum.Datetime ± timedelta should return pendulum.Datetime
-            # however mypy decide that output would be datetime.datetime
-            run_after - self._interval,  # type: ignore[arg-type]
+            coerce_datetime(run_after - self._interval),
             run_after,
         )
 
+    def _align_to_next(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _align_to_prev(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _get_next(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _get_prev(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
     def next_dagrun_info(
         self,
         *,
@@ -99,7 +83,7 @@ def next_dagrun_info(
             else:
                 next_start_time = self._align_to_next(restriction.earliest)
         else:
-            start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
+            start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
             if last_automated_data_interval is not None:
                 start_time_candidates.append(self._get_next(last_automated_data_interval.end))
             if restriction.earliest is not None:
@@ -113,3 +97,85 @@ def next_dagrun_info(
             next_start_time - self._interval,  # type: ignore[arg-type]
             next_start_time,
         )
+
+
+class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a time delta.
+
+    This is different from ``DeltaDataIntervalTimetable``, where the delta value
+    specifies the *data interval* of a DAG run. With this timetable, the data
+    intervals are specified independently. Also for the same reason, this
+    timetable kicks off a DAG run immediately at the start of the period,
+    instead of needing to wait for one data interval to pass.
+
+    :param delta: How much time to wait between each run.
+    :param interval: The data interval of each run. Default is 0.
+ """ + + def __init__( + self, + delta: datetime.timedelta | relativedelta, + *, + interval: datetime.timedelta | relativedelta = datetime.timedelta(), + ) -> None: + super().__init__(delta) + self._interval = interval + + @classmethod + def deserialize(cls, data: dict[str, Any]) -> Timetable: + return cls( + _deserialize_interval(data["delta"]), + interval=_deserialize_interval(data["interval"]), + ) + + def serialize(self) -> dict[str, Any]: + return { + "delta": _serialize_interval(self._delta), + "interval": _serialize_interval(self._interval), + } + + +class CronTriggerTimetable(CronMixin, _TriggerTimetable): + """ + Timetable that triggers DAG runs according to a cron expression. + + This is different from ``CronDataIntervalTimetable``, where the cron + expression specifies the *data interval* of a DAG run. With this timetable, + the data intervals are specified independently from the cron expression. + Also for the same reason, this timetable kicks off a DAG run immediately at + the start of the period (similar to POSIX cron), instead of needing to wait + for one data interval to pass. + + Don't pass ``@once`` in here; use ``OnceTimetable`` instead. + """ + + def __init__( + self, + cron: str, + *, + timezone: str | Timezone | FixedTimezone, + interval: datetime.timedelta | relativedelta = datetime.timedelta(), + ) -> None: + super().__init__(cron, timezone) + self._interval = interval + + @classmethod + def deserialize(cls, data: dict[str, Any]) -> Timetable: + from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone + + interval: datetime.timedelta | relativedelta + if isinstance(data["interval"], dict): + interval = decode_relativedelta(data["interval"]) + else: + interval = datetime.timedelta(seconds=data["interval"]) + return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval) + + def serialize(self) -> dict[str, Any]: + from airflow.serialization.serialized_objects import encode_timezone + + return { + "expression": self._expression, + "timezone": encode_timezone(self._timezone), + "interval": _serialize_interval(self._interval), + } diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index d9b47dd9c4460..698aeb850e43e 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -64,6 +64,53 @@ Built-in Timetables Airflow comes with several common timetables built-in to cover the most common use cases. Additional timetables may be available in plugins. +.. _DeltaTriggerTimetable: + +DeltaTriggerTimetable +^^^^^^^^^^^^^^^^^^^^^ + +A timetable that accepts a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta``, and runs +the DAG once a delta passes. + +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ + +.. code-block:: python + + from datetime import timedelta + + from airflow.timetables.trigger import DeltaTriggerTimetable + + + @dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...) # Once every week. + def example_dag(): + pass + +You can also provide a static data interval to the timetable. The optional ``interval`` argument also +should be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta``. When using these +arguments, a triggered DAG run's data interval spans the specified duration, and *ends* with the trigger time. + +.. 
code-block:: python + + from datetime import UTC, datetime, timedelta + + from dateutil.relativedelta import relativedelta, FR + + from airflow.timetables.trigger import DeltaTriggerTimetable + + + @dag( + # Runs every Friday at 18:00 to cover the work week. + schedule=DeltaTriggerTimetable( + relativedelta(weekday=FR(), hour=18), + interval=timedelta(days=4, hours=9), + ), + start_date=datetime(2025, 1, 3, 18, tzinfo=UTC), + ..., + ) + def example_dag(): + pass + + .. _CronTriggerTimetable: CronTriggerTimetable @@ -71,7 +118,7 @@ CronTriggerTimetable A timetable that accepts a cron expression, and triggers DAG runs according to it. -.. seealso:: `Differences between the two cron timetables`_ +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ .. code-block:: python @@ -132,7 +179,7 @@ CronDataIntervalTimetable A timetable that accepts a cron expression, creates data intervals according to the interval between each cron trigger points, and triggers a DAG run at the end of each data interval. -.. seealso:: `Differences between the two cron timetables`_ +.. seealso:: `Differences between "trigger" and "data interval" timetables`_ .. seealso:: `Differences between the cron and delta data interval timetables`_ Select this timetable by providing a valid cron expression as a string to the ``schedule`` @@ -209,37 +256,39 @@ Here's an example of a DAG using ``DatasetOrTimeSchedule``: Timetables comparisons ---------------------- -.. _Differences between the two cron timetables: +.. _Differences between "trigger" and "data interval" timetables: + +Differences between "trigger" and "data interval" timetables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Differences between the two cron timetables -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Airflow has two sets of timetables for cron and delta schedules: -Airflow has two timetables `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ that accept a cron expression. +* CronTriggerTimetable_ and CronDataIntervalTimetable_ both accept a cron expression. +* DeltaTriggerTimetable_ and DeltaDataIntervalTimetable_ both accept a timedelta or relativedelta. -However, there are differences between the two: -- `CronTriggerTimetable`_ does not address *Data Interval*, while `CronDataIntervalTimetable`_ does. -- The timestamp in the ``run_id``, the ``logical_date`` for `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ are defined differently based on how they handle the data interval, as described in :ref:`timetables_run_id_logical_date`. +- A trigger timetable (CronTriggerTimetable_ or DeltaTriggerTimetable_) does not address the concept of *data interval*, while a "data interval" one (CronDataIntervalTimetable_ or DeltaDataIntervalTimetable_) does. +- The timestamp in the ``run_id``, the ``logical_date`` of the two timetable kinds are defined differently based on how they handle the data interval, as described in :ref:`timetables_run_id_logical_date`. Whether taking care of *Data Interval* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -`CronTriggerTimetable`_ *does not* include *data interval*. This means that the value of ``data_interval_start`` and -``data_interval_end`` (and the legacy ``execution_date``) are the same; the time when a DAG run is triggered. +A trigger timetable *does not* include *data interval*. This means that the value of ``data_interval_start`` +and ``data_interval_end`` (and the legacy ``execution_date``) are the same; the time when a DAG run is +triggered. 
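To make the collapse of the data interval concrete, here is a small sketch, not part of this change; it assumes ``DeltaDataIntervalTimetable`` infers a manual run's interval as one delta ending at the trigger time, consistent with the description above.

.. code-block:: python

    from datetime import timedelta

    import pendulum

    from airflow.timetables.interval import DeltaDataIntervalTimetable
    from airflow.timetables.trigger import DeltaTriggerTimetable

    run_after = pendulum.datetime(2025, 1, 2, tz="UTC")

    # Trigger timetable (default interval=timedelta()): start == end == the trigger time.
    trigger_interval = DeltaTriggerTimetable(timedelta(days=1)).infer_manual_data_interval(
        run_after=run_after
    )
    assert trigger_interval.start == trigger_interval.end == run_after

    # Data interval timetable: the manual run covers the preceding day.
    data_interval = DeltaDataIntervalTimetable(timedelta(days=1)).infer_manual_data_interval(
        run_after=run_after
    )
    assert data_interval.start == run_after - timedelta(days=1)
    assert data_interval.end == run_after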
-However, `CronDataIntervalTimetable`_ *does* include *data interval*. This means the value of
-``data_interval_start`` and ``data_interval_end`` (and legacy ``execution_date``) are different. ``data_interval_start`` is the time when a
-DAG run is triggered and ``data_interval_end`` is the end of the interval.
+For a data interval timetable, the value of ``data_interval_start`` and ``data_interval_end`` (and legacy
+``execution_date``) are different. ``data_interval_start`` is the beginning of the interval, and
+``data_interval_end`` is the end of the interval, which is also when the DAG run is scheduled to start.
 
 *Catchup* behavior
 ^^^^^^^^^^^^^^^^^^
 
-Whether you're using `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_, there is no difference when ``catchup`` is ``True``.
-
 You might want to use ``False`` for ``catchup`` for certain scenarios, to prevent running unnecessary DAGs:
 
 - If you create a new DAG with a start date in the past, and don't want to run DAGs for the past. If ``catchup`` is ``True``, Airflow runs all DAGs that would have run in that time interval.
 - If you pause an existing DAG, and then restart it at a later date, and don't want to If ``catchup`` is ``True``,
 
-In these scenarios, the ``logical_date`` in the ``run_id`` are based on how `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_ handle the data interval.
+In these scenarios, the ``logical_date`` in the ``run_id`` is based on how the timetable handles the data
+interval.
 
 See :ref:`dag-catchup` for more information about how DAG runs are triggered when using ``catchup``.
 
@@ -248,30 +297,29 @@ See :ref:`dag-catchup` for more information about how DAG runs are triggered whe
 The time when a DAG run is triggered
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at the same time. However, the timestamp for the ``run_id`` is different for each.
-
-- `CronTriggerTimetable`_ has a ``run_id`` timestamp, the ``logical_date``, showing when DAG run is able to start.
-- `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at the same time. However, the timestamp for the ``run_id`` (``logical_date``) is different for each.
+Both trigger and data interval timetables trigger DAG runs at the same time. However, the timestamp for the
+``run_id`` is different for each. This is because ``run_id`` is based on ``logical_date``.
 
 For example, suppose there is a cron expression ``@daily`` or ``0 0 * * *``, which is scheduled to run at 12AM every day. If you enable DAGs using the two timetables at 3PM on January 31st,
 
-- `CronTriggerTimetable`_ triggers a new DAG run at 12AM on February 1st. The ``run_id`` timestamp is midnight, on February 1st.
-- `CronDataIntervalTimetable`_ immediately triggers a new DAG run, because a DAG run for the daily time interval beginning at 12AM on January 31st did not occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is the beginning of the data interval.
+- `CronTriggerTimetable`_ creates a new DAG run at 12AM on February 1st. The ``run_id`` timestamp is midnight, on February 1st.
+- `CronDataIntervalTimetable`_ immediately creates a new DAG run, because a DAG run for the daily time interval beginning at 12AM on January 31st did not occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is the beginning of the data interval.
 
-This is another example showing the difference in the case of skipping DAG runs.
+The following is another example showing the difference in the case of skipping DAG runs: Suppose there are two running DAGs with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the DAGs at 3PM on January 31st and re-enable them at 3PM on February 2nd, - `CronTriggerTimetable`_ skips the DAG runs that were supposed to trigger on February 1st and 2nd. The next DAG run will be triggered at 12AM on February 3rd. - `CronDataIntervalTimetable`_ skips the DAG runs that were supposed to trigger on February 1st only. A DAG run for February 2nd is immediately triggered after you re-enable the DAG. -In these examples, you see how `CronTriggerTimetable`_ triggers DAG runs is more intuitive and more similar to what -people expect cron to behave than how `CronDataIntervalTimetable`_ does. +In these examples, you see how a trigger timetable creates DAG runs more intuitively and similar to what +people expect a workflow to behave, while a data interval timetable is designed heavily around the data +interval it processes, and does not reflect a workflow's own properties. .. _Differences between the cron and delta data interval timetables: -Differences between the cron and delta data interval timetables: -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Differences between the cron and delta data interval timetables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Choosing between `DeltaDataIntervalTimetable`_ and `CronDataIntervalTimetable`_ depends on your use case. If you enable a DAG at 01:05 on February 1st, the following table summarizes the DAG runs created and the diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 9088320655135..3c40174d8bf0e 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -181,9 +181,28 @@ def test_validate_failure(timetable: Timetable, error_message: str) -> None: assert str(ctx.value) == error_message -def test_cron_interval_timezone_from_string(): - timetable = CronDataIntervalTimetable("@hourly", "UTC") - assert timetable.serialize()["timezone"] == "UTC" +def test_cron_interval_serialize(): + data = HOURLY_CRON_TIMETABLE.serialize() + assert data == {"expression": "0 * * * *", "timezone": "UTC"} + tt = CronDataIntervalTimetable.deserialize(data) + assert isinstance(tt, CronDataIntervalTimetable) + assert tt._expression == HOURLY_CRON_TIMETABLE._expression + assert tt._timezone == HOURLY_CRON_TIMETABLE._timezone + + +@pytest.mark.parametrize( + "timetable, expected_data", + [ + (HOURLY_RELATIVEDELTA_TIMETABLE, {"delta": {"hours": 1}}), + (HOURLY_TIMEDELTA_TIMETABLE, {"delta": 3600.0}), + ], +) +def test_delta_interval_serialize(timetable, expected_data): + data = timetable.serialize() + assert data == expected_data + tt = DeltaDataIntervalTimetable.deserialize(data) + assert isinstance(tt, DeltaDataIntervalTimetable) + assert tt._delta == timetable._delta @pytest.mark.parametrize( diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index 5165a14b3c115..a3e4b3fe6ec11 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -25,8 +25,8 @@ import time_machine from airflow.exceptions import AirflowTimetableInvalid -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction -from airflow.timetables.trigger import CronTriggerTimetable +from airflow.timetables.base 
import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.timetables.trigger import CronTriggerTimetable, DeltaTriggerTimetable from airflow.utils.timezone import utc START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) @@ -39,6 +39,9 @@ HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc) +HOURLY_TIMEDELTA_TIMETABLE = DeltaTriggerTimetable(datetime.timedelta(hours=1)) +HOURLY_RELATIVEDELTA_TIMETABLE = DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(hours=1)) + DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -76,6 +79,47 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( assert next_info == DagRunInfo.exact(next_start_time) +@pytest.mark.parametrize( + "last_automated_data_interval, next_start_time", + [ + pytest.param( + None, + CURRENT_TIME, + id="first-run", + ), + pytest.param( + DataInterval.exact(YESTERDAY + DELTA_FROM_MIDNIGHT), + CURRENT_TIME + DELTA_FROM_MIDNIGHT, + id="before-now", + ), + pytest.param( + DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT), + CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT, + id="after-now", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(DeltaTriggerTimetable(datetime.timedelta(days=1)), id="timedelta"), + pytest.param(DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=1)), id="relativedelta"), + ], +) +@time_machine.travel(CURRENT_TIME) +def test_daily_delta_trigger_no_catchup_first_starts_at_next_schedule( + last_automated_data_interval: DataInterval | None, + next_start_time: pendulum.DateTime, + timetable: Timetable, +) -> None: + """If ``catchup=False`` and start_date is a day before""" + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), + ) + assert next_info == DagRunInfo.exact(next_start_time) + + @pytest.mark.parametrize( "current_time, earliest, expected", [ @@ -124,6 +168,62 @@ def test_hourly_cron_trigger_no_catchup_next_info( assert next_info == expected +@pytest.mark.parametrize( + "current_time, earliest, expected", + [ + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + id="current_time_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + id="current_time_not_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), + id="current_time_miss_one_interval_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc)), + id="current_time_miss_one_interval_not_on_boundary", + ), + pytest.param( + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc)), + id="future_start_date", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_hourly_delta_trigger_no_catchup_next_info( + current_time: pendulum.DateTime, + earliest: 
pendulum.DateTime, + expected: DagRunInfo, + timetable: Timetable, +) -> None: + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=PREV_DATA_INTERVAL_EXACT, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=False), + ) + assert next_info == expected + + @pytest.mark.parametrize( "last_automated_data_interval, earliest, expected", [ @@ -171,6 +271,55 @@ def test_hourly_cron_trigger_catchup_next_info( assert next_info == expected +@pytest.mark.parametrize( + "last_automated_data_interval, earliest, expected", + [ + pytest.param( + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), + id="last_automated_on_boundary", + ), + pytest.param( + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + START_DATE, + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc)), + id="last_automated_not_on_boundary", + ), + pytest.param( + None, + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), + id="no_last_automated_with_earliest_on_boundary", + ), + pytest.param( + None, + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), + id="no_last_automated_with_earliest_not_on_boundary", + ), + ], +) +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_hourly_delta_trigger_catchup_next_info( + last_automated_data_interval: DataInterval | None, + earliest: pendulum.DateTime | None, + expected: DagRunInfo | None, + timetable: Timetable, +) -> None: + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=True), + ) + assert next_info == expected + + def test_cron_trigger_next_info_with_interval(): # Runs every Monday on 16:30, covering the day before the run. 
timetable = CronTriggerTimetable( @@ -192,37 +341,73 @@ def test_cron_trigger_next_info_with_interval(): ) -def test_validate_success() -> None: - HOURLY_CRON_TRIGGER_TIMETABLE.validate() - +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_CRON_TRIGGER_TIMETABLE, id="cron"), + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_validate_success(timetable: Timetable) -> None: + timetable.validate() -def test_validate_failure() -> None: - timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc) +@pytest.mark.parametrize( + "timetable, message", + [ + pytest.param( + CronTriggerTimetable("0 0 1 13 0", timezone=utc), + "[0 0 1 13 0] is not acceptable, out of range", + id="cron", + ), + pytest.param( + DeltaTriggerTimetable(datetime.timedelta(days=-1)), + "schedule interval must be positive, not datetime.timedelta(days=-1)", + id="timedelta", + ), + pytest.param( + DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=-1)), + "schedule interval must be positive, not relativedelta(days=-1)", + id="relativedelta", + ), + ], +) +def test_validate_failure(timetable: Timetable, message: str) -> None: with pytest.raises(AirflowTimetableInvalid) as ctx: timetable.validate() - assert str(ctx.value) == "[0 0 1 13 0] is not acceptable, out of range" + assert str(ctx.value) == message @pytest.mark.parametrize( "timetable, data", [ - (HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", "timezone": "UTC", "interval": 0}), - ( + pytest.param( + HOURLY_CRON_TRIGGER_TIMETABLE, + {"expression": "0 * * * *", "timezone": "UTC", "interval": 0.0}, + id="hourly", + ), + pytest.param( CronTriggerTimetable("0 0 1 12 *", timezone=utc, interval=datetime.timedelta(hours=2)), {"expression": "0 0 1 12 *", "timezone": "UTC", "interval": 7200.0}, + id="interval", ), - ( + pytest.param( CronTriggerTimetable( "0 0 1 12 0", timezone="Asia/Taipei", interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), ), - {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}}, + { + "expression": "0 0 1 12 0", + "timezone": "Asia/Taipei", + "interval": {"weekday": [0]}, + }, + id="non-utc-interval", ), ], ) -def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.Any]) -> None: +def test_cron_trigger_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.Any]) -> None: assert timetable.serialize() == data tt = CronTriggerTimetable.deserialize(data) @@ -230,3 +415,43 @@ def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.A assert tt._expression == timetable._expression assert tt._timezone == timetable._timezone assert tt._interval == timetable._interval + + +@pytest.mark.parametrize( + "timetable, data", + [ + pytest.param( + HOURLY_TIMEDELTA_TIMETABLE, + {"delta": 3600.0, "interval": 0.0}, + id="timedelta", + ), + pytest.param( + DeltaTriggerTimetable( + datetime.timedelta(hours=3), + interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), + ), + {"delta": 10800.0, "interval": {"weekday": [0]}}, + id="timedelta-interval", + ), + pytest.param( + HOURLY_RELATIVEDELTA_TIMETABLE, + {"delta": {"hours": 1}, "interval": 0.0}, + id="relativedelta", + ), + pytest.param( + DeltaTriggerTimetable( + dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), + interval=datetime.timedelta(days=7), + ), + {"delta": {"weekday": [0]}, "interval": 604800.0}, + 
id="relativedelta-interval", + ), + ], +) +def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: dict[str, typing.Any]) -> None: + assert timetable.serialize() == data + + tt = DeltaTriggerTimetable.deserialize(data) + assert isinstance(tt, DeltaTriggerTimetable) + assert tt._delta == timetable._delta + assert tt._interval == timetable._interval