diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py index 9f5878b197232..b0e6e256ee0ea 100644 --- a/airflow/timetables/_cron.py +++ b/airflow/timetables/_cron.py @@ -19,9 +19,9 @@ import datetime from typing import TYPE_CHECKING, Any +import pendulum from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException from croniter import CroniterBadCronError, CroniterBadDateError, croniter -from pendulum.tz.timezone import Timezone from airflow.exceptions import AirflowTimetableInvalid from airflow.utils.dates import cron_presets @@ -29,6 +29,7 @@ if TYPE_CHECKING: from pendulum import DateTime + from pendulum.tz.timezone import Timezone def _covers_every_hour(cron: croniter) -> bool: @@ -66,7 +67,7 @@ def __init__(self, cron: str, timezone: str | Timezone) -> None: self._expression = cron_presets.get(cron, cron) if isinstance(timezone, str): - timezone = Timezone(timezone) + timezone = pendulum.tz.timezone(timezone) self._timezone = timezone try: diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 477cc4beda3a2..249cf667e10f5 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -26,12 +26,12 @@ from unittest.mock import ANY, MagicMock from uuid import uuid4 +import pendulum import pytest from kubernetes import client from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException -from pendulum.tz.timezone import Timezone from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models.connection import Connection @@ -53,7 +53,7 @@ def create_context(task) -> Context: dag = DAG(dag_id="dag") - execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=Timezone("Europe/Amsterdam")) + execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=pendulum.tz.timezone("Europe/Amsterdam")) dag_run = DagRun( dag_id=dag.dag_id, execution_date=execution_date, diff --git a/tests/api_connexion/test_parameters.py b/tests/api_connexion/test_parameters.py index 3fbe8d6309cb8..d5fdbb79b9cc1 100644 --- a/tests/api_connexion/test_parameters.py +++ b/tests/api_connexion/test_parameters.py @@ -18,9 +18,8 @@ from unittest import mock +import pendulum import pytest -from pendulum import DateTime -from pendulum.tz.timezone import Timezone from airflow.api_connexion.exceptions import BadRequest from airflow.api_connexion.parameters import ( @@ -106,7 +105,7 @@ def test_should_works_with_datetime_formatter(self): decorated_endpoint(param_a="2020-01-01T0:0:00+00:00") - endpoint.assert_called_once_with(param_a=DateTime(2020, 1, 1, 0, tzinfo=Timezone("UTC"))) + endpoint.assert_called_once_with(param_a=pendulum.datetime(2020, 1, 1, 0, tz="UTC")) def test_should_propagate_exceptions(self): decorator = format_parameters({"param_a": format_datetime}) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index a4232da30eb3c..13f812355093a 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -20,7 +20,7 @@ from datetime import datetime from json.decoder import JSONDecodeError from types import SimpleNamespace -from typing import cast +from typing import TYPE_CHECKING, cast from unittest import mock from unittest.mock import MagicMock @@ -28,8 +28,6 @@ import pytest import time_machine from kubernetes.client.rest import ApiException -from pendulum import DateTime -from pendulum.tz.timezone import Timezone from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException @@ -43,6 +41,9 @@ ) from airflow.utils.timezone import utc +if TYPE_CHECKING: + from pendulum import DateTime + class TestPodManager: def setup_method(self): @@ -269,7 +270,7 @@ def test_fetch_container_logs_returning_last_timestamp( status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) - assert status.last_log_time == cast(DateTime, pendulum.parse(timestamp_string)) + assert status.last_log_time == cast("DateTime", pendulum.parse(timestamp_string)) @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") @@ -306,7 +307,7 @@ def consumer_iter(): mock_consumer_iter.side_effect = consumer_iter mock_container_is_running.side_effect = [True, True, False] status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) - assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string)) + assert status.last_log_time == cast("DateTime", pendulum.parse(last_timestamp_string)) assert self.mock_progress_callback.call_count == expected_call_count @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @@ -461,13 +462,13 @@ def test_fetch_requested_container_logs_invalid(self, container_running, contain def test_fetch_container_since_time(self, logs_available, container_running, mock_now): """If given since_time, should be used.""" mock_pod = MagicMock() - mock_now.return_value = DateTime(2020, 1, 1, 0, 0, 5, tzinfo=Timezone("UTC")) + mock_now.return_value = pendulum.datetime(2020, 1, 1, 0, 0, 5, tz="UTC") logs_available.return_value = True container_running.return_value = False self.mock_kube_client.read_namespaced_pod_log.return_value = mock.MagicMock( stream=mock.MagicMock(return_value=[b"2021-01-01 hi"]) ) - since_time = DateTime(2020, 1, 1, tzinfo=Timezone("UTC")) + since_time = pendulum.datetime(2020, 1, 1, tz="UTC") self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", since_time=since_time) args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0] assert kwargs["since_seconds"] == 5 @@ -488,7 +489,7 @@ def test_fetch_container_running_follow( ) ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", follow=follow) assert len(container_running_mock.call_args_list) == is_running_calls - assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone("UTC")) + assert ret.last_log_time == pendulum.datetime(2021, 1, 1, tz="UTC") assert ret.running is exp_running @pytest.mark.parametrize( diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py index 19f647fcfaf48..54710bcd9e47a 100644 --- a/tests/providers/openlineage/plugins/test_utils.py +++ b/tests/providers/openlineage/plugins/test_utils.py @@ -23,10 +23,10 @@ from json import JSONEncoder from typing import Any +import pendulum import pytest from attrs import define from openlineage.client.utils import RedactMixin -from pendulum.tz.timezone import Timezone from pkg_resources import parse_version from airflow.models import DAG as AIRFLOW_DAG, DagModel @@ -86,8 +86,8 @@ def test_get_dagrun_start_end(): state=State.NONE, run_id=run_id, data_interval=dag.get_next_data_interval(dag_model) ) assert dagrun.data_interval_start is not None - start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=Timezone("UTC")) - end_date_tz = datetime.datetime(2022, 1, 1, hour=2, tzinfo=Timezone("UTC")) + start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=pendulum.tz.timezone("UTC")) + end_date_tz = datetime.datetime(2022, 1, 1, hour=2, tzinfo=pendulum.tz.timezone("UTC")) assert dagrun.data_interval_start, dagrun.data_interval_end == (start_date_tz, end_date_tz) diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index a40e0d01ea4fa..c059a8d236e79 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -20,10 +20,10 @@ import json from datetime import datetime, timedelta +import pendulum import pytest from dateutil import relativedelta from kubernetes.client import models as k8s -from pendulum.tz.timezone import Timezone from airflow.datasets import Dataset from airflow.exceptions import SerializationError @@ -142,7 +142,7 @@ def equal_time(a: datetime, b: datetime) -> bool: (1, None, equals), (datetime.utcnow(), DAT.DATETIME, equal_time), (timedelta(minutes=2), DAT.TIMEDELTA, equals), - (Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name), + (pendulum.tz.timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name), (relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, b: a.hours == b.hours), ({"test": "dict", "test-1": 1}, None, equals), (["array_item", 2], None, equals), diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py index eb28143de28ca..e743000f07360 100644 --- a/tests/timetables/test_events_timetable.py +++ b/tests/timetables/test_events_timetable.py @@ -20,31 +20,31 @@ import pendulum import pytest -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.events import EventsTimetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) # Precedes all events +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events EVENT_DATES = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), # deliberate duplicate, should be ignored - pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE), # deliberately out of order - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), # deliberate duplicate, should be ignored + pendulum.DateTime(2021, 10, 9, tzinfo=utc), # deliberately out of order + pendulum.DateTime(2021, 9, 10, tzinfo=utc), ] EVENT_DATES_SORTED = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 10, tzinfo=utc), + pendulum.DateTime(2021, 10, 9, tzinfo=utc), ] -NON_EVENT_DATE = pendulum.DateTime(2021, 10, 1, tzinfo=TIMEZONE) -MOST_RECENT_EVENT = pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE) +NON_EVENT_DATE = pendulum.DateTime(2021, 10, 1, tzinfo=utc) +MOST_RECENT_EVENT = pendulum.DateTime(2021, 9, 10, tzinfo=utc) @pytest.fixture() diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 928ae83ab3d02..0cd10c3a42362 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -25,26 +25,26 @@ import time_machine from airflow.exceptions import AirflowTimetableInvalid -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) PREV_DATA_INTERVAL_START = START_DATE PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END) PREV_DATA_INTERVAL_EXACT = DataInterval.exact(PREV_DATA_INTERVAL_END) -CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=utc) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) OLD_INTERVAL = DataInterval(start=YESTERDAY, end=CURRENT_TIME) -HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) +HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", utc) HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1)) -CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE) +CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", utc) DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -148,7 +148,7 @@ def test_validate_success(timetable: Timetable) -> None: "timetable, error_message", [ pytest.param( - CronDataIntervalTimetable("0 0 1 13 0", TIMEZONE), + CronDataIntervalTimetable("0 0 1 13 0", utc), "[0 0 1 13 0] is not acceptable, out of range", id="invalid-cron", ), @@ -191,19 +191,19 @@ def test_cron_interval_timezone_from_string(): [ # Arbitrary trigger time. pytest.param( - pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=utc), DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), id="adhoc", ), # Trigger time falls exactly on interval boundary. pytest.param( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), id="exact", ), @@ -213,7 +213,7 @@ def test_cron_infer_manual_data_interval_alignment( trigger_at: pendulum.DateTime, expected_interval: DataInterval, ) -> None: - timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + timetable = CronDataIntervalTimetable("@daily", utc) assert timetable.infer_manual_data_interval(run_after=trigger_at) == expected_interval @@ -222,12 +222,12 @@ def test_cron_infer_manual_data_interval_alignment( [ pytest.param( DataInterval( - pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), ), DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 9, tzinfo=utc), ), id="exact", ), @@ -235,19 +235,19 @@ def test_cron_infer_manual_data_interval_alignment( # Previous data interval does not align with the current timetable. # This is possible if the user edits a DAG with existing runs. DataInterval( - pendulum.DateTime(2022, 8, 7, 1, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 7, 1, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=utc), ), DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 9, tzinfo=utc), ), id="changed", ), ], ) def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expected_info: DagRunInfo): - timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + timetable = CronDataIntervalTimetable("@daily", utc) info = timetable.next_dagrun_info( last_automated_data_interval=last_data_interval, restriction=TimeRestriction(None, None, True), @@ -269,7 +269,7 @@ class TestCronIntervalDst: def test_entering_exact(self) -> None: timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 3, 24, tz=utc), latest=None, catchup=True, ) @@ -277,8 +277,8 @@ def test_entering_exact(self) -> None: # Last run before DST. Interval starts and ends on 2am UTC (local time is +1). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 24, 2, tz=TIMEZONE), - pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), + pendulum.datetime(2023, 3, 24, 2, tz=utc), + pendulum.datetime(2023, 3, 25, 2, tz=utc), ) # Crossing the DST switch. Interval starts on 2am UTC (local time +1) @@ -288,8 +288,8 @@ def test_entering_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE), - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 2, tz=utc), + pendulum.datetime(2023, 3, 26, 1, tz=utc), ) # In DST. Interval starts and ends on 1am UTC (local time is +2). @@ -298,14 +298,14 @@ def test_entering_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 27, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=utc), + pendulum.datetime(2023, 3, 27, 1, tz=utc), ) def test_entering_skip(self) -> None: timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 3, 24, tz=utc), latest=None, catchup=True, ) @@ -313,8 +313,8 @@ def test_entering_skip(self) -> None: # Last run before DST. Interval starts and ends on 1am UTC (local time is +1). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 24, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 24, 1, tz=utc), + pendulum.datetime(2023, 3, 25, 1, tz=utc), ) # Crossing the DST switch. Interval starts on 1am UTC (local time +1) @@ -325,8 +325,8 @@ def test_entering_skip(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), + pendulum.datetime(2023, 3, 25, 1, tz=utc), + pendulum.datetime(2023, 3, 26, 1, tz=utc), ) # In DST. Interval starts on 1am UTC (local time is +2 but 2am local @@ -336,14 +336,14 @@ def test_entering_skip(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE), - pendulum.datetime(2023, 3, 27, 0, tz=TIMEZONE), + pendulum.datetime(2023, 3, 26, 1, tz=utc), + pendulum.datetime(2023, 3, 27, 0, tz=utc), ) def test_exiting_exact(self) -> None: timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 10, 27, tz=utc), latest=None, catchup=True, ) @@ -351,8 +351,8 @@ def test_exiting_exact(self) -> None: # Last run in DST. Interval starts and ends on 1am UTC (local time is +2). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 27, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 27, 1, tz=utc), + pendulum.datetime(2023, 10, 28, 1, tz=utc), ) # Crossing the DST switch. Interval starts on 1am UTC (local time +2) @@ -362,8 +362,8 @@ def test_exiting_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 1, tz=utc), + pendulum.datetime(2023, 10, 29, 2, tz=utc), ) # Out of DST. Interval starts and ends on 2am UTC (local time is +1). @@ -372,14 +372,14 @@ def test_exiting_exact(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE), - pendulum.datetime(2023, 10, 30, 2, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 2, tz=utc), + pendulum.datetime(2023, 10, 30, 2, tz=utc), ) def test_exiting_fold(self) -> None: timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE), + earliest=pendulum.datetime(2023, 10, 27, tz=utc), latest=None, catchup=True, ) @@ -388,8 +388,8 @@ def test_exiting_fold(self) -> None: # time is +2). next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 27, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), + pendulum.datetime(2023, 10, 27, 0, tz=utc), + pendulum.datetime(2023, 10, 28, 0, tz=utc), ) # Account for folding. Interval starts on 0am UTC (local time +2) and @@ -402,8 +402,8 @@ def test_exiting_fold(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 28, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 1, tz=utc), ) # Stepping out of DST. Interval starts from the folded 2am local time @@ -414,8 +414,8 @@ def test_exiting_fold(self) -> None: restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE), - pendulum.datetime(2023, 10, 30, 1, tz=TIMEZONE), + pendulum.datetime(2023, 10, 29, 1, tz=utc), + pendulum.datetime(2023, 10, 30, 1, tz=utc), ) @@ -429,7 +429,7 @@ class TestCronIntervalDstNonTrivial: def test_7_to_8_entering(self): timetable = CronDataIntervalTimetable("0 7-8 * * *", timezone="America/Los_Angeles") restriction = TimeRestriction( - earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + earliest=pendulum.datetime(2020, 3, 7, tz=utc), latest=None, catchup=True, ) @@ -440,8 +440,8 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 7 + 8, tz=utc), + pendulum.datetime(2020, 3, 7, 8 + 8, tz=utc), ) # This interval ends an hour early since it includes the DST switch! @@ -450,8 +450,8 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 8 + 8, tz=utc), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), ) # We're fully into DST so the interval is as expected. @@ -460,14 +460,14 @@ def test_7_to_8_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 8 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), + pendulum.datetime(2020, 3, 8, 8 + 7, tz=utc), ) def test_7_and_9_entering(self): timetable = CronDataIntervalTimetable("0 7,9 * * *", timezone="America/Los_Angeles") restriction = TimeRestriction( - earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE), + earliest=pendulum.datetime(2020, 3, 7, tz=utc), latest=None, catchup=True, ) @@ -478,8 +478,8 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 7 + 8, tz=utc), + pendulum.datetime(2020, 3, 7, 9 + 8, tz=utc), ) # This interval ends an hour early since it includes the DST switch! @@ -488,8 +488,8 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 7, 9 + 8, tz=utc), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), ) # We're fully into DST so the interval is as expected. @@ -498,15 +498,15 @@ def test_7_and_9_entering(self): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE), - pendulum.datetime(2020, 3, 8, 9 + 7, tz=TIMEZONE), + pendulum.datetime(2020, 3, 8, 7 + 7, tz=utc), + pendulum.datetime(2020, 3, 8, 9 + 7, tz=utc), ) def test_fold_scheduling(): timetable = CronDataIntervalTimetable("*/30 * * * *", timezone="Europe/Zurich") restriction = TimeRestriction( - earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), # Locally 1:30 (DST). + earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=utc), # Locally 1:30 (DST). latest=None, catchup=True, ) @@ -517,16 +517,16 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), # Locally 2am (DST). + pendulum.datetime(2023, 10, 28, 23, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 0, 0, tz=utc), # Locally 2am (DST). ) next_info = timetable.next_dagrun_info( last_automated_data_interval=next_info.data_interval, restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), # Locally 2:30 (DST). + pendulum.datetime(2023, 10, 29, 0, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 0, 30, tz=utc), # Locally 2:30 (DST). ) # Crossing into fold. @@ -535,8 +535,8 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), # Locally 2am (fold, not DST). + pendulum.datetime(2023, 10, 29, 0, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 1, 0, tz=utc), # Locally 2am (fold, not DST). ) # In the "fold zone". @@ -545,8 +545,8 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), # Locally 2am (fold, not DST). + pendulum.datetime(2023, 10, 29, 1, 0, tz=utc), + pendulum.datetime(2023, 10, 29, 1, 30, tz=utc), # Locally 2am (fold, not DST). ) # Stepping out of fold. @@ -555,6 +555,6 @@ def test_fold_scheduling(): restriction=restriction, ) assert next_info and next_info.data_interval == DataInterval( - pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), - pendulum.datetime(2023, 10, 29, 2, 0, tz=TIMEZONE), # Locally 3am (not DST). + pendulum.datetime(2023, 10, 29, 1, 30, tz=utc), + pendulum.datetime(2023, 10, 29, 2, 0, tz=utc), # Locally 3am (not DST). ) diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index 58e1f49df9058..5165a14b3c115 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -21,24 +21,23 @@ import dateutil.relativedelta import pendulum -import pendulum.tz import pytest import time_machine from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction from airflow.timetables.trigger import CronTriggerTimetable +from airflow.utils.timezone import utc -TIMEZONE = pendulum.tz.timezone("UTC") -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) PREV_DATA_INTERVAL_EXACT = DataInterval.exact(PREV_DATA_INTERVAL_END) -CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=utc) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) -HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=TIMEZONE) +HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc) DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) @@ -69,7 +68,7 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( next_start_time: pendulum.DateTime, ) -> None: """If ``catchup=False`` and start_date is a day before""" - timetable = CronTriggerTimetable("30 16 * * *", timezone=TIMEZONE) + timetable = CronTriggerTimetable("30 16 * * *", timezone=utc) next_info = timetable.next_dagrun_info( last_automated_data_interval=last_automated_data_interval, restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), @@ -81,33 +80,33 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( "current_time, earliest, expected", [ pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="current_time_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="current_time_not_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="current_time_miss_one_interval_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="current_time_miss_one_interval_not_on_boundary", ), pytest.param( - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), - pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 23, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 23, 0, 0, tzinfo=utc)), id="future_start_date", ), ], @@ -129,27 +128,27 @@ def test_hourly_cron_trigger_no_catchup_next_info( "last_automated_data_interval, earliest, expected", [ pytest.param( - DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="last_automated_on_boundary", ), pytest.param( - DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE)), + DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc)), START_DATE, - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="last_automated_not_on_boundary", ), pytest.param( None, - pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc)), id="no_last_automated_with_earliest_on_boundary", ), pytest.param( None, - pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE), - DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)), + pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc), + DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc)), id="no_last_automated_with_earliest_not_on_boundary", ), pytest.param( @@ -176,20 +175,20 @@ def test_cron_trigger_next_info_with_interval(): # Runs every Monday on 16:30, covering the day before the run. timetable = CronTriggerTimetable( "30 16 * * MON", - timezone=TIMEZONE, + timezone=utc, interval=datetime.timedelta(hours=16, minutes=30), ) next_info = timetable.next_dagrun_info( last_automated_data_interval=DataInterval( - pendulum.DateTime(2022, 8, 1, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 1, 16, 30, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 1, tzinfo=utc), + pendulum.DateTime(2022, 8, 1, 16, 30, tzinfo=utc), ), restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=True), ) assert next_info == DagRunInfo.interval( - pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2022, 8, 8, 16, 30, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=utc), + pendulum.DateTime(2022, 8, 8, 16, 30, tzinfo=utc), ) @@ -198,7 +197,7 @@ def test_validate_success() -> None: def test_validate_failure() -> None: - timetable = CronTriggerTimetable("0 0 1 13 0", timezone=TIMEZONE) + timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc) with pytest.raises(AirflowTimetableInvalid) as ctx: timetable.validate() @@ -210,13 +209,13 @@ def test_validate_failure() -> None: [ (HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", "timezone": "UTC", "interval": 0}), ( - CronTriggerTimetable("0 0 1 12 *", timezone=TIMEZONE, interval=datetime.timedelta(hours=2)), + CronTriggerTimetable("0 0 1 12 *", timezone=utc, interval=datetime.timedelta(hours=2)), {"expression": "0 0 1 12 *", "timezone": "UTC", "interval": 7200.0}, ), ( CronTriggerTimetable( "0 0 1 12 0", - timezone=pendulum.tz.timezone("Asia/Taipei"), + timezone="Asia/Taipei", interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), ), {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}}, diff --git a/tests/timetables/test_workday_timetable.py b/tests/timetables/test_workday_timetable.py index d20f45d44cf8a..41861e3d2781a 100644 --- a/tests/timetables/test_workday_timetable.py +++ b/tests/timetables/test_workday_timetable.py @@ -23,23 +23,23 @@ import pytest from airflow.example_dags.plugins.workday import AfterWorkdayTimetable -from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) # This is a Saturday. +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # This is a Saturday. WEEK_1_WEEKDAYS = [ - pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE), # This is a US holiday - pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 9, tzinfo=TIMEZONE), - pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE), + pendulum.DateTime(2021, 9, 6, tzinfo=utc), # This is a US holiday + pendulum.DateTime(2021, 9, 7, tzinfo=utc), + pendulum.DateTime(2021, 9, 8, tzinfo=utc), + pendulum.DateTime(2021, 9, 9, tzinfo=utc), + pendulum.DateTime(2021, 9, 10, tzinfo=utc), ] -WEEK_1_SATURDAY = pendulum.DateTime(2021, 9, 11, tzinfo=TIMEZONE) +WEEK_1_SATURDAY = pendulum.DateTime(2021, 9, 11, tzinfo=utc) -WEEK_2_MONDAY = pendulum.DateTime(2021, 9, 13, tzinfo=TIMEZONE) -WEEK_2_TUESDAY = pendulum.DateTime(2021, 9, 14, tzinfo=TIMEZONE) +WEEK_2_MONDAY = pendulum.DateTime(2021, 9, 13, tzinfo=utc) +WEEK_2_TUESDAY = pendulum.DateTime(2021, 9, 14, tzinfo=utc) @pytest.fixture()