Skip to content

Commit

Permalink
Merge branch 'main' into fix-otel-duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
InderParmar authored Oct 25, 2024
2 parents 15fd4e1 + f802330 commit 3d72042
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.10.2"
ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.26
ARG AIRFLOW_UV_VERSION=0.4.27
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Here we fix the versions so all subsequent commands will use the versions
# from the sources
ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.26
ARG AIRFLOW_UV_VERSION=0.4.27

ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
AIRFLOW_UV_VERSION=${AIRFLOW_UV_VERSION}
Expand Down
81 changes: 76 additions & 5 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone
from airflow.utils.timezone import coerce_datetime, utcnow

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
Expand All @@ -43,6 +43,24 @@ class CronTriggerTimetable(CronMixin, Timetable):
for one data interval to pass.
Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
:param cron: cron string that defines when to run
:param timezone: Which timezone to use to interpret the cron string
:param interval: timedelta that defines the data interval start. Default 0.
*run_immediately* controls, if no *start_date* is given to the DAG, when
the first run of the DAG should be scheduled. It has no effect if there
already exist runs for this DAG.
* If *True*, always run immediately the most recent possible DAG run.
* If passed a ``timedelta``, will run the most recent possible DAG run
if that run's ``data_interval_end`` is within timedelta of now.
* If *False* (the default) or *None*, the timedelta is calculated as 10%
of the time between the most recent past scheduled time and the next
scheduled time, with a minimum of 5 minutes. E.g. if running every hour,
this would run the previous time if no more than 6 minutes had passed
since the previous run time, otherwise it would wait until the next hour.
"""

def __init__(
Expand All @@ -51,9 +69,11 @@ def __init__(
*,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
run_immediately: bool | datetime.timedelta = False,
) -> None:
super().__init__(cron, timezone)
self._interval = interval
self.run_immediately = run_immediately

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
Expand All @@ -64,7 +84,21 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable:
interval = decode_relativedelta(data["interval"])
else:
interval = datetime.timedelta(seconds=data["interval"])
return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)

immediate: bool | datetime.timedelta
if "immediate" not in data:
immediate = False
elif isinstance(data["immediate"], float):
immediate = datetime.timedelta(seconds=data["interval"])
else:
immediate = data["immediate"]

return cls(
data["expression"],
timezone=decode_timezone(data["timezone"]),
interval=interval,
run_immediately=immediate,
)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone
Expand All @@ -75,7 +109,17 @@ def serialize(self) -> dict[str, Any]:
else:
interval = encode_relativedelta(self._interval)
timezone = encode_timezone(self._timezone)
return {"expression": self._expression, "timezone": timezone, "interval": interval}
immediate: bool | float
if isinstance(self.run_immediately, datetime.timedelta):
immediate = self.run_immediately.total_seconds()
else:
immediate = self.run_immediately
return {
"expression": self._expression,
"timezone": timezone,
"interval": interval,
"run_immediately": immediate,
}

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(
Expand All @@ -95,13 +139,16 @@ def next_dagrun_info(
if last_automated_data_interval is not None:
next_start_time = self._get_next(last_automated_data_interval.end)
elif restriction.earliest is None:
return None # Don't know where to catch up from, give up.
next_start_time = self._calc_first_run()
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
start_time_candidates.append(self._calc_first_run())
if restriction.earliest is not None:
start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
Expand All @@ -113,3 +160,27 @@ def next_dagrun_info(
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)

def _calc_first_run(self):
    """
    Choose the first run time when there is neither a start date nor a previous run.

    The candidates are the run times obtained by aligning the current time
    backwards (``_align_to_prev``) and forwards (``_align_to_next``). Which
    one is returned depends on ``self.run_immediately``:

    * exactly ``True``: always the past run time.
    * a ``timedelta``: the past run time if no more than that much time has
      elapsed since it, otherwise the next run time.
    * anything else (e.g. ``False``): as for a ``timedelta``, using 10% of
      the gap between the two run times, but never less than 5 minutes.
    """
    current = coerce_datetime(utcnow())
    previous_slot = self._align_to_prev(current)
    upcoming_slot = self._align_to_next(current)

    # Identity check on purpose: a (truthy) timedelta must not take this shortcut.
    if self.run_immediately is True:
        return previous_slot

    slot_spacing = upcoming_slot - previous_slot
    elapsed_since_previous = current - previous_slot

    if isinstance(self.run_immediately, datetime.timedelta):
        allowed_lateness = self.run_immediately
    else:
        # The 5-minute floor keeps the window usable for very frequent schedules.
        allowed_lateness = max(slot_spacing / 10, datetime.timedelta(minutes=5))

    return previous_slot if elapsed_since_previous <= allowed_lateness else upcoming_slot
5 changes: 2 additions & 3 deletions contributing-docs/11_provider_packages.rst
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,8 @@ backward compatible with future versions of Airflow, so you can upgrade Airflow
at the same version.

When you introduce a breaking change in the provider, you have to make sure that you communicate it
properly. You have to update ``CHANGELOG.rst`` file in the provider package and you have to make sure that
you update the ``provider.yaml`` file with the new (breaking) version of the provider. Ideally in the
``CHANGELOG.rst`` you should also provide a migration path for the users to follow.
properly. You have to update the ``CHANGELOG.rst`` file in the provider package. Ideally you should provide
a migration path for the users to follow in the ``CHANGELOG.rst``.

If in doubt, you can always look at ``CHANGELOG.rst`` in other providers to see how we communicate
breaking changes in the providers.
Expand Down
2 changes: 1 addition & 1 deletion dev/breeze/doc/ci/02_images.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ can be used for CI images:
| `ADDITIONAL_DEV_APT_DEPS` | | Additional apt dev dependencies installed in the first part of the image |
| `ADDITIONAL_DEV_APT_ENV` | | Additional env variables defined when installing dev deps |
| `AIRFLOW_PIP_VERSION` | `24.2` | PIP version used. |
| `AIRFLOW_UV_VERSION` | `0.4.26` | UV version used. |
| `AIRFLOW_UV_VERSION` | `0.4.27` | UV version used. |
| `AIRFLOW_USE_UV` | `true` | Whether to use UV for installation. |
| `PIP_PROGRESS_BAR` | `on` | Progress bar for PIP installation |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class VersionedFile(NamedTuple):


AIRFLOW_PIP_VERSION = "24.2"
AIRFLOW_UV_VERSION = "0.4.26"
AIRFLOW_UV_VERSION = "0.4.27"
AIRFLOW_USE_UV = False
WHEEL_VERSION = "0.36.2"
GITPYTHON_VERSION = "3.1.40"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ def create_batch(
individual attempt.
:param metadata: Additional metadata that is provided to the method.
"""
self.log.debug("Creating batch: %s", batch)
client = self.get_batch_client(region)
parent = f"projects/{project_id}/regions/{region}"

