Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@
get_container_status,
)
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.common.compat.sdk import AirflowException, timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timezone import utcnow

if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event import CoreV1Event
Expand Down Expand Up @@ -281,11 +280,11 @@ def logs_available(self):
if terminated:
termination_time = terminated.finished_at
if termination_time:
return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow()
return termination_time + timedelta(seconds=self.post_termination_timeout) > timezone.utcnow()
return False

def read_pod(self):
_now = utcnow()
_now = timezone.utcnow()
if (
self.read_pod_cache is None
or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now
Expand Down Expand Up @@ -527,9 +526,9 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
exception = e
self._http_error_timestamps = getattr(self, "_http_error_timestamps", [])
self._http_error_timestamps = [
t for t in self._http_error_timestamps if t > utcnow() - timedelta(seconds=60)
t for t in self._http_error_timestamps if t > timezone.utcnow() - timedelta(seconds=60)
]
self._http_error_timestamps.append(utcnow())
self._http_error_timestamps.append(timezone.utcnow())
# Log only if more than 2 errors occurred in the last 60 seconds
if len(self._http_error_timestamps) > 2:
self.log.exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
log_pod_event,
parse_log_line,
)
from airflow.providers.common.compat.sdk import AirflowException
from airflow.utils.timezone import utc
from airflow.providers.common.compat.sdk import AirflowException, timezone

from unit.cncf.kubernetes.test_callbacks import MockKubernetesPodOperatorCallback, MockWrapper

Expand Down Expand Up @@ -1422,43 +1421,43 @@ def test_container_is_not_running(self):
[
(
False,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc),
120,
True,
),
(
False,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc),
120,
False,
),
(
False,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=timezone.utc),
120,
False,
),
(
True,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc),
120,
True,
),
(
True,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc),
120,
True,
),
(
True,
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=timezone.utc),
120,
True,
),
Expand Down Expand Up @@ -1501,43 +1500,43 @@ def test_logs_available(
[
(
120,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #0"],
),
(
120,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #0"],
),
(
120,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #1"],
),
(
2,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #0"],
),
(
2,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #0"],
),
(
2,
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=utc),
datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=timezone.utc),
["Read pod #0", "Read pod #1"],
["Read pod #0", "Read pod #1"],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
KubernetesDeleteResourceOperator,
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.providers.common.compat.sdk import AirflowException, conf, timezone
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKEHook,
GKEKubernetesHook,
Expand All @@ -59,7 +59,6 @@
)
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
from airflow.providers_manager import ProvidersManager
from airflow.utils.timezone import utcnow

try:
from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator
Expand Down Expand Up @@ -663,7 +662,7 @@ def __init__(

def invoke_defer_method(self, last_log_time: DateTime | None = None):
"""Redefine triggers which are being used in child classes."""
trigger_start_time = utcnow()
trigger_start_time = timezone.utcnow()
on_finish_action = self.on_finish_action
if type(on_finish_action) is str and self.on_finish_action not in [i.value for i in OnFinishAction]:
on_finish_action = self.on_finish_action.split(".")[-1].lower() # type: ignore[assignment]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def test_on_finish_action_handler(
@mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodTrigger"))
@mock.patch(GKE_OPERATORS_PATH.format("utcnow"))
@mock.patch(GKE_OPERATORS_PATH.format("timezone.utcnow"))
def test_invoke_defer_method(
self, mock_utcnow, mock_trigger, mock_cluster_hook, mock_fetch_cluster_info, mock_defer
):
Expand Down