diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 54331fa6cef07..4e36e0eefdc9e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -1422,12 +1422,22 @@ def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> k8s.V1Pod:
             self.process_pod_deletion(old_pod)
         return new_pod
 
-    @staticmethod
-    def _get_most_recent_pod_index(pod_list: list[k8s.V1Pod]) -> int:
+    def _get_most_recent_pod_index(self, pod_list: list[k8s.V1Pod]) -> int:
         """Loop through a list of V1Pod objects and get the index of the most recent one."""
         pod_start_times: list[datetime.datetime] = [
             pod.to_dict().get("status").get("start_time") for pod in pod_list
         ]
+        if not all(pod_start_times):
+            self.log.info(
+                "Unable to determine most recent pod using start_time (some pods have not started yet). Falling back to creation_timestamp from pod metadata."
+            )
+            # dict.get defaults apply only to absent keys; ``or`` also covers keys that are
+            # present with a None value (unset metadata / creation_timestamp).
+            now = datetime.datetime.now(tz=datetime.timezone.utc)
+            pod_start_times = [
+                (pod.to_dict().get("metadata") or {}).get("creation_timestamp") or now
+                for pod in pod_list
+            ]
         most_recent_start_time = max(pod_start_times)
         return pod_start_times.index(most_recent_start_time)
 
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 5b0b247c938b9..b754654a556c6 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2253,6 +2253,45 @@ def test_process_duplicate_label_pods__label_patched_if_action_is_not_delete_pod
         process_pod_deletion_mock.assert_not_called()
         assert result.metadata.name == pod_2.metadata.name
 
+    @pytest.mark.parametrize(
+        "on_finish_action", [OnFinishAction.KEEP_POD, OnFinishAction.DELETE_SUCCEEDED_POD]
+    )
+    @patch(KUB_OP_PATH.format("patch_already_checked"))
+    @patch(KUB_OP_PATH.format("process_pod_deletion"))
+    def test_process_duplicate_label_pods_with_start_time_none(
+        self,
+        process_pod_deletion_mock,
+        patch_already_checked_mock,
+        on_finish_action,
+    ):
+        now = datetime.datetime.now(tz=datetime.timezone.utc)
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:22.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 12"],
+            name="test",
+            task_id="task",
+            do_xcom_push=False,
+            reattach_on_restart=False,
+            on_finish_action=on_finish_action,
+        )
+        context = create_context(k)
+        pod_1 = k.get_or_create_pod(pod_request_obj=k.build_pod_request_obj(context), context=context)
+        pod_2 = k.get_or_create_pod(pod_request_obj=k.build_pod_request_obj(context), context=context)
+
+        pod_1.status = {"start_time": None}
+        pod_2.status = {"start_time": None}
+        pod_1.metadata.creation_timestamp = now
+        pod_2.metadata.creation_timestamp = now + datetime.timedelta(seconds=60)
+        pod_2.metadata.labels.update({"try_number": "2"})
+
+        result = k.process_duplicate_label_pods([pod_1, pod_2])
+
+        patch_already_checked_mock.assert_called_once_with(pod_1, reraise=False)
+        process_pod_deletion_mock.assert_not_called()
+        assert result.metadata.name == pod_2.metadata.name
+
     @patch(KUB_OP_PATH.format("patch_already_checked"))
     @patch(KUB_OP_PATH.format("process_pod_deletion"))
     def test_process_duplicate_label_pods__pod_removed_if_delete_pod(