From ec776f33fff5741fb1df38c2daecbb624b099503 Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 22 Aug 2023 14:07:42 +0800 Subject: [PATCH 1/7] [SPARK-44906][K8S] Move Utils. SubstituteAppNExecIds logic into KubernetesConf.annotations --- .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 6 +++++- .../spark/deploy/k8s/features/BasicDriverFeatureStep.scala | 3 +-- .../deploy/k8s/features/BasicExecutorFeatureStep.scala | 6 ++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index d8cb881bf082..6c454f21a13b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -116,7 +116,10 @@ private[spark] class KubernetesDriverConf( } override def annotations: Map[String, String] = { - KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX).map { + case(k, v) => + (k, Utils.substituteAppNExecIds(v, appId, "")) + } } def serviceLabels: Map[String, String] = { @@ -188,6 +191,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } } override def secretNamesToMountPaths: Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 2b287ea85604..11a21bb68a6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -143,8 +143,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .editOrNewMetadata() .withName(driverPodName) .addToLabels(conf.labels.asJava) - .addToAnnotations(conf.annotations.map { case (k, v) => - (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava) + .addToAnnotations(conf.annotations.asJava) .endMetadata() .editOrNewSpec() .withRestartPolicy("Never") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 0b0bbc30ba41..f3e5cad8c9e2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -255,14 +255,12 @@ private[spark] class BasicExecutorFeatureStep( case "statefulset" => "Always" case _ => "Never" } - val annotations = kubernetesConf.annotations.map { case (k, v) => - (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, kubernetesConf.executorId)) - } + val executorPodBuilder = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) .addToLabels(kubernetesConf.labels.asJava) - .addToAnnotations(annotations.asJava) + .addToAnnotations(kubernetesConf.annotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() From 529187a43288e1d2de9c7e93a17e4b042000d82b Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 22 Aug 2023 14:16:31 +0800 Subject: [PATCH 2/7] Add Unit test --- .../spark/deploy/k8s/KubernetesConfSuite.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 3d310a831ea2..bcfad70d4fae 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID +import org.apache.spark.util.Utils class KubernetesConfSuite extends SparkFunSuite { @@ -42,7 +43,9 @@ class KubernetesConfSuite extends SparkFunSuite { "customLabel2Key" -> "customLabel2Value") private val CUSTOM_ANNOTATIONS = Map( "customAnnotation1Key" -> "customAnnotation1Value", - "customAnnotation2Key" -> "customAnnotation2Value") + "customAnnotation2Key" -> "customAnnotation2Value", + "customAnnotation3Key" -> "{{APP_ID}}", + "customAnnotation4Key" -> "{{EXECUTOR_ID}}") private val SECRET_NAMES_TO_MOUNT_PATHS = Map( "secret1" -> "/mnt/secrets/secret1", "secret2" -> "/mnt/secrets/secret2") @@ -93,7 +96,10 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ CUSTOM_LABELS) - assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.annotations === CUSTOM_ANNOTATIONS.map { + case (k, v) => + (k, Utils.substituteAppNExecIds(v, conf.appId, "")) + }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) assert(conf.environment === CUSTOM_ENVS) @@ -161,7 +167,10 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) - assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.annotations === CUSTOM_ANNOTATIONS.map { + case (k, v) => + (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) + }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) } From a422aa19d19a97d9d279e634d303399fc765783d Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Tue, 22 Aug 2023 15:49:50 +0800 Subject: [PATCH 3/7] fix missing executorId --- .../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 6c454f21a13b..7004d51f1aa3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -191,7 +191,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } + .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } } override def secretNamesToMountPaths: Map[String, String] = { From be10416c213d94383e808255e819bcc3184ec166 Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Wed, 23 Aug 2023 11:17:18 +0800 Subject: [PATCH 4/7] fix style --- .../org/apache/spark/deploy/k8s/KubernetesConf.scala | 8 +++----- .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala | 6 ++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 7004d51f1aa3..dbcc5b007b43 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -116,10 +116,8 @@ private[spark] class KubernetesDriverConf( } override def annotations: Map[String, String] = { - KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX).map { - case(k, v) => - (k, Utils.substituteAppNExecIds(v, appId, "")) - } + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) + .map(p => (p._1, Utils.substituteAppNExecIds(p._2, appId, ""))) } def serviceLabels: Map[String, String] = { @@ -191,7 +189,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } + .map(p => (p._1, Utils.substituteAppNExecIds(p._2, appId, executorId))) } override def secretNamesToMountPaths: Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index bcfad70d4fae..9963db016ad9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -97,8 +97,7 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ CUSTOM_LABELS) assert(conf.annotations === CUSTOM_ANNOTATIONS.map { - case (k, v) => - (k, Utils.substituteAppNExecIds(v, conf.appId, "")) + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) @@ -168,8 +167,7 @@ class KubernetesConfSuite extends SparkFunSuite { SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE, SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS) assert(conf.annotations === CUSTOM_ANNOTATIONS.map { - case (k, v) => - (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) + case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID)) }) assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) From 796659fdb53555b1c30db97a2e736e5c575eb2e5 Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Wed, 23 Aug 2023 14:59:23 +0800 Subject: [PATCH 5/7] fix comments --- .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index dbcc5b007b43..9d4e9e5f156d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -117,7 +117,7 @@ private[spark] class KubernetesDriverConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) - .map(p => (p._1, Utils.substituteAppNExecIds(p._2, appId, ""))) + .map{ case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } } def serviceLabels: Map[String, String] = { @@ -189,7 +189,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - .map(p => (p._1, Utils.substituteAppNExecIds(p._2, appId, executorId))) + .map{ case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } } override def secretNamesToMountPaths: Map[String, String] = { From 74635dce2c6f9522f37ac1d32ed58cf4af1a4305 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 24 Aug 2023 00:03:29 +0800 Subject: [PATCH 6/7] Update resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- .../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 9d4e9e5f156d..a49435b94e4d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -117,7 +117,7 @@ private[spark] class KubernetesDriverConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) - .map{ case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } + .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) } } def serviceLabels: Map[String, String] = { From 1bfe4925a118d98db0cd95ce251c630e62eb9043 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 24 Aug 2023 00:04:07 +0800 Subject: [PATCH 7/7] Update resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- .../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index a49435b94e4d..4ebf31ae44ee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -189,7 +189,7 @@ private[spark] class KubernetesExecutorConf( override def annotations: Map[String, String] = { KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - .map{ case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } + .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) } } override def secretNamesToMountPaths: Map[String, String] = {