Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2661,7 +2661,26 @@ scheduler:
type: boolean
example: ~
default: "True"
see_also: ":ref:`Differences between the two cron timetables`"
see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
create_delta_data_intervals:
description: |
Whether to create DAG runs that span an interval or one single point in time when a timedelta or
relativedelta is provided to ``schedule`` argument of a DAG.

* ``True``: **DeltaDataIntervalTimetable** is used, which is suitable for DAGs with well-defined data
interval. You get contiguous intervals from the end of the previous interval up to the scheduled
datetime.
* ``False``: **DeltaTriggerTimetable** is used, which is suitable for DAGs that simply want to say
e.g. "run this every day" and do not care about the data interval.

Notably, for **DeltaTriggerTimetable**, the logical date is the same as the time the DAG Run will
try to schedule, while for **DeltaDataIntervalTimetable**, the logical date is the beginning of
the data interval, but the DAG Run will try to schedule at the end of the data interval.
version_added: 2.11.0
type: boolean
example: ~
default: "True"
see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
triggerer:
description: ~
options:
Expand Down
56 changes: 56 additions & 0 deletions airflow/timetables/_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.timezone import convert_to_utc

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum import DateTime


class DeltaMixin:
"""Mixin to provide interface to work with timedelta and relativedelta."""

def __init__(self, delta: datetime.timedelta | relativedelta) -> None:
self._delta = delta

@property
def summary(self) -> str:
return str(self._delta)

def validate(self) -> None:
now = datetime.datetime.now()
if (now + self._delta) <= now:
raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

def _get_next(self, current: DateTime) -> DateTime:
return convert_to_utc(current + self._delta)

def _get_prev(self, current: DateTime) -> DateTime:
return convert_to_utc(current - self._delta)

def _align_to_next(self, current: DateTime) -> DateTime:
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
return current
30 changes: 3 additions & 27 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from dateutil.relativedelta import relativedelta
from pendulum import DateTime

from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
from airflow.utils.timezone import coerce_datetime, utcnow

if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
Expand Down Expand Up @@ -173,7 +173,7 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(start=self._get_prev(end), end=end)


class DeltaDataIntervalTimetable(_DataIntervalTimetable):
class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable):
"""
Timetable that schedules data intervals with a time delta.

Expand All @@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
instance.
"""

def __init__(self, delta: Delta) -> None:
self._delta = delta

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
from airflow.serialization.serialized_objects import decode_relativedelta
Expand All @@ -204,10 +201,6 @@ def __eq__(self, other: Any) -> bool:
return NotImplemented
return self._delta == other._delta

@property
def summary(self) -> str:
return str(self._delta)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_relativedelta

Expand All @@ -218,23 +211,6 @@ def serialize(self) -> dict[str, Any]:
delta = encode_relativedelta(self._delta)
return {"delta": delta}

def validate(self) -> None:
now = datetime.datetime.now()
if (now + self._delta) <= now:
raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

def _get_next(self, current: DateTime) -> DateTime:
return convert_to_utc(current + self._delta)

def _get_prev(self, current: DateTime) -> DateTime:
return convert_to_utc(current - self._delta)

def _align_to_next(self, current: DateTime) -> DateTime:
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
return current

@staticmethod
def _relativedelta_in_seconds(delta: relativedelta) -> int:
return (
Expand Down
154 changes: 110 additions & 44 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
from typing import TYPE_CHECKING, Any

from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone
from airflow.utils.timezone import coerce_datetime, utcnow

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
Expand All @@ -31,60 +32,43 @@
from airflow.timetables.base import TimeRestriction


class CronTriggerTimetable(CronMixin, Timetable):
"""
Timetable that triggers DAG runs according to a cron expression.

This is different from ``CronDataIntervalTimetable``, where the cron
expression specifies the *data interval* of a DAG run. With this timetable,
the data intervals are specified independently from the cron expression.
Also for the same reason, this timetable kicks off a DAG run immediately at
the start of the period (similar to POSIX cron), instead of needing to wait
for one data interval to pass.
def _serialize_interval(interval: datetime.timedelta | relativedelta) -> float | dict:
from airflow.serialization.serialized_objects import encode_relativedelta

Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""
if isinstance(interval, datetime.timedelta):
return interval.total_seconds()
return encode_relativedelta(interval)

def __init__(
self,
cron: str,
*,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(cron, timezone)
self._interval = interval

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone
def _deserialize_interval(value: int | dict) -> datetime.timedelta | relativedelta:
from airflow.serialization.serialized_objects import decode_relativedelta

interval: datetime.timedelta | relativedelta
if isinstance(data["interval"], dict):
interval = decode_relativedelta(data["interval"])
else:
interval = datetime.timedelta(seconds=data["interval"])
return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)
if isinstance(value, dict):
return decode_relativedelta(value)
return datetime.timedelta(seconds=value)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone

interval: float | dict[str, Any]
if isinstance(self._interval, datetime.timedelta):
interval = self._interval.total_seconds()
else:
interval = encode_relativedelta(self._interval)
timezone = encode_timezone(self._timezone)
return {"expression": self._expression, "timezone": timezone, "interval": interval}
class _TriggerTimetable(Timetable):
_interval: datetime.timedelta | relativedelta

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
run_after - self._interval, # type: ignore[arg-type]
coerce_datetime(run_after - self._interval),
run_after,
)

def _align_to_next(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _align_to_prev(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _get_next(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _get_prev(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def next_dagrun_info(
self,
*,
Expand All @@ -99,7 +83,7 @@ def next_dagrun_info(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
Expand All @@ -113,3 +97,85 @@ def next_dagrun_info(
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)


class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable):
"""
Timetable that triggers DAG runs according to a cron expression.

This is different from ``DeltaDataIntervalTimetable``, where the delta value
specifies the *data interval* of a DAG run. With this timetable, the data
intervals are specified independently. Also for the same reason, this
timetable kicks off a DAG run immediately at the start of the period,
instead of needing to wait for one data interval to pass.

:param delta: How much time to wait between each run.
:param interval: The data interval of each run. Default is 0.
"""

def __init__(
self,
delta: datetime.timedelta | relativedelta,
*,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(delta)
self._interval = interval

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
return cls(
_deserialize_interval(data["delta"]),
interval=_deserialize_interval(data["interval"]),
)

def serialize(self) -> dict[str, Any]:
return {
"delta": _serialize_interval(self._delta),
"interval": _serialize_interval(self._interval),
}


class CronTriggerTimetable(CronMixin, _TriggerTimetable):
"""
Timetable that triggers DAG runs according to a cron expression.

This is different from ``CronDataIntervalTimetable``, where the cron
expression specifies the *data interval* of a DAG run. With this timetable,
the data intervals are specified independently from the cron expression.
Also for the same reason, this timetable kicks off a DAG run immediately at
the start of the period (similar to POSIX cron), instead of needing to wait
for one data interval to pass.

Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""

def __init__(
self,
cron: str,
*,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(cron, timezone)
self._interval = interval

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone

interval: datetime.timedelta | relativedelta
if isinstance(data["interval"], dict):
interval = decode_relativedelta(data["interval"])
else:
interval = datetime.timedelta(seconds=data["interval"])
return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_timezone

return {
"expression": self._expression,
"timezone": encode_timezone(self._timezone),
"interval": _serialize_interval(self._interval),
}
Loading