diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py index eb28143de28ca..e743000f07360 100644 --- a/tests/timetables/test_events_timetable.py +++ b/tests/timetables/test_events_timetable.py @@ -20,31 +20,31 @@ import pendulum import pytest -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.events import EventsTimetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) # Precedes all events +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events EVENT_DATES = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), # deliberate duplicate, should be ignored - pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE), # deliberately out of order - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), # deliberate duplicate, should be ignored + pendulum.DateTime(2021, 10, 9, tzinfo=utc), # deliberately out of order + pendulum.DateTime(2021, 9, 10, tzinfo=utc), ] EVENT_DATES_SORTED = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 10, tzinfo=utc), + pendulum.DateTime(2021, 10, 9, tzinfo=utc), ] -NON_EVENT_DATE = pendulum.DateTime(2021, 10, 1, tzinfo=TIMEZONE) -MOST_RECENT_EVENT = pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE) +NON_EVENT_DATE = pendulum.DateTime(2021, 10, 1, tzinfo=utc) +MOST_RECENT_EVENT = pendulum.DateTime(2021, 9, 10, tzinfo=utc) @pytest.fixture() diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 928ae83ab3d02..0cd10c3a42362 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -25,26 +25,26 @@ import time_machine from airflow.exceptions import AirflowTimetableInvalid -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) PREV_DATA_INTERVAL_START = START_DATE PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END) PREV_DATA_INTERVAL_EXACT = DataInterval.exact(PREV_DATA_INTERVAL_END) -CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=utc) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) OLD_INTERVAL = DataInterval(start=YESTERDAY, end=CURRENT_TIME) -HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) +HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", utc) HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1)) -CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE) +CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", utc) DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -148,7 +148,7 @@ def test_validate_success(timetable: Timetable) -> None: "timetable, error_message", [ pytest.param( - CronDataIntervalTimetable("0 0 1 13 0", TIMEZONE), + CronDataIntervalTimetable("0 0 1 13 0", utc), "[0 0 1 13 0] is not acceptable, out of range", id="invalid-cron", ), @@ -191,19 +191,19 @@ def test_cron_interval_timezone_from_string(): [ # Arbitrary trigger time. pytest.param( - pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=utc), DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), id="adhoc", ), # Trigger time falls exactly on interval boundary. pytest.param( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), id="exact", ), @@ -213,7 +213,7 @@ def test_cron_infer_manual_data_interval_alignment( trigger_at: pendulum.DateTime, expected_interval: DataInterval, ) -> None: - timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + timetable = CronDataIntervalTimetable("@daily", utc) assert timetable.infer_manual_data_interval(run_after=trigger_at) == expected_interval @@ -222,12 +222,12 @@ def test_cron_infer_manual_data_interval_alignment( [ pytest.param( DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 9, tzinfo=utc), ), id="exact", ), @@ -235,19 +235,19 @@ def test_cron_infer_manual_data_interval_alignment( # Previous data interval does not align with the current timetable. # This is possible if the user edits a DAG with existing runs. DataInterval( - pendulum.DateTime(2022, 8, 7, 1, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, 1, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=utc), ), DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 9, tzinfo=utc), ), id="changed", ), ], ) def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expected_info: DagRunInfo): - timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + timetable = CronDataIntervalTimetable("@daily", utc) info = timetable.next_dagrun_info( last_automated_data_interval=last_data_interval, restriction=TimeRestriction(None, None, True), @@ -269,7 +269,7 @@ class TestCronIntervalDst: def test_entering_exact(self) -> None: timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 3, 24, tz=utc), latest=None, catchup=True, ) @@ -277,8 +277,8 @@ def test_entering_exact(self) -> None: # Last run before DST. Interval starts and ends on 2am UTC (local time is +1). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 24, 2, tz=TIMEZONE), - pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), + pendulum.datetime(2023, 3, 24, 2, tz=utc), + pendulum.datetime(2023, 3, 25, 2, tz=utc), ) # Crossing the DST switch. Interval starts on 2am UTC (local time +1) @@ -288,8 +288,8 @@ def test_entering_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 2, tz=utc), + pendulum.datetime(2023, 3, 26, 1, tz=utc), ) # In DST. Interval starts and ends on 1am UTC (local time is +2). @@ -298,14 +298,14 @@ def test_entering_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 27, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=utc), + pendulum.datetime(2023, 3, 27, 1, tz=utc), ) def test_entering_skip(self) -> None: timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 3, 24, tz=utc), latest=None, catchup=True, ) @@ -313,8 +313,8 @@ def test_entering_skip(self) -> None: # Last run before DST. Interval starts and ends on 1am UTC (local time is +1). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 24, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 24, 1, tz=utc), + pendulum.datetime(2023, 3, 25, 1, tz=utc), ) # Crossing the DST switch. Interval starts on 1am UTC (local time +1) @@ -325,8 +325,8 @@ def test_entering_skip(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 1, tz=utc), + pendulum.datetime(2023, 3, 26, 1, tz=utc), ) # In DST. Interval starts on 1am UTC (local time is +2 but 2am local @@ -336,14 +336,14 @@ def test_entering_skip(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 27, 0, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=utc), + pendulum.datetime(2023, 3, 27, 0, tz=utc), ) def test_exiting_exact(self) -> None: timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 10, 27, tz=utc), latest=None, catchup=True, ) @@ -351,8 +351,8 @@ def test_exiting_exact(self) -> None: # Last run in DST. Interval starts and ends on 1am UTC (local time is +2). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 27, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 27, 1, tz=utc), + pendulum.datetime(2023, 10, 28, 1, tz=utc), ) # Crossing the DST switch. Interval starts on 1am UTC (local time +2) @@ -362,8 +362,8 @@ def test_exiting_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 1, tz=utc), + pendulum.datetime(2023, 10, 29, 2, tz=utc), ) # Out of DST. Interval starts and ends on 2am UTC (local time is +1). @@ -372,14 +372,14 @@ def test_exiting_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), - pendulum.datetime(2023, 10, 30, 2, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 2, tz=utc), + pendulum.datetime(2023, 10, 30, 2, tz=utc), ) def test_exiting_fold(self) -> None: timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 10, 27, tz=utc), latest=None, catchup=True, ) @@ -388,8 +388,8 @@ def test_exiting_fold(self) -> None: # time is +2). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 27, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 27, 0, tz=utc), + pendulum.datetime(2023, 10, 28, 0, tz=utc), ) # Account for folding. Interval starts on 0am UTC (local time +2) and @@ -402,8 +402,8 @@ def test_exiting_fold(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 1, tz=utc), ) # Stepping out of DST. Interval starts from the folded 2am local time @@ -414,8 +414,8 @@ def test_exiting_fold(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 30, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 1, tz=utc), + pendulum.datetime(2023, 10, 30, 1, tz=utc), ) @@ -429,7 +429,7 @@ class TestCronIntervalDstNonTrivial: def test_7_to_8_entering(self): timetable = CronDataIntervalTimetable("0 7-8 * * *", timezone="America/Los_Angeles") restriction = TimeRestriction( - earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + earliest=pendulum.datetime(2020, 3, 7, tz=utc), latest=None, catchup=True, ) @@ -440,8 +440,8 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 7 + 8, tz=utc), + pendulum.datetime(2020, 3, 7, 8 + 8, tz=utc), ) # This interval ends an hour early since it includes the DST switch! @@ -450,8 +450,8 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 8 + 8, tz=utc), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), ) # We're fully into DST so the interval is as expected. @@ -460,14 +460,14 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 8 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), + pendulum.datetime(2020, 3, 8, 8 + 7, tz=utc), ) def test_7_and_9_entering(self): timetable = CronDataIntervalTimetable("0 7,9 * * *", timezone="America/Los_Angeles") restriction = TimeRestriction( - earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + earliest=pendulum.datetime(2020, 3, 7, tz=utc), latest=None, catchup=True, ) @@ -478,8 +478,8 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 7 + 8, tz=utc), + pendulum.datetime(2020, 3, 7, 9 + 8, tz=utc), ) # This interval ends an hour early since it includes the DST switch! @@ -488,8 +488,8 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 9 + 8, tz=utc), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), ) # We're fully into DST so the interval is as expected. @@ -498,15 +498,15 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 9 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), + pendulum.datetime(2020, 3, 8, 9 + 7, tz=utc), ) def test_fold_scheduling(): timetable = CronDataIntervalTimetable("*/30 * * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), # Locally 1:30 (DST). + earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=utc), # Locally 1:30 (DST). latest=None, catchup=True, ) @@ -517,16 +517,16 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), # Locally 2am (DST). + pendulum.datetime(2023, 10, 28, 23, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 0, 0, tz=utc), # Locally 2am (DST). ) next_info = timetable.next_dagrun_info( last_automated_data_interval=next_info.data_interval, restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), # Locally 2:30 (DST). + pendulum.datetime(2023, 10, 29, 0, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 0, 30, tz=utc), # Locally 2:30 (DST). ) # Crossing into fold. @@ -535,8 +535,8 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), # Locally 2am (fold, not DST). + pendulum.datetime(2023, 10, 29, 0, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 1, 0, tz=utc), # Locally 2am (fold, not DST). ) # In the "fold zone". @@ -545,8 +545,8 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), # Locally 2am (fold, not DST). + pendulum.datetime(2023, 10, 29, 1, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 1, 30, tz=utc), # Locally 2am (fold, not DST). ) # Stepping out of fold. @@ -555,6 +555,6 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 2, 0, tz=TIMEZONE), # Locally 3am (not DST). + pendulum.datetime(2023, 10, 29, 1, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 2, 0, tz=utc), # Locally 3am (not DST). ) diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index 58e1f49df9058..5165a14b3c115 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -21,24 +21,23 @@ import dateutil.relativedelta import pendulum -import pendulum.tz import pytest import time_machine from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction from airflow.timetables.trigger import CronTriggerTimetable +from airflow.utils.timezone import utc -TIMEZONE = pendulum.tz.timezone("UTC") -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) PREV_DATA_INTERVAL_EXACT = DataInterval.exact(PREV_DATA_INTERVAL_END) -CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=utc) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) -HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=TIMEZONE) +HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc) DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -69,7 +68,7 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( next_start_time: pendulum.DateTime, ) -> None: """If ``catchup=False`` and start_date is a day before""" - timetable = CronTriggerTimetable("30 16 * * *", timezone=TIMEZONE) + timetable = CronTriggerTimetable("30 16 * * *", timezone=utc) next_info = timetable.next_dagrun_info( last_automated_data_interval=last_automated_data_interval, restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), @@ -81,33 +80,33 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( "current_time, earliest, expected", [ pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="current_time_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="current_time_not_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="current_time_miss_one_interval_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="current_time_miss_one_interval_not_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), - pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 23, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 23, 0, 0, tzinfo=utc)), id="future_start_date", ), ], @@ -129,27 +128,27 @@ def test_hourly_cron_trigger_no_catchup_next_info( "last_automated_data_interval, earliest, expected", [ pytest.param( - DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="last_automated_on_boundary", ), pytest.param( - DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE)), + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="last_automated_not_on_boundary", ), pytest.param( None, - pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="no_last_automated_with_earliest_on_boundary", ), pytest.param( None, - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="no_last_automated_with_earliest_not_on_boundary", ), pytest.param( @@ -176,20 +175,20 @@ def test_cron_trigger_next_info_with_interval(): # Runs every Monday on 16:30, covering the day before the run. timetable = CronTriggerTimetable( "30 16 * * MON", - timezone=TIMEZONE, + timezone=utc, interval=datetime.timedelta(hours=16, minutes=30), ) next_info = timetable.next_dagrun_info( last_automated_data_interval=DataInterval( - pendulum.DateTime(2022, 8, 1, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 1, 16, 30, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 1, tzinfo=utc), + pendulum.DateTime(2022, 8, 1, 16, 30, tzinfo=utc), ), restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=True), ) assert next_info == DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, 16, 30, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, 16, 30, tzinfo=utc), ) @@ -198,7 +197,7 @@ def test_validate_success() -> None: def test_validate_failure() -> None: - timetable = CronTriggerTimetable("0 0 1 13 0", timezone=TIMEZONE) + timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc) with pytest.raises(AirflowTimetableInvalid) as ctx: timetable.validate() @@ -210,13 +209,13 @@ def test_validate_failure() -> None: [ (HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", "timezone": "UTC", "interval": 0}), ( - CronTriggerTimetable("0 0 1 12 *", timezone=TIMEZONE, interval=datetime.timedelta(hours=2)), + CronTriggerTimetable("0 0 1 12 *", timezone=utc, interval=datetime.timedelta(hours=2)), {"expression": "0 0 1 12 *", "timezone": "UTC", "interval": 7200.0}, ), ( CronTriggerTimetable( "0 0 1 12 0", - timezone=pendulum.tz.timezone("Asia/Taipei"), + timezone="Asia/Taipei", interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), ), {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}}, diff --git a/tests/timetables/test_workday_timetable.py b/tests/timetables/test_workday_timetable.py index d20f45d44cf8a..41861e3d2781a 100644 --- a/tests/timetables/test_workday_timetable.py +++ b/tests/timetables/test_workday_timetable.py @@ -23,23 +23,23 @@ import pytest from airflow.example_dags.plugins.workday import AfterWorkdayTimetable -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) # This is a Saturday. +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # This is a Saturday. WEEK_1_WEEKDAYS = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), # This is a US holiday - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 9, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), # This is a US holiday + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 9, tzinfo=utc), + pendulum.DateTime(2021, 9, 10, tzinfo=utc), ] -WEEK_1_SATURDAY = pendulum.DateTime(2021, 9, 11, tzinfo=TIMEZONE) +WEEK_1_SATURDAY = pendulum.DateTime(2021, 9, 11, tzinfo=utc) -WEEK_2_MONDAY = pendulum.DateTime(2021, 9, 13, tzinfo=TIMEZONE) -WEEK_2_TUESDAY = pendulum.DateTime(2021, 9, 14, tzinfo=TIMEZONE) +WEEK_2_MONDAY = pendulum.DateTime(2021, 9, 13, tzinfo=utc) +WEEK_2_TUESDAY = pendulum.DateTime(2021, 9, 14, tzinfo=utc) @pytest.fixture()