From 26a0126d63fd9ead60ede029a3e7b8e95d34492a Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 31 Jan 2018 08:45:41 +0100 Subject: [PATCH 01/13] [WIP] initial changes for the client mode support --- .../org/apache/spark/deploy/SparkSubmit.scala | 2 - .../org/apache/spark/deploy/k8s/Config.scala | 2 + .../spark/deploy/k8s/OptionRequirements.scala | 40 +++++++++++ .../k8s/SparkKubernetesClientFactory.scala | 71 ++++++++++++++++++- .../cluster/k8s/ExecutorPodFactory.scala | 36 ++++++++-- .../k8s/KubernetesClusterManager.scala | 52 +++++++++++--- .../KubernetesClusterSchedulerBackend.scala | 62 +++++++++++++--- 7 files changed, 234 insertions(+), 31 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1e381965c52b..329bde08718f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -320,8 +320,6 @@ object SparkSubmit extends CommandLineUtils with Logging { printErrorAndExit("Python applications are currently not supported for Kubernetes.") case (KUBERNETES, _) if args.isR => printErrorAndExit("R applications are currently not supported for Kubernetes.") - case (KUBERNETES, CLIENT) => - printErrorAndExit("Client mode is currently not supported for Kubernetes.") case (LOCAL, CLUSTER) => printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"") case (_, CLUSTER) if isShell(args.primaryResource) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 471196ac0e3f..be649c6c433c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -58,6 +58,8 @@ private[spark] object Config extends Logging { "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted" + private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala new file mode 100644 index 000000000000..89053de5b955 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +private[spark] object OptionRequirements { + + def requireBothOrNeitherDefined( + opt1: Option[_], + opt2: Option[_], + errMessageWhenFirstIsMissing: String, + errMessageWhenSecondIsMissing: String): Unit = { + requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) + requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) + } + + def requireSecondIfFirstIsDefined( + opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = { + opt1.foreach { _ => + require(opt2.isDefined, errMessageWhenSecondIsMissing) + } + } + + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { + opt1.foreach { _ => require(opt2.isEmpty, errMessage) } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index c47e78cbf19e..3811d246ea42 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -20,7 +20,7 @@ import java.io.File import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.{Config => Fabric8Client, ConfigBuilder, DefaultKubernetesClient, KubernetesClient} import io.fabric8.kubernetes.client.utils.HttpClientUtils import okhttp3.Dispatcher @@ -36,6 +36,21 @@ import org.apache.spark.util.ThreadUtils private[spark] object SparkKubernetesClientFactory { def createKubernetesClient( + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + maybeServiceAccountToken: Option[File], + 
maybeServiceAccountCaCert: Option[File]): KubernetesClient = { + new java.io.File(Fabric8Client.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { + case true => createInClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix, + sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert) + case false => createOutClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix, + sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert) + } + } + + private def createInClusterKubernetesClient( master: String, namespace: Option[String], kubernetesAuthConfPrefix: String, @@ -88,6 +103,60 @@ private[spark] object SparkKubernetesClientFactory { new DefaultKubernetesClient(httpClientWithCustomDispatcher, config) } + def createOutClusterKubernetesClient( + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + maybeServiceAccountToken: Option[File], + maybeServiceAccountCaCert: Option[File]): KubernetesClient = { + val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX" + val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX" + val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf) + .map(new File(_)) + .orElse(maybeServiceAccountToken) + val oauthTokenValue = sparkConf.getOption(oauthTokenConf) + OptionRequirements.requireNandDefined( + oauthTokenFile, + oauthTokenValue, + s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" + + s" value $oauthTokenConf.") + + val caCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX") + .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath)) + val clientKeyFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") + val clientCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") + val dispatcher = new Dispatcher( + 
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(master) + .withWebsocketPingInterval(0) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + // TODO(ECH) Deal correctly with the oauth token file. + // }.withOption(oauthTokenFile) { + // (file, configBuilder) => + // configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8)) + }.withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + }.withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + }.withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + }.withOption(namespace) { + (ns, configBuilder) => configBuilder.withNamespace(ns) + }.build() + val baseHttpClient = HttpClientUtils.createHttpClient(config) + val httpClientWithCustomDispatcher = baseHttpClient.newBuilder() + .dispatcher(dispatcher) + .build() + new DefaultKubernetesClient(httpClientWithCustomDispatcher, config) + } + private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) extends AnyVal { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 141bd2827e7c..c91050cc37f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -106,7 +106,16 @@ private[spark] class ExecutorPodFactory( executorEnvs: Seq[(String, String)], driverPod: Pod, nodeToLocalTaskCount: Map[String, Int]): Pod = { - val name = s"$executorPodNamePrefix-exec-$executorId" + val prefix = (executorPodNamePrefix == "spark") 
match { + case true => + val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") + val launchTime = System.currentTimeMillis() + s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") + // We are in client mode. + case false => + executorPodNamePrefix // We are in cluster mode. + } + val name = s"$prefix-exec-$executorId" // hostname must be no longer than 63 characters, so take the last 63 characters of the pod // name as the hostname. This preserves uniqueness since the end of name contains @@ -184,12 +193,25 @@ private[spark] class ExecutorPodFactory( .addToArgs("executor") .build() - val executorPod = new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() + val executorPod = (driverPod == null) match { + case true => new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + case false => new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() .addNewOwnerReference() .withController(true) .withApiVersion(driverPod.getApiVersion) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index a942db6ae02d..c4b6a81b41d1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -18,9 
+18,9 @@ package org.apache.spark.scheduler.cluster.k8s import java.io.File -import io.fabric8.kubernetes.client.Config +import io.fabric8.kubernetes.client.{Config, KubernetesClient} -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -28,7 +28,39 @@ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.util.ThreadUtils -private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { +trait ManagerSpecificHandlers { + def createKubernetesClient(sparkConf: SparkConf): KubernetesClient + } + +private[spark] class KubernetesClusterManager extends ExternalClusterManager + with ManagerSpecificHandlers with Logging { + + class InClusterHandlers extends ManagerSpecificHandlers { + override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = + SparkKubernetesClientFactory.createKubernetesClient( + KUBERNETES_MASTER_INTERNAL_URL, + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + } + + class OutClusterHandlers extends ManagerSpecificHandlers { + override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = + SparkKubernetesClientFactory.createKubernetesClient( + sparkConf.get("spark.master").replace("k8s://", ""), + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new 
File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + } + + val modeHandler: ManagerSpecificHandlers = null + + override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = + modeHandler.createKubernetesClient(sparkConf) override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") @@ -44,6 +76,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { + val modeHandler: ManagerSpecificHandlers = { + new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { + case true => new InClusterHandlers() + case false => new OutClusterHandlers() + } + } val sparkConf = sc.getConf val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME) val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF) @@ -99,13 +137,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit None } - val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( - KUBERNETES_MASTER_INTERNAL_URL, - Some(sparkConf.get(KUBERNETES_NAMESPACE)), - KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, - sparkConf, - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + val kubernetesClient = modeHandler.createKubernetesClient(sparkConf) val executorPodFactory = new ExecutorPodFactory( sparkConf, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 9de4b16c30d3..61d6eeeb28d0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -23,13 +23,13 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import javax.annotation.concurrent.GuardedBy import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.{Config => Fabric8Config, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} @@ -37,6 +37,11 @@ import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.util.Utils +trait SchedulerBackendSpecificHandlers { + def getDriverPod(): Pod + def getKubernetesDriverPodName(conf: SparkConf): String +} + private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv, @@ -44,10 +49,48 @@ private[spark] class KubernetesClusterSchedulerBackend( kubernetesClient: KubernetesClient, allocatorExecutor: ScheduledExecutorService, requestExecutorsService: ExecutorService) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) with SchedulerBackendSpecificHandlers { + + class OutClusterClientModeHandlers extends SchedulerBackendSpecificHandlers { + override def getDriverPod(): Pod = null + override def getKubernetesDriverPodName(conf: SparkConf): String = null + } + + class 
NonOutClusterClientModeHandlers extends SchedulerBackendSpecificHandlers { + override def getDriverPod(): Pod = { + try { + kubernetesClient.pods().inNamespace(kubernetesNamespace). + withName(kubernetesDriverPodName).get() + } catch { + case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + } + override def getKubernetesDriverPodName(conf: SparkConf): String = { + conf + .get(KUBERNETES_DRIVER_POD_NAME) + .getOrElse( + throw new SparkException("Must specify the driver pod name")) + } + } import KubernetesClusterSchedulerBackend._ + val modeHandler: SchedulerBackendSpecificHandlers = { + val deployMode = conf + .get("spark.submit.deployMode") + deployMode match { + case "client" => + new java.io.File(Fabric8Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { + case true => new NonOutClusterClientModeHandlers() + case false => new OutClusterClientModeHandlers() + } + case _ => + new NonOutClusterClientModeHandlers() + } + } + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) private val RUNNING_EXECUTOR_PODS_LOCK = new Object @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") @@ -58,16 +101,13 @@ private[spark] class KubernetesClusterSchedulerBackend( private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) - private val kubernetesDriverPodName = conf - .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse(throw new SparkException("Must specify the driver pod name")) + private val kubernetesDriverPodName = modeHandler.getKubernetesDriverPodName(conf) private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( requestExecutorsService) - private val driverPod = kubernetesClient.pods() - .inNamespace(kubernetesNamespace) - .withName(kubernetesDriverPodName) - .get() + override def getDriverPod(): Pod = modeHandler.getDriverPod() + override def getKubernetesDriverPodName(conf: SparkConf): String = + getKubernetesDriverPodName(conf) 
protected override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { @@ -120,7 +160,7 @@ private[spark] class KubernetesClusterSchedulerBackend( applicationId(), driverUrl, conf.getExecutorEnv, - driverPod, + getDriverPod(), currentNodeToLocalTaskCount) executorsToAllocate(executorId) = executorPod logInfo( From 66a07eb28a8658744e85b9a61ac2db5df7cc0030 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Mon, 26 Mar 2018 16:59:11 +0200 Subject: [PATCH 02/13] fix ExecutorPodFactory build failure --- .../cluster/k8s/ExecutorPodFactory.scala | 109 ++++++++---------- .../k8s/KubernetesClusterManager.scala | 1 - 2 files changed, 50 insertions(+), 60 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 509362280ef8..dccf1ec1a15f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -28,16 +28,16 @@ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTI import org.apache.spark.util.Utils /** - * A factory class for bootstrapping and creating executor pods with the given bootstrapping - * components. - * - * @param sparkConf Spark configuration - * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto - * user-specified paths into the executor container - */ + * A factory class for bootstrapping and creating executor pods with the given bootstrapping + * components. 
+ * + * @param sparkConf Spark configuration + * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto + * user-specified paths into the executor container + */ private[spark] class ExecutorPodFactory( - sparkConf: SparkConf, - mountSecretsBootstrap: Option[MountSecretsBootstrap]) { + sparkConf: SparkConf, + mountSecretsBootstrap: Option[MountSecretsBootstrap]) { private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH) @@ -87,25 +87,16 @@ private[spark] class ExecutorPodFactory( private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) /** - * Configure and construct an executor pod with the given parameters. - */ + * Configure and construct an executor pod with the given parameters. + */ def createExecutorPod( - executorId: String, - applicationId: String, - driverUrl: String, - executorEnvs: Seq[(String, String)], - driverPod: Pod, - nodeToLocalTaskCount: Map[String, Int]): Pod = { - val prefix = (executorPodNamePrefix == "spark") match { - case true => - val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") - val launchTime = System.currentTimeMillis() - s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") - // We are in client mode. - case false => - executorPodNamePrefix // We are in cluster mode. - } - val name = s"$prefix-exec-$executorId" + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { + val name = s"$executorPodNamePrefix-exec-$executorId" // hostname must be no longer than 63 characters, so take the last 63 characters of the pod // name as the hostname. 
This preserves uniqueness since the end of name contains @@ -175,47 +166,47 @@ private[spark] class ExecutorPodFactory( .withImage(executorContainerImage) .withImagePullPolicy(imagePullPolicy) .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .endResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .endResources() .addAllToEnv(executorEnv.asJava) .withPorts(requiredPorts.asJava) .addToArgs("executor") .build() - val executorPod = (driverPod == null) match { + val executorPod = (driverPod == null) match { case true => new PodBuilder() .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) .endMetadata() .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() .build() - case false => new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() + case false => new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + 
.withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() .build() val containerWithLimitCores = executorLimitCores.map { limitCores => @@ -237,8 +228,8 @@ private[spark] class ExecutorPodFactory( new PodBuilder(maybeSecretsMountedPod) .editSpec() - .addToContainers(maybeSecretsMountedContainer) - .endSpec() + .addToContainers(maybeSecretsMountedContainer) + .endSpec() .build() } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 9c5ac7f733ef..0e67cd4f9225 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -87,7 +87,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager None } - val modeHandler: ManagerSpecificHandlers = { new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { case true => new InClusterHandlers() From 5e054100abd9331ae6bb2463defb6e4549e83f1a Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Mon, 26 Mar 2018 17:35:54 +0200 Subject: [PATCH 03/13] fix build --- .../cluster/k8s/ExecutorPodFactory.scala | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index dccf1ec1a15f..ee320d49ddf4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -176,38 +176,39 @@ private[spark] class ExecutorPodFactory( .build() val executorPod = (driverPod == null) match { - case true => new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() - case false => new PodBuilder() - .withNewMetadata() - .withName(name) - .withLabels(resolvedExecutorLabels.asJava) - .withAnnotations(executorAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(nodeSelector.asJava) - .endSpec() - .build() + case true => new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + case false => new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + 
.withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + } val containerWithLimitCores = executorLimitCores.map { limitCores => From 3985bc633cc3e285b8e6be3f06f9fddb6c37df02 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 27 Mar 2018 09:29:38 +0200 Subject: [PATCH 04/13] allow client mode --- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 0e67cd4f9225..eebc562a6601 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -65,12 +65,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") - } - new TaskSchedulerImpl(sc) } From d3b2ec492545fb3c1132d916d1de9610d4f8f781 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 27 Mar 2018 15:14:08 +0200 Subject: [PATCH 05/13] don't use cacert or token in client mode ---
.../scheduler/cluster/k8s/KubernetesClusterManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index eebc562a6601..9dc791bd0328 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -53,8 +53,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager Some(sparkConf.get(KUBERNETES_NAMESPACE)), APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, sparkConf, - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + None, + None) } val modeHandler: ManagerSpecificHandlers = null From d102ac23564156933de872b103d1d162da733367 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 27 Mar 2018 15:14:44 +0200 Subject: [PATCH 06/13] don't use oauth token in client mode --- .../spark/deploy/k8s/SparkKubernetesClientFactory.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 3811d246ea42..7ea1bf3926e0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -137,10 +137,6 @@ private[spark] object SparkKubernetesClientFactory { .withWebsocketPingInterval(0) .withOption(oauthTokenValue) { (token, 
configBuilder) => configBuilder.withOauthToken(token) - // TODO(ECH) Deal correctly with the oauth token file. - // }.withOption(oauthTokenFile) { - // (file, configBuilder) => - // configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8)) }.withOption(caCertFile) { (file, configBuilder) => configBuilder.withCaCertFile(file) }.withOption(clientKeyFile) { From a07c9df1bd07eeee52717b0d8b73dca963b59a89 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Fri, 4 May 2018 11:31:42 +0200 Subject: [PATCH 07/13] update further to #20910 Refactor to unify driver and executor pod builder APIs --- .../features/BasicExecutorFeatureStep.scala | 57 ++++++++++++------- .../submit/KubernetesClientApplication.scala | 7 --- .../k8s/KubernetesClusterManager.scala | 41 ++++--------- .../KubernetesClusterSchedulerBackend.scala | 14 +---- 4 files changed, 50 insertions(+), 69 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 529069d3b8a0..424f08bc03aa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -151,27 +151,42 @@ private[spark] class BasicExecutorFeatureStep( .build() }.getOrElse(executorContainer) val driverPod = kubernetesConf.roleSpecificConf.driverPod - val executorPod = new PodBuilder(pod.pod) - .editOrNewMetadata() - .withName(name) - .withLabels(kubernetesConf.roleLabels.asJava) - .withAnnotations(kubernetesConf.roleAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - 
.withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .editOrNewSpec() - .withHostname(hostname) - .withRestartPolicy("Never") - .withNodeSelector(kubernetesConf.nodeSelector().asJava) - .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) - .endSpec() - .build() + val executorPod = (driverPod == null) match { + case true => new PodBuilder(pod.pod) + .editOrNewMetadata() + .withName(name) + .withLabels(kubernetesConf.roleLabels.asJava) + .withAnnotations(kubernetesConf.roleAnnotations.asJava) + .endMetadata() + .editOrNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(kubernetesConf.nodeSelector().asJava) + .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) + .endSpec() + .build() + case false => new PodBuilder(pod.pod) + .editOrNewMetadata() + .withName(name) + .withLabels(kubernetesConf.roleLabels.asJava) + .withAnnotations(kubernetesConf.roleAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .editOrNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(kubernetesConf.nodeSelector().asJava) + .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) + .endSpec() + .build() + } SparkPod(executorPod, containerWithLimitCores) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index a97f5650fb86..a52655cf60f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -140,13 +140,6 @@ private[spark] class Client( throw e } - if (waitForAppCompletion) { - logInfo(s"Waiting for application $appName to finish...") - watcher.awaitCompletion() - logInfo(s"Application $appName finished.") - } else { - logInfo(s"Deployed Spark application $appName into Kubernetes.") - } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 967c1d7a6594..8482518737c0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -20,7 +20,7 @@ import java.io.File import io.fabric8.kubernetes.client.{Config, KubernetesClient} -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -57,7 +57,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager None) } - val modeHandler: ManagerSpecificHandlers = null + val modeHandler: ManagerSpecificHandlers = { + new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { + case true => new InClusterHandlers() + case false => new OutClusterHandlers() + } + } override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = modeHandler.createKubernetesClient(sparkConf) @@ -69,36 +74,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager } override def createSchedulerBackend( - sc: 
SparkContext, - masterURL: String, - scheduler: TaskScheduler): SchedulerBackend = { - + sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) -<<<<<<< HEAD - val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) { - Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths)) - } else { - None - } - - val modeHandler: ManagerSpecificHandlers = { - new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match { - case true => new InClusterHandlers() - case false => new OutClusterHandlers() - } - } - val sparkConf = sc.getConf - val kubernetesClient = modeHandler.createKubernetesClient(sparkConf) -======= - val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( - KUBERNETES_MASTER_INTERNAL_URL, - Some(sc.conf.get(KUBERNETES_NAMESPACE)), - KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, - sc.conf, - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) ->>>>>>> master + val kubernetesClient = createKubernetesClient(sc.getConf) val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 9b7801aeebb0..25b703338ef5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -160,23 +160,15 @@ private[spark] class KubernetesClusterSchedulerBackend( conf, 
executorId, applicationId(), -<<<<<<< HEAD - driverUrl, - conf.getExecutorEnv, - getDriverPod(), - currentNodeToLocalTaskCount) - executorsToAllocate(executorId) = executorPod -======= - driverPod) + getDriverPod()) val executorPod = executorBuilder.buildFromFeatures(executorConf) val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() - .addToContainers(executorPod.container) - .endSpec() + .addToContainers(executorPod.container) + .endSpec() .build() executorsToAllocate(executorId) = podWithAttachedContainer ->>>>>>> master logInfo( s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") } From 11c1ea581f4ed7991d730b6634d2a255b6f6b264 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 10 Jul 2018 13:58:20 +0200 Subject: [PATCH 08/13] format --- .../deploy/k8s/SparkKubernetesClientFactory.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 7ea1bf3926e0..4366e42c534a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -104,12 +104,12 @@ private[spark] object SparkKubernetesClientFactory { } def createOutClusterKubernetesClient( - master: String, - namespace: Option[String], - kubernetesAuthConfPrefix: String, - sparkConf: SparkConf, - maybeServiceAccountToken: Option[File], - maybeServiceAccountCaCert: Option[File]): KubernetesClient = { + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + maybeServiceAccountToken: Option[File], + maybeServiceAccountCaCert: Option[File]): KubernetesClient = { val 
oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX" val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX" val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf) From a24867582aa0e3f2fdfd0cb17af65e4228fc08b9 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 10 Jul 2018 14:45:39 +0200 Subject: [PATCH 09/13] add back waitForAppCompletion --- .../deploy/k8s/submit/KubernetesClientApplication.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index d3ea21781f45..eaff47205dbb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -148,6 +148,13 @@ private[spark] class Client( throw e } + if (waitForAppCompletion) { + logInfo(s"Waiting for application $appName to finish...") + watcher.awaitCompletion() + logInfo(s"Application $appName finished.") + } else { + logInfo(s"Deployed Spark application $appName into Kubernetes.") + } } } From 58ca9abad0ccc333f1250ba069371f340522a479 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 11 Jul 2018 19:16:55 +0200 Subject: [PATCH 10/13] remove legacy APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9d4a13203a1f..bf33179ae3da 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -65,8 +65,6 @@ private[spark] object Config extends Logging { "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted" - private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX = - "spark.kubernetes.authenticate.driver.mounted" val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" From 40222778f6e1b940dc32f52e0371b537bc49b5e9 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 11 Jul 2018 19:18:14 +0200 Subject: [PATCH 11/13] move OptionRequirements to KubernetesUtils --- .../spark/deploy/k8s/KubernetesUtils.scala | 20 ++++++++++ .../spark/deploy/k8s/OptionRequirements.scala | 40 ------------------- .../k8s/SparkKubernetesClientFactory.scala | 2 +- 3 files changed, 21 insertions(+), 41 deletions(-) delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 593fb531a004..866a3c9cb2ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -60,4 +60,24 @@ private[spark] object KubernetesUtils { case _ => uri } } + + def requireBothOrNeitherDefined( + opt1: Option[_], + opt2: Option[_], + errMessageWhenFirstIsMissing: String, + errMessageWhenSecondIsMissing: String): Unit = { + requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) + requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) + } + + def requireSecondIfFirstIsDefined( 
+ opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = { + opt1.foreach { _ => + require(opt2.isDefined, errMessageWhenSecondIsMissing) + } + } + + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { + opt1.foreach { _ => require(opt2.isEmpty, errMessage) } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala deleted file mode 100644 index 89053de5b955..000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s - -private[spark] object OptionRequirements { - - def requireBothOrNeitherDefined( - opt1: Option[_], - opt2: Option[_], - errMessageWhenFirstIsMissing: String, - errMessageWhenSecondIsMissing: String): Unit = { - requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) - requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) - } - - def requireSecondIfFirstIsDefined( - opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = { - opt1.foreach { _ => - require(opt2.isDefined, errMessageWhenSecondIsMissing) - } - } - - def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { - opt1.foreach { _ => require(opt2.isEmpty, errMessage) } - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 4366e42c534a..3b99cf99e3a4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -116,7 +116,7 @@ private[spark] object SparkKubernetesClientFactory { .map(new File(_)) .orElse(maybeServiceAccountToken) val oauthTokenValue = sparkConf.getOption(oauthTokenConf) - OptionRequirements.requireNandDefined( + KubernetesUtils.requireNandDefined( oauthTokenFile, oauthTokenValue, s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" + From 13fa7b2cf1fce3398f366c371a047120bd69941e Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 11 Jul 2018 19:28:24 +0200 Subject: [PATCH 12/13] add basic doc for client mode --- docs/running-on-kubernetes.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/running-on-kubernetes.md 
b/docs/running-on-kubernetes.md index 7149616e534a..9e3670ede81a 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -260,7 +260,18 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl ## Client Mode -Client mode is not currently supported. +Client mode is currently supported when launched `Out Cluster` (not in a Pod). + +Both `spark-submit` and `spark-shell` can be used. + +For this, you need to add two properties to your launch configuration: + +``` +spark.submit.deployMode="client" +spark.kubernetes.driver.pod.name="$HOSTNAME" +``` + +Client mode is currently **not** supported when launched `In Cluster` (in a Pod). ## Future Work From ab992ca6f0374e057d865bafb11de7f3ccae2e10 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Wed, 11 Jul 2018 19:55:23 +0200 Subject: [PATCH 13/13] remove duplicate method from KubernetesUtils --- .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index dc054dbf64a8..a558059761ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -75,7 +75,4 @@ } } - def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { - opt1.foreach { _ => require(opt2.isEmpty, errMessage) } - } }