From d3e2a08006d7e29df6928a4596546ec6761325f7 Mon Sep 17 00:00:00 2001 From: mohamed-mis <149175087+mohamed-mis@users.noreply.github.com> Date: Wed, 24 Sep 2025 16:09:46 +0100 Subject: [PATCH 1/4] Preserve existing metadata labels in SparkApplication manifest Ensure that CustomObjectLauncher updates only the 'name' and 'namespace' fields in the metadata without overwriting existing labels or annotations. This change uses dict.update() to safely modify the metadata in place, preventing loss of important SparkApplication metadata during submission. --- .../cncf/kubernetes/operators/custom_object_launcher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index a83bc7a3de387..8cf1698bccf20 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -238,7 +238,12 @@ def pod_manager(self) -> PodManager: def get_body(self): self.body: dict = SparkJobSpec(**self.template_body["spark"]) - self.body.metadata = {"name": self.name, "namespace": self.namespace} + if not isinstance(self.body.metadata, dict): + self.body.metadata = {} + self.body.metadata.update({ + "name": self.name, + "namespace": self.namespace + }) if self.template_body.get("kubernetes"): k8s_spec: dict = KubernetesSpec(**self.template_body["kubernetes"]) self.body.spec["volumes"] = k8s_spec.volumes From 1ef82d744d4eedc417aba5174a1b4f8cd4ec9e8c Mon Sep 17 00:00:00 2001 From: mohamed-mis <149175087+mohamed-mis@users.noreply.github.com> Date: Thu, 25 Sep 2025 10:33:31 +0100 Subject: [PATCH 2/4] misc: format committed lines for consistency --- .../cncf/kubernetes/operators/custom_object_launcher.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index 8cf1698bccf20..9c2721ff863d2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -240,10 +240,7 @@ def get_body(self): self.body: dict = SparkJobSpec(**self.template_body["spark"]) if not isinstance(self.body.metadata, dict): self.body.metadata = {} - self.body.metadata.update({ - "name": self.name, - "namespace": self.namespace - }) + self.body.metadata.update({"name": self.name, "namespace": self.namespace}) if self.template_body.get("kubernetes"): k8s_spec: dict = KubernetesSpec(**self.template_body["kubernetes"]) self.body.spec["volumes"] = k8s_spec.volumes From bcb2ed437ff708dbf0ec2dd10eb768a60c1b765d Mon Sep 17 00:00:00 2001 From: mohamed-mis <149175087+mohamed-mis@users.noreply.github.com> Date: Thu, 25 Sep 2025 14:19:01 +0100 Subject: [PATCH 3/4] Fix: Safely handle missing `metadata` attribute in SparkJobSpec --- .../cncf/kubernetes/operators/custom_object_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index 9c2721ff863d2..c3a20af81be92 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -238,7 +238,7 @@ def pod_manager(self) -> PodManager: def get_body(self): self.body: dict = SparkJobSpec(**self.template_body["spark"]) - if not isinstance(self.body.metadata, dict): + if not hasattr(self.body, "metadata") or not isinstance(self.body.metadata, dict): self.body.metadata = {} self.body.metadata.update({"name": self.name, "namespace": self.namespace}) if self.template_body.get("kubernetes"): From 8af38fe16a0ccda0fb2fba2150f6dac34ec8fb67 Mon Sep 17 00:00:00 2001 From: mohamed-mis <149175087+mohamed-mis@users.noreply.github.com> Date: Mon, 29 Sep 2025 22:31:39 +0100 Subject: [PATCH 4/4] Add unit tests for get_body method in CustomObjectLauncher test suite for get_body logic in CustomObjectLauncher, including metadata handling --- .../operators/test_custom_object_launcher.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py index eab85e5a87339..078547af56183 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py @@ -213,6 +213,27 @@ def get_pod_status(self, reason: str, message: str | None = None): ] ) + def test_get_body_initializes_metadata_when_missing(self, mock_launcher): + mock_launcher.template_body["spark"].pop("metadata", None) + body = mock_launcher.get_body() + assert isinstance(body["metadata"], dict) + assert body["metadata"]["name"] == mock_launcher.name + assert body["metadata"]["namespace"] == mock_launcher.namespace + + def test_get_body_replaces_non_dict_metadata(self, mock_launcher): + mock_launcher.template_body["spark"]["metadata"] = "not-a-dict" + body = mock_launcher.get_body() + assert isinstance(body["metadata"], dict) + assert body["metadata"]["name"] == mock_launcher.name + assert body["metadata"]["namespace"] == mock_launcher.namespace + + def test_get_body_preserves_existing_metadata_labels(self, mock_launcher): + mock_launcher.template_body["spark"]["metadata"] = {"labels": {"team": "data"}} + body = mock_launcher.get_body() + assert body["metadata"]["labels"]["team"] == "data" + assert body["metadata"]["name"] == mock_launcher.name + assert body["metadata"]["namespace"] == mock_launcher.namespace + @patch("airflow.providers.cncf.kubernetes.operators.custom_object_launcher.PodManager") def test_start_spark_job_no_error(self, mock_pod_manager, mock_launcher): mock_launcher.start_spark_job()