diff --git a/assembly/pom.xml b/assembly/pom.xml index ec243eaebaea..db8ac768a877 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -148,6 +148,16 @@ + + kubernetes + + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + + + hive diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 85f80b6971e8..63314b1d6d30 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -69,7 +69,8 @@ object SparkSubmit extends CommandLineUtils { private val STANDALONE = 2 private val MESOS = 4 private val LOCAL = 8 - private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL + private val KUBERNETES = 16 + private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL // Deploy modes private val CLIENT = 1 @@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils { YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS + case m if m.startsWith("kubernetes") => KUBERNETES case m if m.startsWith("local") => LOCAL case _ => printErrorAndExit("Master must either be yarn or start with spark, mesos, local") @@ -274,6 +276,7 @@ object SparkSubmit extends CommandLineUtils { } val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER + val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files // too for packages that include Python code @@ -320,6 +323,10 @@ object SparkSubmit extends CommandLineUtils { // The following modes are not supported or applicable (clusterManager, deployMode) match { + case (KUBERNETES, CLIENT) => + printErrorAndExit("Client mode is currently not supported for Kubernetes.") + case (KUBERNETES, CLUSTER) if args.isPython || args.isR => + printErrorAndExit("Kubernetes does not currently support python or R applications.") case (STANDALONE, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.") @@ -453,7 +460,17 @@ object SparkSubmit extends CommandLineUtils { OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), - // Other options + // Kubernetes only + OptionAssigner(args.kubernetesMaster, KUBERNETES, ALL_DEPLOY_MODES, + sysProp = "spark.kubernetes.master"), + OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES, + sysProp = "spark.kubernetes.namespace"), + OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.driver.uploads.jars"), + OptionAssigner(args.kubernetesUploadDriverExtraClasspath, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.driver.uploads.driverExtraClasspath"), + + // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.cores"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, @@ -496,8 +513,9 @@ object SparkSubmit extends CommandLineUtils { // Add the application jar automatically so the user doesn't have to call sc.addJar // For YARN cluster mode, the jar is already distributed on each node as "app.jar" + // In Kubernetes cluster mode, the jar will 
be uploaded by the client separately. // For python and R files, the primary resource is already distributed as a regular file - if (!isYarnCluster && !args.isPython && !args.isR) { + if (!isYarnCluster && !isKubernetesCluster && !args.isPython && !args.isR) { var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) @@ -596,6 +614,13 @@ object SparkSubmit extends CommandLineUtils { } } + if (isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.kubernetes.Client" + childArgs += args.primaryResource + childArgs += args.mainClass + childArgs ++= args.childArgs + } + // Load any properties specified through --conf and the default properties file for ((k, v) <- args.sparkProperties) { sysProps.getOrElseUpdate(k, v) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index b1d36e1821cc..5f2dba5aa644 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -71,6 +71,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var principal: String = null var keytab: String = null + // Kubernetes only + var kubernetesMaster: String = null + var kubernetesNamespace: String = null + var kubernetesUploadJars: String = null + var kubernetesUploadDriverExtraClasspath: String = null + // Standalone cluster mode only var supervise: Boolean = false var driverCores: String = null @@ -186,6 +192,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .getOrElse(sparkProperties.get("spark.executor.instances").orNull) keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull + kubernetesMaster = Option(kubernetesMaster) + .orElse(sparkProperties.get("spark.kubernetes.master")) + .orNull + kubernetesNamespace = Option(kubernetesNamespace) + .orElse(sparkProperties.get("spark.kubernetes.namespace")) + .orNull + kubernetesUploadJars = Option(kubernetesUploadJars) + .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars")) + .orNull + kubernetesUploadDriverExtraClasspath = Option(kubernetesUploadDriverExtraClasspath) + .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.driverExtraClasspath")) + .orNull // Try to set main class from JAR if no --class argument is given if (mainClass == null && !isPython && !isR && primaryResource != null) { @@ -424,6 +442,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case KEYTAB => keytab = value + case KUBERNETES_MASTER => + kubernetesMaster = value + + case KUBERNETES_NAMESPACE => + kubernetesNamespace = value + + case KUBERNETES_UPLOAD_JARS => + kubernetesUploadJars = value + + case KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH => + kubernetesUploadDriverExtraClasspath = value + case HELP => printUsageAndExit(0) diff --git a/dev/scalastyle b/dev/scalastyle index f3dec833636c..de7423913fad 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -26,6 +26,8 @@ ERRORS=$(echo -e "q\n" \ -Pyarn \ -Phive \ -Phive-thriftserver \ + -Pkubernetes \ + -Pkubernetes-integration-tests \ scalastyle test:scalastyle \ | awk '{if($1~/error/)print}' \ ) diff --git a/kubernetes/core/pom.xml b/kubernetes/core/pom.xml new file mode 100644 index 
000000000000..9c7eb52b2680 --- /dev/null +++ b/kubernetes/core/pom.xml @@ -0,0 +1,101 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../../pom.xml + + + spark-kubernetes_2.11 + jar + Spark Project Kubernetes + + kubernetes + 1.4.17 + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.netflix.feign + feign-core + + + com.netflix.feign + feign-okhttp + + + com.netflix.feign + feign-jackson + + + com.netflix.feign + feign-jaxrs + + + javax.ws.rs + jsr311-api + + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + javax.ws.rs + javax.ws.rs-api + + + + com.google.guava + guava + + + + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + + diff --git a/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 000000000000..55e7e38b28a0 --- /dev/null +++ b/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala new file mode 100644 index 000000000000..f402b6df82fc --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes + +import java.io.File +import java.security.SecureRandom +import java.util.concurrent.{Executors, TimeUnit} +import javax.net.ssl.X509TrustManager + +import com.google.common.io.Files +import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.SSLUtils +import org.apache.commons.codec.binary.Base64 +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt +import scala.util.Success + +import org.apache.spark.{SPARK_VERSION, SparkConf} +import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.rest.kubernetes._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class Client( + sparkConf: SparkConf, + mainClass: String, + mainAppResource: String, + appArgs: Array[String]) extends Logging { + import Client._ + + private val namespace = sparkConf.getOption("spark.kubernetes.namespace").getOrElse( + throw new IllegalArgumentException("Namespace must be provided in spark.kubernetes.namespace")) + private val master = sparkConf + .getOption("spark.kubernetes.master") + .getOrElse( + throw new IllegalArgumentException("Master must be provided in spark.kubernetes.master")) + + private val launchTime = System.currentTimeMillis + private val kubernetesAppId = sparkConf.getOption("spark.app.name") + .orElse(sparkConf.getOption("spark.app.id")) + .getOrElse(s"spark-$launchTime") + + private val secretName = s"spark-submission-server-secret-$kubernetesAppId" + private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" + private val driverDockerImage = sparkConf.get( + "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION") + private val uploadedDriverExtraClasspath = sparkConf + .getOption("spark.kubernetes.driver.uploads.driverExtraClasspath") + private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") + + private val secretBytes = new Array[Byte](128) + SECURE_RANDOM.nextBytes(secretBytes) + private val secretBase64String = Base64.encodeBase64String(secretBytes) + + private implicit val retryableExecutionContext = ExecutionContext + .fromExecutorService( + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("kubernetes-client-retryable-futures-%d") + .setDaemon(true) + .build())) + + def run(): Unit = { + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(master) + .withNamespace(namespace) + sparkConf.getOption("spark.kubernetes.submit.caCertFile").foreach { + f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) + } + sparkConf.getOption("spark.kubernetes.submit.clientKeyFile").foreach { + f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) + } + sparkConf.getOption("spark.kubernetes.submit.clientCertFile").foreach { + f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) + } + + val k8ClientConfig = k8ConfBuilder.build + Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig))(kubernetesClient => { + val secret = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(secretName) + .endMetadata() +
.withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava) + .withType("Opaque") + .done() + try { + val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava + val uiPort = sparkConf + .getOption("spark.ui.port") + .map(_.toInt) + .getOrElse(DEFAULT_UI_PORT) + val (servicePorts, containerPorts) = configurePorts(uiPort) + val service = kubernetesClient.services().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .endMetadata() + .withNewSpec() + .withSelector(selectors) + .withPorts(servicePorts.asJava) + .endSpec() + .done() + sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName) + sparkConf.setIfMissing("spark.driver.port", DRIVER_PORT.toString) + sparkConf.setIfMissing("spark.blockmanager.port", BLOCKMANAGER_PORT.toString) + val submitRequest = buildSubmissionRequest() + val submitCompletedFuture = SettableFuture.create[Boolean] + val secretDirectory = s"/var/run/secrets/spark-submission/$kubernetesAppId" + + val podWatcher = new Watcher[Pod] { + override def eventReceived(action: Action, t: Pod): Unit = { + if ((action == Action.ADDED || action == Action.MODIFIED) + && t.getStatus.getPhase == "Running" + && !submitCompletedFuture.isDone) { + t.getStatus + .getContainerStatuses + .asScala + .find(status => + status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { + case Some(status) => + try { + val driverLauncher = getDriverLauncherService( + k8ClientConfig, master) + val ping = Retry.retry(5, 5.seconds) { + driverLauncher.ping() + } + ping onFailure { + case t: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(t) + } + } + val submitComplete = ping andThen { + case Success(_) => + driverLauncher.create(submitRequest) + submitCompletedFuture.set(true) + } + submitComplete onFailure { + case t: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(t) + } + } + } catch { + case e: Throwable => + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(e) + throw e + } + } + case None => + } + } + } + + override def onClose(e: KubernetesClientException): Unit = { + if (!submitCompletedFuture.isDone) { + submitCompletedFuture.setException(e) + } + } + } + + def createDriverPod(unused: Watch): Unit = { + kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .withRestartPolicy("OnFailure") + .addNewVolume() + .withName(s"spark-submission-secret-volume") + .withNewSecret() + .withSecretName(secret.getMetadata.getName) + .endSecret() + .endVolume + .addNewContainer() + .withName(DRIVER_LAUNCHER_CONTAINER_NAME) + .withImage(driverDockerImage) + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName("spark-submission-secret-volume") + .withMountPath(secretDirectory) + .withReadOnly(true) + .endVolumeMount() + .addNewEnv() + .withName("SPARK_SUBMISSION_SECRET_LOCATION") + .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME") + .endEnv() + .addNewEnv() + .withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT") + .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString) + .endEnv() + .withPorts(containerPorts.asJava) + .endContainer() + .endSpec() + .done() + submitCompletedFuture.get(30, TimeUnit.SECONDS) + } + + Utils.tryWithResource(kubernetesClient + .pods() + .withLabels(selectors) + .watch(podWatcher)) { createDriverPod } + } finally { + kubernetesClient.secrets().delete(secret) + } + 
}) + } + + private def configurePorts(uiPort: Int): (Seq[ServicePort], Seq[ContainerPort]) = { + val servicePorts = new ArrayBuffer[ServicePort] + val containerPorts = new ArrayBuffer[ContainerPort] + + def addPortToServiceAndContainer(portName: String, portValue: Int): Unit = { + servicePorts += new ServicePortBuilder() + .withName(portName) + .withPort(portValue) + .withNewTargetPort(portValue) + .build() + containerPorts += new ContainerPortBuilder() + .withContainerPort(portValue) + .build() + } + + addPortToServiceAndContainer( + DRIVER_LAUNCHER_SERVICE_PORT_NAME, + DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + addPortToServiceAndContainer( + DRIVER_PORT_NAME, + sparkConf + .getOption("spark.driver.port") + .map(_.toInt) + .getOrElse(DRIVER_PORT)) + addPortToServiceAndContainer( + BLOCKMANAGER_PORT_NAME, + sparkConf + .getOption("spark.blockmanager.port") + .map(_.toInt) + .getOrElse(BLOCKMANAGER_PORT)) + + addPortToServiceAndContainer(UI_PORT_NAME, uiPort) + (servicePorts.toSeq, containerPorts.toSeq) + } + + private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = { + val appResourceUri = Utils.resolveURI(mainAppResource) + val resolvedAppResource: AppResource = appResourceUri.getScheme match { + case "file" | null => + val appFile = new File(appResourceUri.getPath) + if (!appFile.isFile) { + throw new IllegalStateException("Provided local file path does not exist" + + s" or is not a file: ${appFile.getAbsolutePath}") + } + val fileBytes = Files.toByteArray(appFile) + val fileBase64 = Base64.encodeBase64String(fileBytes) + UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName) + case other => RemoteAppResource(other) + } + + val uploadDriverExtraClasspathBase64Contents = getFileContents(uploadedDriverExtraClasspath) + val uploadJarsBase64Contents = getFileContents(uploadedJars) + KubernetesCreateSubmissionRequest( + appResource = resolvedAppResource, + mainClass = mainClass, + appArgs = appArgs, + secret = secretBase64String, + sparkProperties = sparkConf.getAll.toMap, + uploadedDriverExtraClasspathBase64Contents = uploadDriverExtraClasspathBase64Contents, + uploadedJarsBase64Contents = uploadJarsBase64Contents) + } + + def getFileContents(maybeFilePaths: Option[String]): Array[(String, String)] = { + maybeFilePaths + .map(_.split(",").map(filePath => { + val fileToUpload = new File(filePath) + if (!fileToUpload.isFile) { + throw new IllegalStateException("Provided file to upload for driver extra classpath" + + s" does not exist or is not a file: $filePath") + } else { + val fileBytes = Files.toByteArray(fileToUpload) + val fileBase64 = Base64.encodeBase64String(fileBytes) + (fileToUpload.getName, fileBase64) + } + })).getOrElse(Array.empty[(String, String)]) + } + + private def getDriverLauncherService( + k8ClientConfig: Config, + kubernetesMaster: String): KubernetesSparkRestApi = { + val url = s"${ + Array[String]( + kubernetesMaster, + "api", "v1", "proxy", + "namespaces", namespace, + "services", kubernetesAppId).mkString("/")}" + + s":$DRIVER_LAUNCHER_SERVICE_PORT_NAME/" + + val sslContext = SSLUtils.sslContext(k8ClientConfig) + val trustManager = SSLUtils.trustManagers( + k8ClientConfig)(0).asInstanceOf[X509TrustManager] + HttpClientUtil.createClient[KubernetesSparkRestApi]( + uri = url, + sslSocketFactory = sslContext.getSocketFactory, + trustContext = trustManager) + } +} + +private object Client { + + private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret" + private val DRIVER_LAUNCHER_SELECTOR_LABEL = 
"driver-launcher-selector" + private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077 + private val DRIVER_PORT = 7078 + private val BLOCKMANAGER_PORT = 7079 + private val DEFAULT_UI_PORT = 4040 + private val UI_PORT_NAME = "spark-ui-port" + private val DRIVER_LAUNCHER_SERVICE_PORT_NAME = "driver-launcher-port" + private val DRIVER_PORT_NAME = "driver-port" + private val BLOCKMANAGER_PORT_NAME = "block-manager-port" + private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher" + private val SECURE_RANDOM = new SecureRandom() + + def main(args: Array[String]): Unit = { + require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " + + s" []") + val mainAppResource = args(0) + val mainClass = args(1) + val appArgs = args.drop(2) + val sparkConf = new SparkConf(true) + new Client( + mainAppResource = mainAppResource, + mainClass = mainClass, + sparkConf = sparkConf, + appArgs = appArgs).run() + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala new file mode 100644 index 000000000000..4c715c86cc7f --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +private[spark] object KubernetesClientBuilder { + private val API_SERVER_TOKEN = new File("/var/run/secrets/kubernetes.io/serviceaccount/token") + private val CA_CERT_FILE = new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") + + /** + * Creates a {@link KubernetesClient}, expecting to be from + * within the context of a pod. When doing so, credentials files + * are picked up from canonical locations, as they are injected + * into the pod's disk space. 
+ */ + def buildFromWithinPod( + kubernetesMaster: String, + kubernetesNamespace: String): DefaultKubernetesClient = { + var clientConfigBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withNamespace(kubernetesNamespace) + + if (CA_CERT_FILE.isFile) { + clientConfigBuilder = clientConfigBuilder.withCaCertFile(CA_CERT_FILE.getAbsolutePath) + } + + if (API_SERVER_TOKEN.isFile) { + clientConfigBuilder = clientConfigBuilder.withOauthToken( + Files.toString(API_SERVER_TOKEN, Charsets.UTF_8)) + } + new DefaultKubernetesClient(clientConfigBuilder.build) + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala new file mode 100644 index 000000000000..e5ce0bcd606b --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Retry.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration + +private[spark] object Retry { + + private def retryableFuture[T] + (times: Int, interval: Duration) + (f: => Future[T]) + (implicit executionContext: ExecutionContext): Future[T] = { + f recoverWith { + case _ if times > 0 => { + Thread.sleep(interval.toMillis) + retryableFuture(times - 1, interval)(f) + } + } + } + + def retry[T] + (times: Int, interval: Duration) + (f: => T) + (implicit executionContext: ExecutionContext): Future[T] = { + retryableFuture(times, interval)(Future[T] { f }) + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala new file mode 100644 index 000000000000..4b7bb66083f2 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest + +import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} + +import org.apache.spark.SPARK_VERSION + +// TODO: jars should probably be compressed. Shipping tarballs would be optimal. +case class KubernetesCreateSubmissionRequest( + val appResource: AppResource, + val mainClass: String, + val appArgs: Array[String], + val sparkProperties: Map[String, String], + val secret: String, + val uploadedDriverExtraClasspathBase64Contents: Array[(String, String)] + = Array.empty[(String, String)], + val uploadedJarsBase64Contents: Array[(String, String)] + = Array.empty[(String, String)]) extends SubmitRestProtocolRequest { + message = "create" + clientSparkVersion = SPARK_VERSION +} + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +@JsonSubTypes(value = Array( + new JsonSubTypes.Type(value = classOf[UploadedAppResource], name = "UploadedAppResource"), + new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource"))) +abstract class AppResource + +case class UploadedAppResource( + resourceBase64Contents: String, + name: String = "spark-app-resource") extends AppResource + +case class RemoteAppResource(resource: String) extends AppResource + +class PingResponse extends SubmitRestProtocolResponse { + val text = "pong" + message = "pong" + serverSparkVersion = SPARK_VERSION +} + diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala new file mode 100644 index 000000000000..eb7d41170082 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import javax.net.ssl.{SSLContext, SSLSocketFactory, X509TrustManager} + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import feign.Feign +import feign.Request.Options +import feign.jackson.{JacksonDecoder, JacksonEncoder} +import feign.jaxrs.JAXRSContract +import okhttp3.OkHttpClient +import scala.reflect.ClassTag + +import org.apache.spark.status.api.v1.JacksonMessageWriter + +private[spark] object HttpClientUtil { + + def createClient[T: ClassTag]( + uri: String, + sslSocketFactory: SSLSocketFactory = SSLContext.getDefault.getSocketFactory, + trustContext: X509TrustManager = null, + readTimeoutMillis: Int = 20000, + connectTimeoutMillis: Int = 20000): T = { + var httpClientBuilder = new OkHttpClient.Builder() + Option.apply(trustContext).foreach(context => { + httpClientBuilder = httpClientBuilder.sslSocketFactory(sslSocketFactory, context) + }) + val objectMapper = new ObjectMapper() + .registerModule(new DefaultScalaModule) + .setDateFormat(JacksonMessageWriter.makeISODateFormat) + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + Feign.builder() + .client(new feign.okhttp.OkHttpClient(httpClientBuilder.build())) + .contract(new JAXRSContract) + .encoder(new JacksonEncoder(objectMapper)) + .decoder(new JacksonDecoder(objectMapper)) + .options(new Options(connectTimeoutMillis, readTimeoutMillis)) + .target(clazz, uri) + } +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala new file mode 100644 index 000000000000..3cbcb16293b1 --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import javax.ws.rs.{Consumes, GET, Path, POST, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KubernetesCreateSubmissionRequest, PingResponse} + +@Path("/v1/submissions/") +trait KubernetesSparkRestApi { + + @POST + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/create") + def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse + + @GET + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/ping") + def ping(): PingResponse + +} diff --git a/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala new file mode 100644 index 000000000000..0a2e8176394a --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import java.io.File +import java.net.URI +import java.nio.file.Paths +import java.util.concurrent.CountDownLatch +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + +import com.google.common.io.Files +import org.apache.commons.codec.binary.Base64 +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.rest._ +import org.apache.spark.util.{ShutdownHookManager, Utils} + +private case class KubernetesSparkRestServerArguments( + val host: Option[String] = None, + val port: Option[Int] = None, + val secretFile: Option[String] = None) { + def validate(): KubernetesSparkRestServerArguments = { + require(host.isDefined, "Hostname not set via --hostname.") + require(port.isDefined, "Port not set via --port") + require(secretFile.isDefined, "Secret file not set via --secret-file") + this + } +} + +private object KubernetesSparkRestServerArguments { + def fromArgsArray(inputArgs: Array[String]): KubernetesSparkRestServerArguments = { + var args = inputArgs.toList + var resolvedArguments = KubernetesSparkRestServerArguments() + while (args.nonEmpty) { + resolvedArguments = args match { + case "--hostname" :: value :: tail => + args = tail + resolvedArguments.copy(host = Some(value)) + case "--port" :: value :: tail => + args = tail + resolvedArguments.copy(port = Some(value.toInt)) + case "--secret-file" :: value :: tail => + args = tail + resolvedArguments.copy(secretFile = Some(value)) + // TODO polish usage message + case Nil => resolvedArguments + case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown") + } + } + resolvedArguments.validate() + } +} + +private[spark] class KubernetesSparkRestServer( + host: String, + port: Int, + conf: SparkConf, + expectedApplicationSecret: Array[Byte]) + extends RestSubmissionServer(host, port, conf) { + + private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java" + private val sparkHome = System.getenv("SPARK_HOME") + private val securityManager = new SecurityManager(conf) + override protected lazy val contextToServlet = Map[String, RestServlet]( + s"$baseContext/create/*" -> submitRequestServlet, + s"$baseContext/ping/*" -> pingServlet) + + private val pingServlet = new PingServlet + override protected val submitRequestServlet: SubmitRequestServlet + = new KubernetesSubmitRequestServlet + // TODO + override protected val statusRequestServlet: StatusRequestServlet = null + override protected val killRequestServlet: KillRequestServlet = null + + private class PingServlet extends RestServlet { + protected override def doGet( + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + sendResponse(new PingResponse, response) + } + } + + private class KubernetesSubmitRequestServlet extends SubmitRequestServlet { + + // TODO validating the secret should be done as part of a header of the request. + // Instead here we have to specify the secret in the body. 
+ override protected def handleSubmit( + requestMessageJson: String, + requestMessage: SubmitRestProtocolMessage, + responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { + requestMessage match { + case KubernetesCreateSubmissionRequest( + appResource, + mainClass, + appArgs, + sparkProperties, + secret, + uploadedDriverExtraClasspath, + uploadedJars) => + val decodedSecret = Base64.decodeBase64(secret) + if (!expectedApplicationSecret.sameElements(decodedSecret)) { + responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED) + handleError("Unauthorized to submit application.") + } else { + val tempDir = Utils.createTempDir() + val appResourcePath = resolvedAppResource(appResource, tempDir) + val driverClasspathDirectory = new File(tempDir, "driver-extra-classpath") + if (!driverClasspathDirectory.mkdir) { + throw new IllegalStateException("Failed to create driver extra classpath" + + s" dir at ${driverClasspathDirectory.getAbsolutePath}") + } + val jarsDirectory = new File(tempDir, "jars") + if (!jarsDirectory.mkdir) { + throw new IllegalStateException("Failed to create jars dir at" + + s"${jarsDirectory.getAbsolutePath}") + } + val writtenDriverExtraClasspath = writeBase64ContentsToFiles( + uploadedDriverExtraClasspath, driverClasspathDirectory) + val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory) + val originalDriverExtraClasspath = sparkProperties.get("spark.driver.extraClassPath") + .map(_.split(",")) + .getOrElse(Array.empty[String]) + val resolvedDriverExtraClasspath = writtenDriverExtraClasspath ++ + originalDriverExtraClasspath + val originalJars = sparkProperties.get("spark.jars") + .map(_.split(",")) + .getOrElse(Array.empty[String]) + val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath) + val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath) + val driverClasspath = resolvedDriverExtraClasspath ++ + resolvedJars ++ + sparkJars ++ + Array(appResourcePath) + val resolvedSparkProperties = new mutable.HashMap[String, String] + resolvedSparkProperties ++= sparkProperties + resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") + + val command = new ArrayBuffer[String] + command += javaExecutable + command += "-cp" + command += s"${driverClasspath.mkString(":")}" + for (prop <- resolvedSparkProperties) { + command += s"-D${prop._1}=${prop._2}" + } + val driverMemory = resolvedSparkProperties.getOrElse("spark.driver.memory", "1g") + command += s"-Xms$driverMemory" + command += s"-Xmx$driverMemory" + command += mainClass + command ++= appArgs + val pb = new ProcessBuilder(command: _*) + Paths.get(sparkHome, "logs").toFile.mkdirs + pb.redirectOutput(Paths.get(sparkHome, "logs", "stdout").toFile) + pb.redirectError(Paths.get(sparkHome, "logs", "stderr").toFile) + val process = pb.start() + ShutdownHookManager.addShutdownHook(() => { + logInfo("Received stop command, shutting down the running Spark application...") + process.destroy() + }) + val response = new CreateSubmissionResponse + response.success = true + response.submissionId = null + response.message = "success" + response.serverSparkVersion = SPARK_VERSION + response + } + case unexpected => + responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) + handleError(s"Received message of unexpected type ${unexpected.messageType}.") + } + } + + def resolvedAppResource(appResource: AppResource, tempDir: File): String = { + val appResourcePath = appResource match { + case UploadedAppResource(resourceContentsBase64, resourceName) => + 
val resourceFile = new File(tempDir, resourceName) + val resourceFilePath = resourceFile.getAbsolutePath + if (resourceFile.createNewFile()) { + val resourceContentsBytes = Base64.decodeBase64(resourceContentsBase64) + Files.write(resourceContentsBytes, resourceFile) + resourceFile.getAbsolutePath + } else { + throw new IllegalStateException(s"Failed to write main app resource file" + + s" to $resourceFilePath") + } + case RemoteAppResource(resource) => + Utils.fetchFile(resource, tempDir, conf, + securityManager, SparkHadoopUtil.get.newConfiguration(conf), + System.currentTimeMillis(), useCache = false) + val fileName = Utils.decodeFileNameInURI(URI.create(resource)) + val downloadedFile = new File(tempDir, fileName) + val downloadedFilePath = downloadedFile.getAbsolutePath + if (!downloadedFile.isFile) { + throw new IllegalStateException(s"Main app resource is not a file or" + + s" does not exist at $downloadedFilePath") + } + downloadedFilePath + } + appResourcePath + } + } + + private def writeBase64ContentsToFiles( + filesBase64Contents: Array[(String, String)], + rootDir: File): Seq[String] = { + val resolvedFileNames = new scala.collection.mutable.HashSet[String] + val resolvedFilePaths = new ArrayBuffer[String] + for (file <- filesBase64Contents) { + var currentFileName = file._1 + var deduplicationCounter = 1 + while (resolvedFileNames.contains(currentFileName)) { + // Prepend the deduplication counter so as to not mess with the extension + currentFileName = s"$deduplicationCounter-$currentFileName" + deduplicationCounter += 1 + } + val resolvedFile = new File(rootDir, currentFileName) + val resolvedFilePath = resolvedFile.getAbsolutePath + if (resolvedFile.createNewFile()) { + val fileContents = Base64.decodeBase64(file._2) + Files.write(fileContents, resolvedFile) + } else { + throw new IllegalStateException(s"Could not write jar file to $resolvedFilePath") + } + resolvedFileNames += currentFileName + resolvedFilePaths += resolvedFilePath + } + resolvedFilePaths.toSeq + } +} + +private[spark] object KubernetesSparkRestServer { + private val barrier = new CountDownLatch(1) + def main(args: Array[String]): Unit = { + val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args) + val secretFile = new File(parsedArguments.secretFile.get) + if (!secretFile.isFile) { + throw new IllegalArgumentException(s"Secret file specified by --secret-file" + + " is not a file, or does not exist.") + } + val secretBytes = Files.toByteArray(secretFile) + val sparkConf = new SparkConf(true) + val server = new KubernetesSparkRestServer( + parsedArguments.host.get, + parsedArguments.port.get, + sparkConf, + secretBytes) + server.start() + ShutdownHookManager.addShutdownHook(() => { + try { + server.stop() + } finally { + barrier.countDown() + } + }) + barrier.await() + } +} + diff --git a/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala new file mode 100644 index 000000000000..0d3b97c636ca --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +private[spark] class KubernetesClusterManager extends ExternalClusterManager { + + override def canCreate(masterURL: String): Boolean = masterURL.startsWith("kubernetes") + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + val scheduler = new TaskSchedulerImpl(sc) + sc.taskScheduler = scheduler + scheduler + } + + override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) + : SchedulerBackend = { + new KubernetesClusterSchedulerBackend(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } + +} + diff --git a/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala new file mode 100644 index 000000000000..f37b97e4dd0d --- /dev/null +++ b/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.kubernetes + +import java.util.UUID +import java.util.concurrent.Executors +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, EnvVar, EnvVarBuilder, Pod, QuantityBuilder} +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder +import org.apache.spark.rpc.RpcEndpointAddress +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + val sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_MODIFICATION_LOCK = new Object + private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] + + private val kubernetesMaster = conf + .getOption("spark.kubernetes.master") + .getOrElse( + throw new SparkException("Kubernetes master must be specified in kubernetes mode.")) + + private val executorDockerImage = conf + .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}") + + private val kubernetesNamespace = conf + .getOption("spark.kubernetes.namespace") + .getOrElse( + throw new SparkException("Kubernetes namespace must be specified in kubernetes mode.")) + + private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + + private val blockmanagerPort = conf + .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val kubernetesDriverServiceName = conf + .getOption("spark.kubernetes.driver.service.name") + .getOrElse( + throw new SparkException("Must specify the service name the driver is running with")) + + private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g") + private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) + + private val memoryOverheadBytes = conf + .getOption("spark.kubernetes.executor.memoryOverhead") + .map(overhead => Utils.byteStringAsBytes(overhead)) + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt, + MEMORY_OVERHEAD_MIN)) + private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes + + private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") + + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("kubernetes-executor-requests-%d") + .build)) + + private val kubernetesClient = KubernetesClientBuilder + .buildFromWithinPod(kubernetesMaster, kubernetesNamespace) + + override val minRegisteredRatio = + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } + + protected var totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"), + sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private def 
convertToEnvMode(value: String): String = + value.toUpperCase.map { c => if (c == '-') '_' else c } + + private val initialExecutors = getInitialTargetExecutorNumber(1) + + private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) + val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) + val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 1) + require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must between min executor number " + + s"$minNumExecutors and max executor number $maxNumExecutors") + + initialNumExecutors + } else { + conf.getInt("spark.executor.instances", defaultNumExecutors) + } + } + + override def sufficientResourcesRegistered(): Boolean = { + totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio + } + + override def start(): Unit = { + super.start() + if (!Utils.isDynamicAllocationEnabled(sc.conf)) { + doRequestTotalExecutors(initialExecutors) + } + } + + override def stop(): Unit = { + // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. + // When using Utils.tryLogNonFatalError some of the code fails but without any logs or + // indication as to why. + try { + runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) + } catch { + case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) + } + try { + kubernetesClient.services().withName(kubernetesDriverServiceName).delete() + } catch { + case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) + } + try { + kubernetesClient.close() + } catch { + case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) + } + super.stop() + } + + private def allocateNewExecutorPod(): (String, Pod) = { + val executorKubernetesId = UUID.randomUUID().toString.replaceAll("-", "") + val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString + val name = s"exec$executorKubernetesId" + val selectors = Map(SPARK_EXECUTOR_SELECTOR -> executorId, + SPARK_APP_SELECTOR -> applicationId()).asJava + val executorMemoryQuantity = new QuantityBuilder(false) + .withAmount(executorMemoryBytes.toString) + .build() + val executorMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(executorMemoryWithOverhead.toString) + .build() + val requiredEnv = new ArrayBuffer[EnvVar] + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_PORT") + .withValue(executorPort.toString) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_DRIVER_URL") + .withValue(driverUrl) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_CORES") + .withValue(executorCores) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_MEMORY") + .withValue(executorMemory) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_APPLICATION_ID") + .withValue(applicationId()) + .build() + requiredEnv += new EnvVarBuilder() + .withName("SPARK_EXECUTOR_ID") + .withValue(executorId) + .build() + val requiredPorts = new ArrayBuffer[ContainerPort] + requiredPorts += new ContainerPortBuilder() + .withName(EXECUTOR_PORT_NAME) + .withContainerPort(executorPort) + .build() + requiredPorts += new ContainerPortBuilder() + .withName(BLOCK_MANAGER_PORT_NAME) + .withContainerPort(blockmanagerPort) + 
.build() + (executorKubernetesId, kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(name) + .withLabels(selectors) + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName(s"exec-${applicationId()}-container") + .withImage(executorDockerImage) + .withImagePullPolicy("IfNotPresent") + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .endResources() + .withEnv(requiredEnv.asJava) + .withPorts(requiredPorts.asJava) + .endContainer() + .endSpec() + .done()) + } + + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { + EXECUTOR_MODIFICATION_LOCK.synchronized { + if (requestedTotal > totalExpectedExecutors.get) { + logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + + s" additional executors, expecting total $requestedTotal and currently" + + s" expected ${totalExpectedExecutors.get}") + for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { + runningExecutorPods += allocateNewExecutorPod() + } + } + totalExpectedExecutors.set(requestedTotal) + } + true + } + + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { + EXECUTOR_MODIFICATION_LOCK.synchronized { + for (executor <- executorIds) { + runningExecutorPods.remove(executor) match { + case Some(pod) => kubernetesClient.pods().delete(pod) + case None => logWarning(s"Unable to remove pod for unknown executor $executor") + } + } + } + true + } +} + +private object KubernetesClusterSchedulerBackend { + private val SPARK_EXECUTOR_SELECTOR = "spark-exec" + private val SPARK_APP_SELECTOR = "spark-app" + private val DEFAULT_STATIC_PORT = 10000 + private val DEFAULT_BLOCKMANAGER_PORT = 7079 + private val DEFAULT_DRIVER_PORT = 7078 + private val BLOCK_MANAGER_PORT_NAME = "blockmanager" + private val EXECUTOR_PORT_NAME = "executor" + private val MEMORY_OVERHEAD_FACTOR = 0.10 + private val MEMORY_OVERHEAD_MIN = 384L + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) +} diff --git a/kubernetes/docker-minimal-bundle/pom.xml b/kubernetes/docker-minimal-bundle/pom.xml new file mode 100644 index 000000000000..3de939ea3978 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/pom.xml @@ -0,0 +1,137 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../pom.xml + + + spark-docker-minimal-bundle_2.11 + Spark Project Docker Minimal Bundle + http://spark.apache.org/ + pom + + + docker-minimal-bundle + none + pre-integration-test + + + + + org.apache.spark + spark-assembly_${scala.binary.version} + ${project.version} + pom + + + + com.google.guava + guava + ${hadoop.deps.scope} + + + + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + driver-docker-dist + pre-integration-test + + single + + + + src/main/assembly/driver-assembly.xml + + posix + + + + executor-docker-dist + pre-integration-test + + single + + + + src/main/assembly/executor-assembly.xml + + posix + + + + + + + + + + + hive + + + org.apache.spark + spark-hive_${scala.binary.version} + ${project.version} + + + + + hive-thriftserver + + + org.apache.spark + spark-hive-thriftserver_${scala.binary.version} + ${project.version} + + + + + spark-ganglia-lgpl + + + org.apache.spark + spark-ganglia-lgpl_${scala.binary.version} + ${project.version} + + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml 
b/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml new file mode 100644 index 000000000000..145244f34d1d --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/assembly/driver-assembly.xml @@ -0,0 +1,84 @@ + + + driver-docker-dist + + tar.gz + dir + + false + + + + ${project.parent.basedir}/core/src/main/resources/org/apache/spark/ui/static/ + + ui-resources/org/apache/spark/ui/static + + **/* + + + + + ${project.parent.basedir}/sbin/ + + sbin + + **/* + + + + + ${project.parent.basedir}/bin/ + + bin + + **/* + + + + + ${project.parent.basedir}/conf/ + + conf + + **/* + + + + + src/main/docker/driver + + + + **/* + + + + + + jars + true + false + runtime + false + + org.apache.spark:spark-assembly_${scala.binary.version}:pom + org.spark-project.spark:unused + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml b/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml new file mode 100644 index 000000000000..d97ba56562a1 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/assembly/executor-assembly.xml @@ -0,0 +1,84 @@ + + + executor-docker-dist + + tar.gz + dir + + false + + + + ${project.parent.basedir}/core/src/main/resources/org/apache/spark/ui/static/ + + ui-resources/org/apache/spark/ui/static + + **/* + + + + + ${project.parent.basedir}/sbin/ + + sbin + + **/* + + + + + ${project.parent.basedir}/bin/ + + bin + + **/* + + + + + ${project.parent.basedir}/conf/ + + conf + + **/* + + + + + src/main/docker/executor + + + + **/* + + + + + + jars + true + false + runtime + false + + org.apache.spark:spark-assembly_${scala.binary.version}:pom + org.spark-project.spark:unused + + + + diff --git a/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile new file mode 100644 index 000000000000..3bba38d8395a --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -0,0 +1,26 @@ +FROM ubuntu:trusty + +# Upgrade package index +# install a few other useful packages plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server procps && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/spark +RUN mkdir -p /opt/spark/ui-resources/org/apache/spark/ui/static +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64/jre + +WORKDIR /opt/spark + +# This class will also require setting a secret via the SPARK_APP_SECRET environment variable +CMD exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer --hostname $HOSTNAME --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT --secret-file $SPARK_SUBMISSION_SECRET_LOCATION diff --git a/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile new file mode 100644 index 000000000000..f68f1a3fb269 --- /dev/null +++ b/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -0,0 +1,26 @@ +FROM ubuntu:trusty + +# Upgrade package index +# install a few other useful packages plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get 
install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server procps && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/spark +RUN mkdir -p /opt/spark/ui-resources/org/apache/spark/ui/static +RUN touch /opt/spark/RELEASE + +ADD jars /opt/spark/jars +ADD bin /opt/spark/bin +ADD sbin /opt/spark/sbin +ADD conf /opt/spark/conf + +ENV SPARK_HOME /opt/spark +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64/jre + +WORKDIR /opt/spark + +# TODO support spark.executor.extraClassPath +CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $HOSTNAME diff --git a/kubernetes/integration-tests-spark-jobs/pom.xml b/kubernetes/integration-tests-spark-jobs/pom.xml new file mode 100644 index 000000000000..17f1c4906214 --- /dev/null +++ b/kubernetes/integration-tests-spark-jobs/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../../pom.xml + + + spark-kubernetes-integration-tests-spark-jobs_2.11 + jar + Spark Project Kubernetes Integration Tests Spark Jobs + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + provided + + + diff --git a/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala b/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala new file mode 100644 index 000000000000..6e4660b77130 --- /dev/null +++ b/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.jobs + +import scala.math.random + +import org.apache.spark.sql.SparkSession + +// Equivalent to SparkPi except does not stop the Spark Context +// at the end and spins forever, so other things can inspect the +// Spark UI immediately after the fact. 
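+// The first command-line argument, if present, sets the number of partitions (slices); the default is 10.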
+private[spark] object SparkPiWithInfiniteWait { + + def main(args: Array[String]): Unit = { + val spark = SparkSession + .builder + .appName("Spark Pi") + .getOrCreate() + val slices = if (args.length > 0) args(0).toInt else 10 + val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow + val count = spark.sparkContext.parallelize(1 until n, slices).map { i => + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x*x + y*y < 1) 1 else 0 + }.reduce(_ + _) + // scalastyle:off println + println("Pi is roughly " + 4.0 * count / (n - 1)) + // scalastyle:on println + + // Spin forever to keep the Spark UI active, so other things can inspect the job. + while (true) { + Thread.sleep(600000) + } + } + +} diff --git a/kubernetes/integration-tests/pom.xml b/kubernetes/integration-tests/pom.xml new file mode 100644 index 000000000000..0568cb1e2182 --- /dev/null +++ b/kubernetes/integration-tests/pom.xml @@ -0,0 +1,206 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent_2.11 + 2.1.0-SNAPSHOT + ../../pom.xml + + + spark-kubernetes-integration-tests_2.11 + jar + Spark Project Kubernetes Integration Tests + + + + org.apache.spark + spark-kubernetes_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version} + ${project.version} + test + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + tar.gz + driver-docker-dist + test + + + * + * + + + + + com.google.guava + guava + test + + 18.0 + + + com.spotify + docker-client + test + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.core + jackson-databind + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.core + jersey-common + + + javax.ws.rs + jsr311-api + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-test-spark-jobs + pre-integration-test + + copy + + + + + org.apache.spark + spark-kubernetes-integration-tests-spark-jobs_${scala.binary.version} + ${project.version} + jar + ${project.build.directory}/integration-tests-spark-jobs + + + + + + unpack-docker-driver-bundle + pre-integration-test + + unpack + + + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + driver-docker-dist + tar.gz + true + ${project.build.directory}/docker/driver + + + + + + unpack-docker-executor-bundle + pre-integration-test + + unpack + + + + + org.apache.spark + spark-docker-minimal-bundle_${scala.binary.version} + ${project.version} + executor-docker-dist + tar.gz + true + ${project.build.directory}/docker/executor + + + + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + 1.3.0 + + + download-minikube-linux + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-linux-amd64 + ${project.build.directory}/minikube-bin/linux-amd64 + minikube + + + + download-minikube-darwin + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-darwin-amd64 + ${project.build.directory}/minikube-bin/darwin-amd64 + minikube + + + + + + + + + diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala new file mode 
100644 index 000000000000..3f3d2e609ea4 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.nio.file.Paths +import java.util.UUID + +import com.google.common.collect.ImmutableList +import io.fabric8.kubernetes.client.{Config, KubernetesClient} +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkSubmit +import org.apache.spark.deploy.kubernetes.Client +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 +import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} + +private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { + + private val EXAMPLES_JAR = Paths.get("target", "integration-tests-spark-jobs") + .toFile + .listFiles()(0) + .getAbsolutePath + + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + ".integrationtest.jobs.SparkPiWithInfiniteWait" + private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") + private var minikubeKubernetesClient: KubernetesClient = _ + private var clientConfig: Config = _ + + override def beforeAll(): Unit = { + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + Minikube.getKubernetesClient.namespaces.createNew() + .withNewMetadata() + .withName(NAMESPACE) + .endMetadata() + .done() + minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE) + clientConfig = minikubeKubernetesClient.getConfiguration + } + + before { + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(minikubeKubernetesClient.pods().list().getItems.isEmpty) + assert(minikubeKubernetesClient.services().list().getItems.isEmpty) + } + } + + after { + val pods = minikubeKubernetesClient.pods().list().getItems.asScala + pods.par.foreach(pod => { + minikubeKubernetesClient + .pods() + .withName(pod.getMetadata.getName) + .withGracePeriod(60) + .delete + }) + } + + override def afterAll(): Unit = { + if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } 
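+ + // Shared assertions: poll the Spark REST API until exactly one application is reported as running or completed, + // exactly one executor besides the driver has registered, and exactly one stage has completed.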
+ + private def expectationsForStaticAllocation(sparkMetricsService: SparkRestApiV1): Unit = { + val apps = Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService + .getApplications(ImmutableList.of(ApplicationStatus.RUNNING, ApplicationStatus.COMPLETED)) + assert(result.size == 1 + && !result.head.id.equalsIgnoreCase("appid") + && !result.head.id.equalsIgnoreCase("{appId}")) + result + } + Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getExecutors(apps.head.id) + assert(result.size == 2) + assert(result.count(exec => exec.id != "driver") == 1) + result + } + Eventually.eventually(TIMEOUT, INTERVAL) { + val result = sparkMetricsService.getStages( + apps.head.id, Seq(StageStatus.COMPLETE).asJava) + assert(result.size == 1) + result + } + } + + test("Run a simple example") { + val sparkConf = new SparkConf(true) + .setMaster("kubernetes") + .set("spark.kubernetes.master", s"https://${Minikube.getMinikubeIp}:8443") + .set("spark.kubernetes.submit.caCertFile", clientConfig.getCaCertFile) + .set("spark.kubernetes.submit.clientKeyFile", clientConfig.getClientKeyFile) + .set("spark.kubernetes.submit.clientCertFile", clientConfig.getClientCertFile) + .set("spark.kubernetes.namespace", NAMESPACE) + .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") + .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executor.instances", "1") + .set("spark.app.id", "spark-pi") + val mainAppResource = s"file://$EXAMPLES_JAR" + + new Client( + sparkConf = sparkConf, + mainClass = MAIN_CLASS, + mainAppResource = mainAppResource, + appArgs = Array.empty[String]).run() + val sparkMetricsService = Minikube.getService[SparkRestApiV1]( + "spark-pi", NAMESPACE, "spark-ui-port") + expectationsForStaticAllocation(sparkMetricsService) + } + + test("Run using spark-submit") { + val args = Array( + "--master", "kubernetes", + "--deploy-mode", "cluster", + "--kubernetes-master", s"https://${Minikube.getMinikubeIp}:8443", + "--kubernetes-namespace", NAMESPACE, + "--name", "spark-pi", + "--executor-memory", "512m", + "--executor-cores", "1", + "--num-executors", "1", + "--class", MAIN_CLASS, + "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", + "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", + "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", + "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", + "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + EXAMPLES_JAR) + SparkSubmit.main(args) + val sparkMetricsService = Minikube.getService[SparkRestApiV1]( + "spark-pi", NAMESPACE, "spark-ui-port") + expectationsForStaticAllocation(sparkMetricsService) + } +} diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala new file mode 100644 index 000000000000..22d78142508c --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.docker + +import java.net.URI +import java.nio.file.Paths + +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates} +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) { + + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates + .builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build().get()) + .build() + + def buildSparkDockerImages(): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + dockerClient.build(Paths.get("target", "docker", "driver"), "spark-driver") + dockerClient.build(Paths.get("target", "docker", "executor"), "spark-executor") + } + +} \ No newline at end of file diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala new file mode 100644 index 000000000000..60c6564579a6 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.minikube + +import java.io.{BufferedReader, InputStreamReader} +import java.nio.file.Paths +import java.util.concurrent.TimeUnit +import javax.net.ssl.X509TrustManager + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.internal.SSLUtils +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.deploy.rest.kubernetes.HttpClientUtil +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +// TODO support windows +private[spark] object Minikube extends Logging { + private val MINIKUBE_EXECUTABLE_DEST = if (Utils.isMac) { + Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile + } else if (Utils.isWindows) { + throw new IllegalStateException("Executing Minikube based integration tests not yet " + + " available on Windows.") + } else { + Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile + } + + private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + + s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" + + private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + + def startMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.RUNNING) { + executeMinikube("start", "--memory", "6000", "--cpus", "8") + } else { + logInfo("Minikube is already started.") + } + } + + def getMinikubeIp: String = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val outputs = executeMinikube("ip") + assert(outputs.size == 1, "Unexpected amount of output from minikube ip") + outputs.head + } + + def getMinikubeStatus: MinikubeStatus.Value = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val statusString = executeMinikube("status").head.replaceFirst("minikubeVM: ", "") + MinikubeStatus.unapply(statusString) + .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) + } + + def getDockerEnv: Map[String, String] = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + executeMinikube("docker-env") + .filter(_.startsWith("export")) + .map(_.replaceFirst("export ", "").split('=')) + .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", ""))) + .toMap + } + + def deleteMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) { + executeMinikube("delete") + } else { + logInfo("Minikube was already not running.") + } + } + + def getKubernetesClient: DefaultKubernetesClient = synchronized { + val kubernetesMaster = s"https://$getMinikubeIp:8443" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + new DefaultKubernetesClient(kubernetesConf) + } + + def getService[T: ClassTag]( + serviceName: String, + 
namespace: String, + servicePortName: String, + servicePath: String = ""): T = synchronized { + val kubernetesMaster = s"https://$getMinikubeIp:8443" + val url = s"${ + Array[String]( + kubernetesMaster, + "api", "v1", "proxy", + "namespaces", namespace, + "services", serviceName).mkString("/")}" + + s":$servicePortName$servicePath" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + val sslContext = SSLUtils.sslContext(kubernetesConf) + val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager] + HttpClientUtil.createClient[T](url, sslContext.getSocketFactory, trustManager) + } + + def executeMinikubeSsh(command: String): Unit = { + executeMinikube("ssh", command) + } + + private def executeMinikube(action: String, args: String*): Seq[String] = { + if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { + if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { + throw new IllegalStateException("Failed to make the Minikube binary executable.") + } + } + val fullCommand = Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + + Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => + Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => + var line: String = null + do { + line = bufferedOutput.readLine() + if (line != null) { + logInfo(line) + outputLines += line + } + } while (line != null) + } + } + assert(proc.waitFor(MINIKUBE_STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS), + s"Timed out while executing $action on minikube.") + assert(proc.exitValue == 0, s"Failed to execute minikube $action ${args.mkString(" ")}") + outputLines.toSeq + } +} + +private[spark] object MinikubeStatus extends Enumeration { + + val RUNNING = status("Running") + val STOPPED = status("Stopped") + val DOES_NOT_EXIST = status("Does Not Exist") + val SAVED = status("Saved") + + def status(value: String): Value = new Val(nextId, value) + def unapply(s: String): Option[Value] = values.find(s == _.toString) +} diff --git a/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala new file mode 100644 index 000000000000..7a3b06b1b5e5 --- /dev/null +++ b/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/restapis/SparkRestApiV1.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.restapis + +import java.util.{List => JList} +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1._ + +@Path("/api/v1") +@Consumes(Array(MediaType.APPLICATION_JSON)) +@Produces(Array(MediaType.APPLICATION_JSON)) +trait SparkRestApiV1 { + + @GET + @Path("/applications") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getApplications( + @QueryParam("status") applicationStatuses: JList[ApplicationStatus]): Seq[ApplicationInfo] + + @GET + @Path("applications/{appId}/stages") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getStages( + @PathParam("appId") appId: String, + @QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] + + @GET + @Path("applications/{appId}/executors") + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + def getExecutors(@PathParam("appId") appId: String): Seq[ExecutorSummary] +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 6767cc507964..94f9bc319b6a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -76,6 +76,12 @@ class SparkSubmitOptionParser { protected final String PRINCIPAL = "--principal"; protected final String QUEUE = "--queue"; + // Kubernetes-only options. + protected final String KUBERNETES_MASTER = "--kubernetes-master"; + protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace"; + protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars"; + protected final String KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH = "--upload-driver-extra-classpath"; + /** * This is the canonical list of spark-submit options. 
Each entry in the array contains the * different aliases for the same option; the first element of each entry is the "official" @@ -115,6 +121,10 @@ class SparkSubmitOptionParser { { REPOSITORIES }, { STATUS }, { TOTAL_EXECUTOR_CORES }, + { KUBERNETES_MASTER }, + { KUBERNETES_NAMESPACE }, + { KUBERNETES_UPLOAD_JARS }, + { KUBERNETES_UPLOAD_DRIVER_EXTRA_CLASSPATH } }; /** diff --git a/pom.xml b/pom.xml index c391102d3750..78e2730d8f76 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,7 @@ 10.12.1.1 1.8.1 1.6.0 + 8.18.0 9.2.16.v20160414 3.1.0 0.8.0 @@ -303,6 +304,33 @@ chill-java ${chill.version} + + + com.netflix.feign + feign-core + ${feign.version} + + + com.netflix.feign + feign-okhttp + ${feign.version} + + + com.netflix.feign + feign-jackson + ${feign.version} + + + com.netflix.feign + feign-jaxrs + ${feign.version} + + + com.squareup.okhttp3 + okhttp + 3.4.1 + + @@ -617,6 +645,11 @@ jackson-module-jaxb-annotations ${fasterxml.jackson.version} + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + ${fasterxml.jackson.version} + org.glassfish.jersey.core jersey-server @@ -2592,6 +2625,22 @@ + + kubernetes + + kubernetes/core + + + + + kubernetes-integration-tests + + kubernetes/docker-minimal-bundle + kubernetes/integration-tests + kubernetes/integration-tests-spark-jobs + + + hive-thriftserver