diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 077c4195a7ae0..6850268de3496 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -228,6 +228,27 @@ def _align_to_next(self, current: DateTime) -> DateTime: def _align_to_prev(self, current: DateTime) -> DateTime: return current + @staticmethod + def _relativedelta_in_seconds(delta: relativedelta) -> int: + return ( + delta.years * 365 * 24 * 60 * 60 + + delta.months * 30 * 24 * 60 * 60 + + delta.days * 24 * 60 * 60 + + delta.hours * 60 * 60 + + delta.minutes * 60 + + delta.seconds + ) + + def _round(self, dt: DateTime) -> DateTime: + """Round the given time to the nearest interval.""" + if isinstance(self._delta, datetime.timedelta): + delta_in_seconds = self._delta.total_seconds() + else: + delta_in_seconds = self._relativedelta_in_seconds(self._delta) + dt_in_seconds = dt.timestamp() + rounded_dt = dt_in_seconds - (dt_in_seconds % delta_in_seconds) + return DateTime.fromtimestamp(rounded_dt, tz=dt.tzinfo) + def _skip_to_latest(self, earliest: DateTime | None) -> DateTime: """Bound the earliest time a run can be scheduled. @@ -236,7 +257,8 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime: This is slightly different from the cron version at terminal values. """ - new_start = self._get_prev(DateTime.utcnow()) + round_current_time = self._round(DateTime.utcnow()) + new_start = self._get_prev(round_current_time) if earliest is None: return new_start return max(new_start, earliest) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 9fa0eb669d11b..f4b872109f626 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2249,8 +2249,9 @@ def make_dag(dag_id, schedule, start_date, catchup): # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date == six_hours_ago_to_the_hour - @time_machine.travel(timezone.datetime(2020, 1, 5), tick=False) - def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self): + @time_machine.travel(timezone.datetime(2020, 1, 5)) + @pytest.mark.parametrize("schedule", ("@daily", timedelta(days=1), cron_timetable("0 0 * * *"))) + def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self, schedule): """ Test that the dag file processor does not create multiple dagruns if a dag is scheduled with 'timedelta' and catchup=False @@ -2258,7 +2259,7 @@ def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self): dag = DAG( "test_scheduler_dagrun_once_with_timedelta_and_catchup_false", start_date=timezone.datetime(2015, 1, 1), - schedule=timedelta(days=1), + schedule=schedule, catchup=False, )