@@ -625,9 +625,13 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
                     self.log.info("Identified spark application id: %s", self._kubernetes_application_id)

             # Store the Spark Exit code
-            match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
-            if match_exit_code:
-                self._spark_exit_code = int(match_exit_code.group(1))
+            # Cluster mode requires the Exit code to determine the program status
+            if self._connection.get("deploy_mode") == "cluster":
+                match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
+                if match_exit_code:
+                    self._spark_exit_code = int(match_exit_code.group(1))
+            else:
+                self._spark_exit_code = 0

             # if we run in standalone cluster mode and we want to track the driver status
             # we need to extract the driver id from the logs. This allows us to poll for
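
For context, a minimal, self-contained sketch of the branching introduced above; it is not part of the diff, and the parse_exit_code helper is a hypothetical stand-in for the hook's per-line handling. Only cluster mode derives the program status from an "Exit code: N" line; in client mode the hook no longer treats executor-level exit codes as the program status and reports 0 instead.

import re

def parse_exit_code(log_lines, deploy_mode):
    # Hypothetical stand-in for the hook logic shown in the hunk above.
    spark_exit_code = None
    for line in log_lines:
        if deploy_mode == "cluster":
            # Cluster mode: the driver's "Exit code: N" line decides the status.
            match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
            if match_exit_code:
                spark_exit_code = int(match_exit_code.group(1))
        else:
            # Client mode: spark-submit's own return is authoritative; report 0.
            spark_exit_code = 0
    return spark_exit_code

assert parse_exit_code(["Exit code: 999"], "cluster") == 999
# Executor-level "exit code 137" messages do not flip the status in client mode.
assert parse_exit_code(["exited with exit code 137(SIGKILL)"], "client") == 0
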
@@ -93,6 +93,14 @@ def setup_method(self):
                 extra='{"deploy-mode": "cluster", "namespace": "mynamespace"}',
             )
         )
+        db.merge_conn(
+            Connection(
+                conn_id="spark_k8s_client",
+                conn_type="spark",
+                host="k8s://https://k8s-master",
+                extra='{"deploy-mode": "client", "namespace": "mynamespace"}',
+            )
+        )
         db.merge_conn(
             Connection(conn_id="spark_default_mesos", conn_type="spark", host="mesos://host", port=5050)
         )
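
The new spark_k8s_client fixture mirrors the existing cluster fixture but sets "deploy-mode": "client" in the connection extra, which is the value the hook-side check above reads back as deploy_mode. A small standalone sketch (outside this diff) of how that JSON extra is parsed via Connection.extra_dejson; building the object directly here is purely illustrative.

from airflow.models import Connection

# Hypothetical standalone construction of the same client-mode connection.
client_conn = Connection(
    conn_id="spark_k8s_client",
    conn_type="spark",
    host="k8s://https://k8s-master",
    extra='{"deploy-mode": "client", "namespace": "mynamespace"}',
)
# extra_dejson deserializes the JSON extra; the "deploy-mode" field here is what
# the hook consults when deciding whether to parse exit codes from the log.
assert client_conn.extra_dejson["deploy-mode"] == "client"
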
@@ -804,6 +812,22 @@ def test_process_spark_submit_log_k8s_spark_3(self):
         # Then
         assert hook._spark_exit_code == 999

+    def test_process_spark_client_mode_submit_log_k8s(self):
+        # Given
+        hook = SparkSubmitHook(conn_id="spark_k8s_client")
+        log_lines = [
+            "INFO - The executor with id 2 exited with exit code 137(SIGKILL, possible container OOM).",
+            "...",
+            "Pi is roughly 3.141640",
+            "SparkContext: Successfully stopped SparkContext",
+        ]
+
+        # When
+        hook._process_spark_submit_log(log_lines)
+
+        # Then
+        assert hook._spark_exit_code == 0
+
     def test_process_spark_submit_log_standalone_cluster(self):
         # Given
         hook = SparkSubmitHook(conn_id="spark_standalone_cluster")
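
The new client-mode test can be run on its own with a standard pytest node id; the module path and class name below are assumed from the provider's usual test layout and may differ between Airflow versions.

pytest tests/providers/apache/spark/hooks/test_spark_submit.py::TestSparkSubmitHook::test_process_spark_client_mode_submit_log_k8s
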