Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import utc from datetime and normalize its import #33450

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions airflow/example_dags/example_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

from __future__ import annotations

from datetime import datetime, timedelta
import datetime

import pendulum
from pytz import UTC

from airflow.models import DAG
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -54,32 +53,36 @@ def failure_callable():
tags=["example"],
) as dag:
# [START example_time_delta_sensor]
t0 = TimeDeltaSensor(task_id="wait_some_seconds", delta=timedelta(seconds=2))
t0 = TimeDeltaSensor(task_id="wait_some_seconds", delta=datetime.timedelta(seconds=2))
# [END example_time_delta_sensor]

# [START example_time_delta_sensor_async]
t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=timedelta(seconds=2))
t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=datetime.timedelta(seconds=2))
# [END example_time_delta_sensor_async]

# [START example_time_sensors]
t1 = TimeSensor(task_id="fire_immediately", target_time=datetime.now(tz=UTC).time())
t1 = TimeSensor(
task_id="fire_immediately", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time()
)

t2 = TimeSensor(
task_id="timeout_after_second_date_in_the_future",
timeout=1,
soft_fail=True,
target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
)
# [END example_time_sensors]

# [START example_time_sensors_async]
t1a = TimeSensorAsync(task_id="fire_immediately_async", target_time=datetime.now(tz=UTC).time())
t1a = TimeSensorAsync(
task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time()
)

t2a = TimeSensorAsync(
task_id="timeout_after_second_date_in_the_future_async",
timeout=1,
soft_fail=True,
target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
)
# [END example_time_sensors_async]

Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/cncf/kubernetes/triggers/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from __future__ import annotations

import asyncio
import datetime
import warnings
from asyncio import CancelledError
from datetime import datetime
from enum import Enum
from typing import Any, AsyncIterator

import pytz
from kubernetes_asyncio.client.models import V1Pod

from airflow.exceptions import AirflowProviderDeprecationWarning
Expand Down Expand Up @@ -74,7 +73,7 @@ def __init__(
self,
pod_name: str,
pod_namespace: str,
trigger_start_time: datetime,
trigger_start_time: datetime.datetime,
base_container_name: str,
kubernetes_conn_id: str | None = None,
poll_interval: float = 2,
Expand Down Expand Up @@ -167,7 +166,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
self.log.info("Container is not completed and still working.")

if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED:
delta = datetime.now(tz=pytz.UTC) - self.trigger_start_time
delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
if delta.total_seconds() >= self.startup_timeout:
message = (
f"Pod took longer than {self.startup_timeout} seconds to start. "
Expand Down
11 changes: 6 additions & 5 deletions airflow/providers/google/cloud/operators/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
# under the License.
from __future__ import annotations

import datetime
import json
import re
import uuid
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Sequence

import pytz
from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
Expand Down Expand Up @@ -624,7 +623,7 @@ def __init__(
*,
workflow_id: str,
location: str,
start_date_filter: datetime | None = None,
start_date_filter: datetime.datetime | None = None,
project_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
Expand All @@ -637,7 +636,9 @@ def __init__(

self.workflow_id = workflow_id
self.location = location
self.start_date_filter = start_date_filter or datetime.now(tz=pytz.UTC) - timedelta(minutes=60)
self.start_date_filter = start_date_filter or datetime.datetime.now(
tz=datetime.timezone.utc
) - datetime.timedelta(minutes=60)
self.project_id = project_id
self.retry = retry
self.timeout = timeout
Expand Down Expand Up @@ -668,7 +669,7 @@ def execute(self, context: Context):
return [
Execution.to_dict(e)
for e in execution_iter
if e.start_time.ToDatetime(tzinfo=pytz.UTC) > self.start_date_filter
if e.start_time.ToDatetime(tzinfo=datetime.timezone.utc) > self.start_date_filter
]


Expand Down
6 changes: 2 additions & 4 deletions tests/providers/amazon/aws/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime

import pytz
import datetime

from airflow.providers.amazon.aws.utils import (
_StringCompareEnum,
Expand All @@ -28,7 +26,7 @@
get_airflow_version,
)

DT = datetime(2000, 1, 1, tzinfo=pytz.UTC)
DT = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
EPOCH = 946_684_800


Expand Down
5 changes: 2 additions & 3 deletions tests/providers/cncf/kubernetes/triggers/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
from __future__ import annotations

import asyncio
import datetime
import logging
from asyncio import CancelledError, Future
from datetime import datetime
from unittest import mock

import pytest
import pytz
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
Expand All @@ -41,7 +40,7 @@
IN_CLUSTER = False
GET_LOGS = True
STARTUP_TIMEOUT_SECS = 120
TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
BASE_CONTAINER_NAME = "base"
ON_FINISH_ACTION = "delete_pod"
Expand Down
9 changes: 6 additions & 3 deletions tests/providers/google/cloud/operators/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import datetime
from unittest import mock

import pytz
from google.protobuf.timestamp_pb2 import Timestamp

from airflow.providers.google.cloud.operators.workflows import (
Expand Down Expand Up @@ -171,7 +170,9 @@ class TestWorkflowsListWorkflowsOperator:
@mock.patch(BASE_PATH.format("WorkflowsHook"))
def test_execute(self, mock_hook, mock_object):
timestamp = Timestamp()
timestamp.FromDatetime(datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5))
timestamp.FromDatetime(
datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
)
workflow_mock = mock.MagicMock()
workflow_mock.start_time = timestamp
mock_hook.return_value.list_workflows.return_value = [workflow_mock]
Expand Down Expand Up @@ -334,7 +335,9 @@ class TestWorkflowExecutionsListExecutionsOperator:
@mock.patch(BASE_PATH.format("WorkflowsHook"))
def test_execute(self, mock_hook, mock_object):
timestamp = Timestamp()
timestamp.FromDatetime(datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5))
timestamp.FromDatetime(
datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
)
execution_mock = mock.MagicMock()
execution_mock.start_time = timestamp
mock_hook.return_value.list_executions.return_value = [execution_mock]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from unittest.mock import patch

import pytest
import pytz

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
Expand Down Expand Up @@ -110,7 +109,7 @@ def _assert_uploaded_file_content(self, bucket, obj, tmp_filename, mime_type, gz
(datetime.date(1000, 1, 2), "1000-01-02"),
(datetime.datetime(1970, 1, 1, 1, 0, tzinfo=None), "1970-01-01T01:00:00"),
(
datetime.datetime(2022, 1, 1, 2, 0, tzinfo=pytz.UTC),
datetime.datetime(2022, 1, 1, 2, 0, tzinfo=datetime.timezone.utc),
1641002400.0,
),
(datetime.time(hour=0, minute=0, second=0), "0:00:00"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
from __future__ import annotations

import asyncio
import datetime
import logging
from asyncio import CancelledError, Future
from datetime import datetime
from unittest import mock

import pytest
import pytz
from google.cloud.container_v1.types import Operation
from kubernetes.client import models as k8s

Expand All @@ -43,7 +42,7 @@
SHOULD_DELETE_POD = True
GET_LOGS = True
STARTUP_TIMEOUT_SECS = 120
TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
CLUSTER_URL = "https://test-host"
SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
Expand Down