diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 0334947b47287..ed31b43bf2544 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -260,14 +260,13 @@ def process_status( for container_status in container_statuses_to_check: container_status_state = container_status["state"] if "waiting" in container_status_state: + waiting_reason = container_status_state["waiting"].get("reason") + waiting_message = container_status_state["waiting"].get("message") if ( - container_status_state["waiting"]["reason"] + waiting_reason in self.kube_config.worker_pod_pending_fatal_container_state_reasons ): - if ( - container_status_state["waiting"]["reason"] == "ErrImagePull" - and container_status_state["waiting"]["message"] == "pull QPS exceeded" - ): + if waiting_reason == "ErrImagePull" and waiting_message == "pull QPS exceeded": continue key = annotations_to_key(annotations=annotations) task_key_str = ( @@ -277,7 +276,7 @@ def process_status( "Event: %s has container %s with fatal reason %s, task: %s", pod_name, container_status["name"], - container_status_state["waiting"]["reason"], + waiting_reason, task_key_str, ) self.watcher_queue.put( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 470a3912cc064..a11ac67b4e0b0 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1749,6 +1749,26 @@ def assert_watcher_queue_called_once_with_state(self, state): False, id="OtherReasons", ), + pytest.param( + { + "status": { + "startTime": "2020-05-12T03:49:57Z", + "containerStatuses": [ + { + "name": "base", + "state": {"waiting": {}}, # No "reason" key - optional per K8s API spec + "lastState": {}, + "ready": False, + "restartCount": 0, + "image": "dockerhub.com/apache/airflow:latest", + "imageID": "", + } + ], + } + }, + False, + id="MissingReason", + ), ], ) def test_process_status_pending(self, raw_object, is_watcher_queue_called):