Expand Down
79 changes: 67 additions & 12 deletions tests/timetables/test_trigger_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=utc)
YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)

HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc)
HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc, run_immediately=True)

DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)

Expand Down Expand Up @@ -68,7 +68,11 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
next_start_time: pendulum.DateTime,
) -> None:
"""If ``catchup=False`` and start_date is a day before"""
timetable = CronTriggerTimetable("30 16 * * *", timezone=utc)
timetable = CronTriggerTimetable(
"30 16 * * *",
timezone=utc,
run_immediately=False, # Should have no effect since earliest is not None
)
next_info = timetable.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
Expand Down Expand Up @@ -151,12 +155,6 @@ def test_hourly_cron_trigger_no_catchup_next_info(
DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)),
id="no_last_automated_with_earliest_not_on_boundary",
),
pytest.param(
None,
None,
None,
id="no_last_automated_no_earliest",
),
],
)
def test_hourly_cron_trigger_catchup_next_info(
Expand Down Expand Up @@ -207,18 +205,29 @@ def test_validate_failure() -> None:
@pytest.mark.parametrize(
"timetable, data",
[
(HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", "timezone": "UTC", "interval": 0}),
(
CronTriggerTimetable("0 0 1 12 *", timezone=utc, interval=datetime.timedelta(hours=2)),
{"expression": "0 0 1 12 *", "timezone": "UTC", "interval": 7200.0},
HOURLY_CRON_TRIGGER_TIMETABLE,
{"expression": "0 * * * *", "run_immediately": True, "timezone": "UTC", "interval": 0.0},
),
(
CronTriggerTimetable(
"0 0 1 12 *", timezone=utc, run_immediately=False, interval=datetime.timedelta(hours=2)
),
{"expression": "0 0 1 12 *", "run_immediately": False, "timezone": "UTC", "interval": 7200.0},
),
(
CronTriggerTimetable(
"0 0 1 12 0",
timezone="Asia/Taipei",
run_immediately=False,
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
),
{"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}},
{
"expression": "0 0 1 12 0",
"run_immediately": False,
"timezone": "Asia/Taipei",
"interval": {"weekday": [0]},
},
),
],
)
Expand All @@ -230,3 +239,49 @@ def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.A
assert tt._expression == timetable._expression
assert tt._timezone == timetable._timezone
assert tt._interval == timetable._interval


JUST_AFTER = pendulum.datetime(year=2024, month=8, day=15, hour=3, minute=5)
WAY_AFTER = pendulum.datetime(year=2024, month=8, day=15, hour=12, minute=5)
PREVIOUS = DagRunInfo.exact(pendulum.datetime(year=2024, month=8, day=15, hour=3))
NEXT = DagRunInfo.exact(pendulum.datetime(year=2024, month=8, day=16, hour=3))


@pytest.mark.parametrize("catchup", [True, False])
@pytest.mark.parametrize(
    "run_immediately, current_time, correct_interval",
    [
        (True, WAY_AFTER, PREVIOUS),
        (False, JUST_AFTER, PREVIOUS),
        (False, WAY_AFTER, NEXT),
        (datetime.timedelta(minutes=10), JUST_AFTER, PREVIOUS),
        (datetime.timedelta(minutes=10), WAY_AFTER, NEXT),
    ],
)
def test_run_immediately(catchup, run_immediately, current_time, correct_interval):
    """Check the first scheduled run of a daily 03:00 timetable with no start date and no prior run.

    Each case freezes "now" either shortly after or long after the 03:00 slot
    and checks whether the timetable picks that past slot or the next day's.
    """
    unrestricted = TimeRestriction(earliest=None, latest=None, catchup=catchup)
    daily_at_three = CronTriggerTimetable("0 3 * * *", timezone=utc, run_immediately=run_immediately)

    with time_machine.travel(current_time):
        scheduled = daily_at_three.next_dagrun_info(
            last_automated_data_interval=None,
            restriction=unrestricted,
        )

    assert scheduled == correct_interval


@pytest.mark.parametrize("catchup", [True, False])
def test_run_immediately_fast_dag(catchup):
    """Check that a frequent schedule still gets the 5-minute minimum buffer.

    The clock is frozen (``tick=False``) at JUST_AFTER, 03:05, and the test
    expects the 03:00 slot (PREVIOUS) to be chosen rather than a later one.
    """
    unrestricted = TimeRestriction(earliest=None, latest=None, catchup=catchup)
    # Runs every 10 minutes, so falls back to 5 min hardcoded limit on buffer time
    every_ten_minutes = CronTriggerTimetable("*/10 3 * * *", timezone=utc, run_immediately=False)

    with time_machine.travel(JUST_AFTER, tick=False):
        scheduled = every_ten_minutes.next_dagrun_info(
            last_automated_data_interval=None,
            restriction=unrestricted,
        )

    assert scheduled == PREVIOUS

0 comments on commit 3d72042

Please sign in to comment.