diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c9b48158d960a..84f015d22372f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -51,9 +51,8 @@ get_container_status, ) from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults -from airflow.providers.common.compat.sdk import AirflowException +from airflow.providers.common.compat.sdk import AirflowException, timezone from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.timezone import utcnow if TYPE_CHECKING: from kubernetes.client.models.core_v1_event import CoreV1Event @@ -281,11 +280,11 @@ def logs_available(self): if terminated: termination_time = terminated.finished_at if termination_time: - return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow() + return termination_time + timedelta(seconds=self.post_termination_timeout) > timezone.utcnow() return False def read_pod(self): - _now = utcnow() + _now = timezone.utcnow() if ( self.read_pod_cache is None or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now @@ -527,9 +526,9 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None exception = e self._http_error_timestamps = getattr(self, "_http_error_timestamps", []) self._http_error_timestamps = [ - t for t in self._http_error_timestamps if t > utcnow() - timedelta(seconds=60) + t for t in self._http_error_timestamps if t > timezone.utcnow() - timedelta(seconds=60) ] - self._http_error_timestamps.append(utcnow()) + self._http_error_timestamps.append(timezone.utcnow()) # Log only if more than 2 errors occurred in the last 60 seconds if len(self._http_error_timestamps) > 2: self.log.exception( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 062266540004b..c36f39076a820 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -38,8 +38,7 @@ log_pod_event, parse_log_line, ) -from airflow.providers.common.compat.sdk import AirflowException -from airflow.utils.timezone import utc +from airflow.providers.common.compat.sdk import AirflowException, timezone from unit.cncf.kubernetes.test_callbacks import MockKubernetesPodOperatorCallback, MockWrapper @@ -1422,43 +1421,43 @@ def test_container_is_not_running(self): [ ( False, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc), 120, True, ), ( False, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc), 120, False, ), ( False, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=timezone.utc), 120, False, ), ( True, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc), 120, True, ), ( True, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc), 120, True, ), ( True, - datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc), + datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=timezone.utc), 120, True, ), @@ -1501,43 +1500,43 @@ def test_logs_available( [ ( 120, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #0"], ), ( 120, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #0"], ), ( 120, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #1"], ), ( 2, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #0"], ), ( 2, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #0"], ), ( 2, - datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc), - datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=utc), + datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc), + datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=timezone.utc), ["Read pod #0", "Read pod #1"], ["Read pod #0", "Read pod #1"], ), diff --git a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py index 2ccaba15696b7..cf50d56fc3677 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -40,7 +40,7 @@ KubernetesDeleteResourceOperator, ) from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction -from airflow.providers.common.compat.sdk import AirflowException, conf +from airflow.providers.common.compat.sdk import AirflowException, conf, timezone from airflow.providers.google.cloud.hooks.kubernetes_engine import ( GKEHook, GKEKubernetesHook, @@ -59,7 +59,6 @@ ) from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers_manager import ProvidersManager -from airflow.utils.timezone import utcnow try: from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator @@ -663,7 +662,7 @@ def __init__( def invoke_defer_method(self, last_log_time: DateTime | None = None): """Redefine triggers which are being used in child classes.""" - trigger_start_time = utcnow() + trigger_start_time = timezone.utcnow() on_finish_action = self.on_finish_action if type(on_finish_action) is str and self.on_finish_action not in [i.value for i in OnFinishAction]: on_finish_action = self.on_finish_action.split(".")[-1].lower() # type: ignore[assignment] diff --git a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py index f3588fb614af3..442f158a6e29d 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py +++ b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py @@ -732,7 +732,7 @@ def test_on_finish_action_handler( @mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info")) @mock.patch(GKE_OPERATORS_PATH.format("GKEHook")) @mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodTrigger")) - @mock.patch(GKE_OPERATORS_PATH.format("utcnow")) + @mock.patch(GKE_OPERATORS_PATH.format("timezone.utcnow")) def test_invoke_defer_method( self, mock_utcnow, mock_trigger, mock_cluster_hook, mock_fetch_cluster_info, mock_defer ):