Skip to content

Commit

Permalink
Fix hanging KPO on deferrable task with do_xcom_push
Browse files Browse the repository at this point in the history
  • Loading branch information
vchiapaikeo committed Feb 10, 2024
1 parent 2e95a2a commit d375618
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
4 changes: 4 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ def execute_complete(self, context: Context, event: dict, **kwargs):
message = f"{event['message']}\n{event['stack_trace']}"
else:
message = event["message"]
if self.do_xcom_push:
# In the event of base container failure, we need to kill the xcom sidecar.
# We disregard xcom output and do that here
_ = self.extract_xcom(pod=pod)
raise AirflowException(message)
elif event["status"] == "success":
# fetch some logs when pod is executed successfully
Expand Down
9 changes: 7 additions & 2 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -2049,8 +2049,13 @@ def test_async_kpo_wait_termination_before_cleanup_on_failure(
# assert that it does not push the xcom
ti_mock.xcom_push.assert_not_called()

# assert that the xcom are not extracted
mock_extract_xcom.assert_not_called()
if do_xcom_push:
# assert that the xcom are not extracted if do_xcom_push is Fale
mock_extract_xcom.assert_called_once()
else:
# but that it is extracted when do_xcom_push is true because the sidecare
# needs to be terminated
mock_extract_xcom.assert_not_called()

# check if it waits for the pod to complete
assert read_pod_mock.call_count == 3
Expand Down

0 comments on commit d375618

Please sign in to comment.