Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1422,12 +1422,21 @@ def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> k8s.V1Pod:
self.process_pod_deletion(old_pod)
return new_pod

@staticmethod
def _get_most_recent_pod_index(pod_list: list[k8s.V1Pod]) -> int:
def _get_most_recent_pod_index(self, pod_list: list[k8s.V1Pod]) -> int:
"""Loop through a list of V1Pod objects and get the index of the most recent one."""
pod_start_times: list[datetime.datetime] = [
pod.to_dict().get("status").get("start_time") for pod in pod_list
]
if not all(pod_start_times):
self.log.info(
"Unable to determine most recent pod using start_time (some pods have not started yet). Falling back to creation_timestamp from pod metadata."
)
pod_start_times: list[datetime.datetime] = [ # type: ignore[no-redef]
pod.to_dict()
.get("metadata", {})
.get("creation_timestamp", datetime.datetime.now(tz=datetime.timezone.utc))
for pod in pod_list
]
most_recent_start_time = max(pod_start_times)
return pod_start_times.index(most_recent_start_time)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2253,6 +2253,45 @@ def test_process_duplicate_label_pods__label_patched_if_action_is_not_delete_pod
process_pod_deletion_mock.assert_not_called()
assert result.metadata.name == pod_2.metadata.name

@pytest.mark.parametrize(
"on_finish_action", [OnFinishAction.KEEP_POD, OnFinishAction.DELETE_SUCCEEDED_POD]
)
@patch(KUB_OP_PATH.format("patch_already_checked"))
@patch(KUB_OP_PATH.format("process_pod_deletion"))
def test_process_duplicate_label_pods_with_start_time_none(
self,
process_pod_deletion_mock,
patch_already_checked_mock,
on_finish_action,
):
now = datetime.datetime.now()
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:22.04",
cmds=["bash", "-cx"],
arguments=["echo 12"],
name="test",
task_id="task",
do_xcom_push=False,
reattach_on_restart=False,
on_finish_action=on_finish_action,
)
context = create_context(k)
pod_1 = k.get_or_create_pod(pod_request_obj=k.build_pod_request_obj(context), context=context)
pod_2 = k.get_or_create_pod(pod_request_obj=k.build_pod_request_obj(context), context=context)

pod_1.status = {"start_time": None}
pod_2.status = {"start_time": None}
pod_1.metadata.creation_timestamp = now
pod_2.metadata.creation_timestamp = now + datetime.timedelta(seconds=60)
pod_2.metadata.labels.update({"try_number": "2"})

result = k.process_duplicate_label_pods([pod_1, pod_2])

patch_already_checked_mock.assert_called_once_with(pod_1, reraise=False)
process_pod_deletion_mock.assert_not_called()
assert result.metadata.name == pod_2.metadata.name

@patch(KUB_OP_PATH.format("patch_already_checked"))
@patch(KUB_OP_PATH.format("process_pod_deletion"))
def test_process_duplicate_label_pods__pod_removed_if_delete_pod(
Expand Down