From a4fa03550ca3e29620972184276827dd8e96fc22 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= <6774676+eumiro@users.noreply.github.com>
Date: Wed, 16 Aug 2023 18:06:50 +0200
Subject: [PATCH] Import utc from datetime and normalize its import

---
 airflow/example_dags/example_sensors.py       | 19 +++++++++++--------
 .../providers/cncf/kubernetes/triggers/pod.py |  7 +++----
 .../google/cloud/operators/workflows.py       | 11 ++++++-----
 .../providers/amazon/aws/utils/test_utils.py  |  6 ++----
 .../cncf/kubernetes/triggers/test_pod.py      |  5 ++---
 .../google/cloud/operators/test_workflows.py  |  9 ++++++---
 .../cloud/transfers/test_postgres_to_gcs.py   |  3 +--
 .../cloud/triggers/test_kubernetes_engine.py  |  5 ++---
 8 files changed, 33 insertions(+), 32 deletions(-)

diff --git a/airflow/example_dags/example_sensors.py b/airflow/example_dags/example_sensors.py
index d9e3158f544bc..a79b61ceed015 100644
--- a/airflow/example_dags/example_sensors.py
+++ b/airflow/example_dags/example_sensors.py
@@ -17,10 +17,9 @@
 
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+import datetime
 
 import pendulum
-from pytz import UTC
 
 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
@@ -54,32 +53,36 @@ def failure_callable():
     tags=["example"],
 ) as dag:
     # [START example_time_delta_sensor]
-    t0 = TimeDeltaSensor(task_id="wait_some_seconds", delta=timedelta(seconds=2))
+    t0 = TimeDeltaSensor(task_id="wait_some_seconds", delta=datetime.timedelta(seconds=2))
     # [END example_time_delta_sensor]
 
     # [START example_time_delta_sensor_async]
-    t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=timedelta(seconds=2))
+    t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=datetime.timedelta(seconds=2))
     # [END example_time_delta_sensor_async]
 
     # [START example_time_sensors]
-    t1 = TimeSensor(task_id="fire_immediately", target_time=datetime.now(tz=UTC).time())
+    t1 = TimeSensor(
+        task_id="fire_immediately", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time()
+    )
 
     t2 = TimeSensor(
         task_id="timeout_after_second_date_in_the_future",
         timeout=1,
         soft_fail=True,
-        target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
+        target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
     )
     # [END example_time_sensors]
 
     # [START example_time_sensors_async]
-    t1a = TimeSensorAsync(task_id="fire_immediately_async", target_time=datetime.now(tz=UTC).time())
+    t1a = TimeSensorAsync(
+        task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time()
+    )
 
     t2a = TimeSensorAsync(
         task_id="timeout_after_second_date_in_the_future_async",
         timeout=1,
         soft_fail=True,
-        target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
+        target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
     )
     # [END example_time_sensors_async]
 
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 8c8820cd45bc3..7b7a9afe59455 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -17,13 +17,12 @@
 from __future__ import annotations
 
 import asyncio
+import datetime
 import warnings
 from asyncio import CancelledError
-from datetime import datetime
 from enum import Enum
 from typing import Any, AsyncIterator
 
-import pytz
 from kubernetes_asyncio.client.models import V1Pod
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -74,7 +73,7 @@ def __init__(
         self,
         pod_name: str,
         pod_namespace: str,
-        trigger_start_time: datetime,
+        trigger_start_time: datetime.datetime,
         base_container_name: str,
         kubernetes_conn_id: str | None = None,
         poll_interval: float = 2,
@@ -167,7 +166,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
                     self.log.info("Container is not completed and still working.")
 
                     if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED:
-                        delta = datetime.now(tz=pytz.UTC) - self.trigger_start_time
+                        delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
                         if delta.total_seconds() >= self.startup_timeout:
                             message = (
                                 f"Pod took longer than {self.startup_timeout} seconds to start. "
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
index 4f2517f5da017..2887bd135846a 100644
--- a/airflow/providers/google/cloud/operators/workflows.py
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -16,13 +16,12 @@
 # under the License.
 from __future__ import annotations
 
+import datetime
 import json
 import re
 import uuid
-from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Sequence
 
-import pytz
 from google.api_core.exceptions import AlreadyExists
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
@@ -624,7 +623,7 @@ def __init__(
         *,
         workflow_id: str,
         location: str,
-        start_date_filter: datetime | None = None,
+        start_date_filter: datetime.datetime | None = None,
         project_id: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
@@ -637,7 +636,9 @@ def __init__(
 
         self.workflow_id = workflow_id
         self.location = location
-        self.start_date_filter = start_date_filter or datetime.now(tz=pytz.UTC) - timedelta(minutes=60)
+        self.start_date_filter = start_date_filter or datetime.datetime.now(
+            tz=datetime.timezone.utc
+        ) - datetime.timedelta(minutes=60)
         self.project_id = project_id
         self.retry = retry
         self.timeout = timeout
@@ -668,7 +669,7 @@ def execute(self, context: Context):
         return [
             Execution.to_dict(e)
             for e in execution_iter
-            if e.start_time.ToDatetime(tzinfo=pytz.UTC) > self.start_date_filter
+            if e.start_time.ToDatetime(tzinfo=datetime.timezone.utc) > self.start_date_filter
         ]
 
 
diff --git a/tests/providers/amazon/aws/utils/test_utils.py b/tests/providers/amazon/aws/utils/test_utils.py
index 66d5f734dc7db..7f951d06dddd9 100644
--- a/tests/providers/amazon/aws/utils/test_utils.py
+++ b/tests/providers/amazon/aws/utils/test_utils.py
@@ -16,9 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
-
-import pytz
+import datetime
 
 from airflow.providers.amazon.aws.utils import (
     _StringCompareEnum,
@@ -28,7 +26,7 @@
     get_airflow_version,
 )
 
-DT = datetime(2000, 1, 1, tzinfo=pytz.UTC)
+DT = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
 
 EPOCH = 946_684_800
 
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index fbfff17278c7f..5719dcefcac33 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -18,13 +18,12 @@
 from __future__ import annotations
 
 import asyncio
+import datetime
 import logging
 from asyncio import CancelledError, Future
-from datetime import datetime
 from unittest import mock
 
 import pytest
-import pytz
 from kubernetes.client import models as k8s
 
 from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
@@ -41,7 +40,7 @@
 IN_CLUSTER = False
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
-TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
+TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
 ON_FINISH_ACTION = "delete_pod"
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
index ad202fa5929d4..7ecd1627fe201 100644
--- a/tests/providers/google/cloud/operators/test_workflows.py
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -19,7 +19,6 @@
 import datetime
 from unittest import mock
 
-import pytz
 from google.protobuf.timestamp_pb2 import Timestamp
 
 from airflow.providers.google.cloud.operators.workflows import (
@@ -171,7 +170,9 @@ class TestWorkflowsListWorkflowsOperator:
     @mock.patch(BASE_PATH.format("WorkflowsHook"))
     def test_execute(self, mock_hook, mock_object):
         timestamp = Timestamp()
-        timestamp.FromDatetime(datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5))
+        timestamp.FromDatetime(
+            datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
+        )
         workflow_mock = mock.MagicMock()
         workflow_mock.start_time = timestamp
         mock_hook.return_value.list_workflows.return_value = [workflow_mock]
@@ -334,7 +335,9 @@ class TestWorkflowExecutionsListExecutionsOperator:
     @mock.patch(BASE_PATH.format("WorkflowsHook"))
     def test_execute(self, mock_hook, mock_object):
         timestamp = Timestamp()
-        timestamp.FromDatetime(datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5))
+        timestamp.FromDatetime(
+            datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
+        )
         execution_mock = mock.MagicMock()
         execution_mock.start_time = timestamp
         mock_hook.return_value.list_executions.return_value = [execution_mock]
diff --git a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
index 4d9934f188dc2..8d0660b8faa48 100644
--- a/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
@@ -21,7 +21,6 @@
 from unittest.mock import patch
 
 import pytest
-import pytz
 
 from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
 from airflow.providers.postgres.hooks.postgres import PostgresHook
@@ -110,7 +109,7 @@ def _assert_uploaded_file_content(self, bucket, obj, tmp_filename, mime_type, gz
             (datetime.date(1000, 1, 2), "1000-01-02"),
             (datetime.datetime(1970, 1, 1, 1, 0, tzinfo=None), "1970-01-01T01:00:00"),
             (
-                datetime.datetime(2022, 1, 1, 2, 0, tzinfo=pytz.UTC),
+                datetime.datetime(2022, 1, 1, 2, 0, tzinfo=datetime.timezone.utc),
                 1641002400.0,
             ),
             (datetime.time(hour=0, minute=0, second=0), "0:00:00"),
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index e695822d3863f..6a781696257b1 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -18,13 +18,12 @@
 from __future__ import annotations
 
 import asyncio
+import datetime
 import logging
 from asyncio import CancelledError, Future
-from datetime import datetime
 from unittest import mock
 
 import pytest
-import pytz
 from google.cloud.container_v1.types import Operation
 from kubernetes.client import models as k8s
 
@@ -43,7 +42,7 @@
 SHOULD_DELETE_POD = True
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
-TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
+TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
 CLUSTER_URL = "https://test-host"
 SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
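
Note (not part of the patch): a minimal sketch of the equivalence this change relies on.
datetime.timezone.utc is the standard-library stand-in for pytz.UTC wherever the code only
needs an aware UTC timestamp; pytz is imported below purely for the side-by-side comparison,
and the variable names are illustrative only.

    import datetime

    import pytz

    # Both calls produce timezone-aware datetimes with a zero UTC offset.
    now_stdlib = datetime.datetime.now(tz=datetime.timezone.utc)
    now_pytz = datetime.datetime.now(tz=pytz.UTC)

    assert now_stdlib.utcoffset() == datetime.timedelta(0)
    assert now_pytz.utcoffset() == datetime.timedelta(0)

    # The two tzinfo objects report the same offset and name, so call sites that
    # only attach or compare UTC timestamps no longer need the pytz import.
    assert datetime.timezone.utc.utcoffset(None) == pytz.UTC.utcoffset(None) == datetime.timedelta(0)
    assert datetime.timezone.utc.tzname(None) == pytz.UTC.tzname(None) == "UTC"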