Skip to content

Commit

Permalink
Fix DagRun data interval for DeltaDataIntervalTimetable (apache#35391)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored and romsharon98 committed Nov 10, 2023
1 parent d10e2c3 commit 122f1ea
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
24 changes: 23 additions & 1 deletion airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,27 @@ def _align_to_next(self, current: DateTime) -> DateTime:
def _align_to_prev(self, current: DateTime) -> DateTime:
    """Return ``current`` unchanged.

    :param current: The time to align.
    :return: The same object that was passed in; no adjustment is applied.
    """
    return current

@staticmethod
def _relativedelta_in_seconds(delta: relativedelta) -> int:
    """Convert a ``relativedelta`` into an approximate number of seconds.

    Calendar units have no fixed length, so a year is counted as 365 days
    and a month as 30 days. Only the ``years``, ``months``, ``days``,
    ``hours``, ``minutes`` and ``seconds`` fields are read.

    :param delta: The relative delta to convert.
    :return: The sum of the fields above, each scaled to seconds.
    """
    # Accumulate unit by unit, largest first, so the arithmetic order is
    # the same as a single left-to-right sum.
    total = delta.years * 365 * 24 * 60 * 60
    total += delta.months * 30 * 24 * 60 * 60
    total += delta.days * 24 * 60 * 60
    total += delta.hours * 60 * 60
    total += delta.minutes * 60
    total += delta.seconds
    return total

def _round(self, dt: DateTime) -> DateTime:
    """Floor ``dt`` to a whole multiple of the interval length.

    The interval length is taken in seconds: ``total_seconds()`` when
    ``self._delta`` is a ``datetime.timedelta``, otherwise the value
    computed by ``_relativedelta_in_seconds``. The remainder of
    ``dt.timestamp()`` modulo that length is subtracted, so the result
    is never later than ``dt``.

    :param dt: The time to floor.
    :return: A ``DateTime`` built from the floored timestamp, using
        ``dt.tzinfo`` as its timezone.
    """
    step = self._delta
    if not isinstance(step, datetime.timedelta):
        period = self._relativedelta_in_seconds(step)
    else:
        period = step.total_seconds()
    stamp = dt.timestamp()
    # Drop whatever part of the timestamp lies past the last full period.
    floored = stamp - (stamp % period)
    return DateTime.fromtimestamp(floored, tz=dt.tzinfo)

def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
"""Bound the earliest time a run can be scheduled.
Expand All @@ -236,7 +257,8 @@ def _skip_to_latest(self, earliest: DateTime | None) -> DateTime:
This is slightly different from the cron version at terminal values.
"""
new_start = self._get_prev(DateTime.utcnow())
round_current_time = self._round(DateTime.utcnow())
new_start = self._get_prev(round_current_time)
if earliest is None:
return new_start
return max(new_start, earliest)
Expand Down
7 changes: 4 additions & 3 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2249,16 +2249,17 @@ def make_dag(dag_id, schedule, start_date, catchup):
# The DR should be scheduled in the last 2 hours, not 6 hours ago
assert next_date == six_hours_ago_to_the_hour

@time_machine.travel(timezone.datetime(2020, 1, 5), tick=False)
def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self):
@time_machine.travel(timezone.datetime(2020, 1, 5))
@pytest.mark.parametrize("schedule", ("@daily", timedelta(days=1), cron_timetable("0 0 * * *")))
def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self, schedule):
"""
Test that the dag file processor does not create multiple dagruns
if a dag is scheduled with 'timedelta' and catchup=False
"""
dag = DAG(
"test_scheduler_dagrun_once_with_timedelta_and_catchup_false",
start_date=timezone.datetime(2015, 1, 1),
schedule=timedelta(days=1),
schedule=schedule,
catchup=False,
)

Expand Down

0 comments on commit 122f1ea

Please sign in to comment.