Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ class KubernetesPodOperator(BaseOperator):
:param log_pod_spec_on_failure: Log the pod's specification if a failure occurs
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. "delete_active_pod" deletes
pods that are still active (Pending or Running).
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which translates to active_deadline_seconds
Expand Down Expand Up @@ -1029,7 +1030,11 @@ def cleanup(
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None

# if the pod fails or success, but we don't want to delete it
if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
if (
pod_phase != PodPhase.SUCCEEDED
or self.on_finish_action == OnFinishAction.KEEP_POD
or self.on_finish_action == OnFinishAction.DELETE_ACTIVE_POD
):
self.patch_already_checked(remote_pod, reraise=False)

failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
Expand Down Expand Up @@ -1165,13 +1170,21 @@ def kill_istio_sidecar(self, pod: V1Pod) -> None:
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True) -> bool:
with _optionally_suppress(reraise=reraise):
if pod is not None:
should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and (
pod.status.phase == PodPhase.SUCCEEDED
or container_is_succeeded(pod, self.base_container_name)
should_delete_pod = (
(self.on_finish_action == OnFinishAction.DELETE_POD)
or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and (
pod.status.phase == PodPhase.SUCCEEDED
or container_is_succeeded(pod, self.base_container_name)
)
)
or (
self.on_finish_action == OnFinishAction.DELETE_ACTIVE_POD
and (pod.status.phase == PodPhase.RUNNING or pod.status.phase == PodPhase.PENDING)
)
)

if should_delete_pod:
self.log.info("Deleting pod: %s", pod.metadata.name)
self.pod_manager.delete_pod(pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ class OnFinishAction(str, enum.Enum):

KEEP_POD = "keep_pod"
DELETE_POD = "delete_pod"
DELETE_ACTIVE_POD = "delete_active_pod"
DELETE_SUCCEEDED_POD = "delete_succeeded_pod"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,53 @@ def test_omitted_namespace_no_conn_not_in_k8s(self, mock_find, mock_path):
)
mock_find.assert_called_once_with("default", context=context)

@pytest.mark.parametrize(
("on_finish_action", "pod_phase", "should_delete"),
[
(OnFinishAction.DELETE_POD, PodPhase.PENDING, True),
(OnFinishAction.DELETE_POD, PodPhase.RUNNING, True),
(OnFinishAction.DELETE_POD, PodPhase.SUCCEEDED, True),
(OnFinishAction.DELETE_POD, PodPhase.FAILED, True),
(OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.PENDING, False),
(OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.RUNNING, False),
(OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.SUCCEEDED, True),
(OnFinishAction.DELETE_SUCCEEDED_POD, PodPhase.FAILED, False),
(OnFinishAction.DELETE_ACTIVE_POD, PodPhase.PENDING, True),
(OnFinishAction.DELETE_ACTIVE_POD, PodPhase.RUNNING, True),
(OnFinishAction.DELETE_ACTIVE_POD, PodPhase.SUCCEEDED, False),
(OnFinishAction.DELETE_ACTIVE_POD, PodPhase.FAILED, False),
],
)
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
def test_process_pod_deletion(self, delete_pod_mock, on_finish_action, pod_phase, should_delete):
"""Test KubernetesPodOperator pod cleanup behavior across all modes and pod phases."""

k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
labels={"foo": "bar"},
name="test",
task_id="task",
do_xcom_push=False,
)

k.on_finish_action = on_finish_action

pod = MagicMock()
pod.metadata.name = "pod-name"
pod.status.phase = pod_phase

result = k.process_pod_deletion(pod)

# Assert deletion side-effect behavior
if should_delete:
delete_pod_mock.assert_called_once_with(pod)
else:
delete_pod_mock.assert_not_called()

assert result == should_delete

@pytest.mark.parametrize(
"pod_phase",
[
Expand Down Expand Up @@ -918,6 +965,8 @@ def test_termination_grace_period_default_value_correctly_set(self):
({"on_finish_action": "keep_pod"}, True, False),
({"on_finish_action": "delete_succeeded_pod"}, False, True),
({"on_finish_action": "delete_succeeded_pod"}, True, False),
({"on_finish_action": "delete_active_pod"}, True, True),
({"on_finish_action": "delete_active_pod"}, False, True),
],
)
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
Expand Down Expand Up @@ -995,6 +1044,7 @@ def test_pod_with_istio_delete_after_await_container_error(
pytest.param({"on_finish_action": "delete_pod"}, True, id="delete-pod"),
pytest.param({"on_finish_action": "delete_succeeded_pod"}, False, id="delete-succeeded-pod"),
pytest.param({"on_finish_action": "keep_pod"}, False, id="keep-pod"),
pytest.param({"on_finish_action": "delete_active_pod"}, False, id="delete-active-pod"),
],
)
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
Expand Down Expand Up @@ -1614,6 +1664,8 @@ def test_wait_for_xcom_sidecar_iff_push_xcom(self, mock_await, mock_extract_xcom
({"on_finish_action": "delete_pod"}, True, True),
({"on_finish_action": "delete_succeeded_pod"}, False, True),
({"on_finish_action": "delete_succeeded_pod"}, True, False),
({"on_finish_action": "delete_active_pod"}, False, False),
({"on_finish_action": "delete_active_pod"}, True, False),
],
)
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
Expand Down Expand Up @@ -2163,7 +2215,8 @@ def test_await_container_completion_retries_on_specific_exception(
)

@pytest.mark.parametrize(
"on_finish_action", [OnFinishAction.KEEP_POD, OnFinishAction.DELETE_SUCCEEDED_POD]
"on_finish_action",
[OnFinishAction.KEEP_POD, OnFinishAction.DELETE_ACTIVE_POD, OnFinishAction.DELETE_SUCCEEDED_POD],
)
@patch(KUB_OP_PATH.format("patch_already_checked"))
@patch(KUB_OP_PATH.format("process_pod_deletion"))
Expand Down