diff --git a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
index 773fbc9f3c04e..df3d6a41a605b 100644
--- a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -625,9 +625,13 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
                     self.log.info("Identified spark application id: %s", self._kubernetes_application_id)
 
             # Store the Spark Exit code
-            match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
-            if match_exit_code:
-                self._spark_exit_code = int(match_exit_code.group(1))
+            # Cluster mode requires the Exit code to determine the program status
+            if self._connection.get("deploy_mode") == "cluster":
+                match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
+                if match_exit_code:
+                    self._spark_exit_code = int(match_exit_code.group(1))
+            else:
+                self._spark_exit_code = 0
 
             # if we run in standalone cluster mode and we want to track the driver status
             # we need to extract the driver id from the logs. This allows us to poll for
diff --git a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
index b28eb14caa9cf..cf1f182b7b7a8 100644
--- a/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
+++ b/providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py
@@ -93,6 +93,14 @@ def setup_method(self):
                 extra='{"deploy-mode": "cluster", "namespace": "mynamespace"}',
             )
         )
+        db.merge_conn(
+            Connection(
+                conn_id="spark_k8s_client",
+                conn_type="spark",
+                host="k8s://https://k8s-master",
+                extra='{"deploy-mode": "client", "namespace": "mynamespace"}',
+            )
+        )
         db.merge_conn(
             Connection(conn_id="spark_default_mesos", conn_type="spark", host="mesos://host", port=5050)
         )
@@ -804,6 +812,22 @@ def test_process_spark_submit_log_k8s_spark_3(self):
         # Then
         assert hook._spark_exit_code == 999
 
+    def test_process_spark_client_mode_submit_log_k8s(self):
+        # Given
+        hook = SparkSubmitHook(conn_id="spark_k8s_client")
+        log_lines = [
+            "INFO - The executor with id 2 exited with exit code 137(SIGKILL, possible container OOM).",
+            "...",
+            "Pi is roughly 3.141640",
+            "SparkContext: Successfully stopped SparkContext",
+        ]
+
+        # When
+        hook._process_spark_submit_log(log_lines)
+
+        # Then
+        assert hook._spark_exit_code == 0
+
     def test_process_spark_submit_log_standalone_cluster(self):
         # Given
         hook = SparkSubmitHook(conn_id="spark_standalone_cluster")
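
The gate added above only scrapes "Exit code: N" from the log stream in cluster deploy mode, where the driver runs remotely and the logs are the only signal of how the application finished; in client mode the exit code is pinned to 0 so that stray executor-level exit codes do not mark the task failed. The following is a minimal standalone sketch of that behavior, assuming a hypothetical helper resolve_exit_code that is not part of the hook; the real logic lives inside SparkSubmitHook._process_spark_submit_log.

    import re

    def resolve_exit_code(log_lines: list[str], deploy_mode: str) -> int | None:
        exit_code = None
        for line in log_lines:
            if deploy_mode == "cluster":
                # Cluster mode: the "Exit code: N" log line (note the colon)
                # is the only way to learn how the remote driver finished.
                match = re.search(r"\s*[eE]xit code: (\d+)", line)
                if match:
                    exit_code = int(match.group(1))
            else:
                # Client mode: spark-submit runs the driver locally, so its own
                # process status is authoritative and log noise is ignored.
                exit_code = 0
        return exit_code

    # Cluster mode picks the driver's exit code out of the log stream.
    assert resolve_exit_code(["Exit code: 137"], "cluster") == 137
    # Client mode reports 0 even if an executor was OOM-killed mid-run.
    assert resolve_exit_code(["Exit code: 137", "Pi is roughly 3.141640"], "client") == 0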