From 2f19864fbe8bd120f464eb11e7efa8aba9a719de Mon Sep 17 00:00:00 2001 From: Sameer Mesiah Date: Sun, 7 Dec 2025 13:38:59 +0000 Subject: [PATCH] Add `delete_active_pod` cleanup option and corresponding unit tests --- .../cncf/kubernetes/operators/pod.py | 27 ++++++--- .../cncf/kubernetes/utils/pod_manager.py | 1 + .../cncf/kubernetes/operators/test_pod.py | 55 ++++++++++++++++++- 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index aac466ae59e35..25af6bdac453c 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -234,7 +234,8 @@ class KubernetesPodOperator(BaseOperator): :param log_pod_spec_on_failure: Log the pod's specification if a failure occurs :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", - only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. + only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. "delete_active_pod" deletes + pods that are still active (Pending or Running). :param termination_message_policy: The termination message policy of the base container. Default value is "File" :param active_deadline_seconds: The active_deadline_seconds which translates to active_deadline_seconds @@ -1029,7 +1030,11 @@ def cleanup( pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None # if the pod fails or success, but we don't want to delete it - if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD: + if ( + pod_phase != PodPhase.SUCCEEDED + or self.on_finish_action == OnFinishAction.KEEP_POD + or self.on_finish_action == OnFinishAction.DELETE_ACTIVE_POD + ): self.patch_already_checked(remote_pod, reraise=False) failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or ( @@ -1165,13 +1170,21 @@ def kill_istio_sidecar(self, pod: V1Pod) -> None: def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True) -> bool: with _optionally_suppress(reraise=reraise): if pod is not None: - should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or ( - self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD - and ( - pod.status.phase == PodPhase.SUCCEEDED - or container_is_succeeded(pod, self.base_container_name) + should_delete_pod = ( + (self.on_finish_action == OnFinishAction.DELETE_POD) + or ( + self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD + and ( + pod.status.phase == PodPhase.SUCCEEDED + or container_is_succeeded(pod, self.base_container_name) + ) + ) + or ( + self.on_finish_action == OnFinishAction.DELETE_ACTIVE_POD + and (pod.status.phase == PodPhase.RUNNING or pod.status.phase == PodPhase.PENDING) ) ) + if should_delete_pod: self.log.info("Deleting pod: %s", pod.metadata.name) self.pod_manager.delete_pod(pod) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 667dfa02fcd2f..a0fce569a7f9e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -926,6 +926,7 @@ class OnFinishAction(str, enum.Enum): KEEP_POD = "keep_pod" DELETE_POD = "delete_pod" + DELETE_ACTIVE_POD = "delete_active_pod" DELETE_SUCCEEDED_POD = "delete_succeeded_pod" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index 0a16c9920fe73..33c732196a3d9 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -719,6 +719,53 @@ def test_omitted_namespace_no_conn_not_in_k8s(self, mock_find, mock_path): ) mock_find.assert_called_once_with("default", context=context) + @pytest.mark.parametrize( + ("on_finish_action", "pod_phase", "should_delete"), + [ + (OnFinishAction.DELETE_POD, PodPhase.PENDING, True), + (OnFinishAction.DELETE_POD, PodPhase.RUNNING, True), + (OnFinishAction.DELETE_POD, PodPhase.SUCCEEDED, True), + (OnFinishAction.DELETE_POD, PodPhase.FAILED, True), + (OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.PENDING, False), + (OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.RUNNING, False), + (OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.SUCCEEDED, True), + (OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.FAILED, False), + (OnFinishAction.DELETE_ACTIVE_POD, PodPhase.PENDING, True), + (OnFinishAction.DELETE_ACTIVE_POD, PodPhase.RUNNING, True), + (OnFinishAction.DELETE_ACTIVE_POD, PodPhase.SUCCEEDED, False), + (OnFinishAction.DELETE_ACTIVE_POD, PodPhase.FAILED, False), + ], + ) + @patch(f"{POD_MANAGER_CLASS}.delete_pod") + def test_process_pod_deletion(self, delete_pod_mock, on_finish_action, pod_phase, should_delete): + """Test KubernetesPodOperator pod cleanup behavior across all modes and pod phases.""" + + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + labels={"foo": "bar"}, + name="test", + task_id="task", + do_xcom_push=False, + ) + + k.on_finish_action = on_finish_action + + pod = MagicMock() + pod.metadata.name = "pod-name" + pod.status.phase = pod_phase + + result = k.process_pod_deletion(pod) + + # Assert deletion side-effect behavior + if should_delete: + delete_pod_mock.assert_called_once_with(pod) + else: + delete_pod_mock.assert_not_called() + + assert result == should_delete + @pytest.mark.parametrize( "pod_phase", [ @@ -918,6 +965,8 @@ def test_termination_grace_period_default_value_correctly_set(self): ({"on_finish_action": "keep_pod"}, True, False), ({"on_finish_action": "delete_succeeded_pod"}, False, True), ({"on_finish_action": "delete_succeeded_pod"}, True, False), + ({"on_finish_action": "delete_active_pod"}, True, True), + ({"on_finish_action": "delete_active_pod"}, False, True), ], ) @patch(f"{POD_MANAGER_CLASS}.delete_pod") @@ -995,6 +1044,7 @@ def test_pod_with_istio_delete_after_await_container_error( pytest.param({"on_finish_action": "delete_pod"}, True, id="delete-pod"), pytest.param({"on_finish_action": "delete_succeeded_pod"}, False, id="delete-succeeded-pod"), pytest.param({"on_finish_action": "keep_pod"}, False, id="keep-pod"), + pytest.param({"on_finish_action": "delete_active_pod"}, False, id="delete-active-pod"), ], ) @patch(f"{POD_MANAGER_CLASS}.delete_pod") @@ -1614,6 +1664,8 @@ def test_wait_for_xcom_sidecar_iff_push_xcom(self, mock_await, mock_extract_xcom ({"on_finish_action": "delete_pod"}, True, True), ({"on_finish_action": "delete_succeeded_pod"}, False, True), ({"on_finish_action": "delete_succeeded_pod"}, True, False), + ({"on_finish_action": "delete_active_pod"}, False, False), + ({"on_finish_action": "delete_active_pod"}, True, False), ], ) @patch(f"{POD_MANAGER_CLASS}.delete_pod") @@ -2163,7 +2215,8 @@ def test_await_container_completion_retries_on_specific_exception( ) @pytest.mark.parametrize( - "on_finish_action", [OnFinishAction.KEEP_POD, OnFinishAction.DELETE_SUCCEEDED_POD] + "on_finish_action", + [OnFinishAction.KEEP_POD, OnFinishAction.DELETE_ACTIVE_POD, OnFinishAction.DELETE_SUCCEEDED_POD], ) @patch(KUB_OP_PATH.format("patch_already_checked")) @patch(KUB_OP_PATH.format("process_pod_deletion"))