This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Use a pre-installed Minikube instance for integration tests. #521

Open · wants to merge 11 commits into base: branch-2.2-kubernetes
6 changes: 4 additions & 2 deletions resource-managers/kubernetes/README.md
@@ -42,10 +42,12 @@ Below is a list of the submodules for this cluster manager and what they do.

# Running the Kubernetes Integration Tests

Note that the integration test framework is currently being heavily revised and is subject to change.

Note that currently the integration tests only run with Java 8.

Integration tests first require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
your machine and having the `minikube` binary on your `PATH`. Refer to the Minikube documentation for instructions
on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
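
For example, a Minikube cluster with that capacity can typically be started with the following command (flags per the
Minikube CLI; adjust for your local hypervisor):

    minikube start --cpus 8 --memory 8192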

Running any of the integration tests requires including the `kubernetes-integration-tests` profile in the build command. In
order to prepare the environment for running the integration tests, the `pre-integration-test` step must be run in Maven
on the `resource-managers/kubernetes/integration-tests` module:
@@ -496,6 +496,9 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY =
"spark.kubernetes.test.imageDockerTag"

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
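
For context, the tag supplied through `spark.kubernetes.test.imageDockerTag` is consumed by the test backends modified later in
this diff. A minimal sketch of that resolution, with names and the `latest` fallback taken from the changes below:

    import org.apache.spark.deploy.k8s.config._

    // Sketch only: resolve the Docker image tag used for the integration-test images.
    // GCETestBackend falls back to "latest"; MinikubeTestBackend generates a random tag when none is supplied.
    val dockerImageTag: String =
      Option(System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
        .getOrElse("latest")
    // Images are then referenced as, for example, s"spark-driver:$dockerImageTag".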
31 changes: 0 additions & 31 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -339,37 +339,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.3.0</version>
<executions>
<execution>
<id>download-minikube-linux</id>
<phase>pre-integration-test</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
</execution>
<execution>
<id>download-minikube-darwin</id>
<phase>pre-integration-test</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- Triggers scalatest plugin in the integration-test phase instead of
the test phase, so that test jobs are copied over beforehand.
@@ -32,8 +32,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
import org.apache.spark.deploy.k8s.SSLUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -51,9 +50,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
kubernetesTestComponents
.kubernetesClient
.inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
staticAssetServerLauncher = new StaticAssetServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
kubernetesTestComponents
.kubernetesClient
.inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
}

override def afterAll(): Unit = {
@@ -62,8 +65,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {

before {
sparkConf = kubernetesTestComponents.newSparkConf()
.set(INIT_CONTAINER_DOCKER_IMAGE, s"spark-init:latest")
.set(DRIVER_DOCKER_IMAGE, s"spark-driver:latest")
.set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", APP_LOCATOR_LABEL)
kubernetesTestComponents.createNamespace()
}
@@ -73,14 +77,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Run PySpark Job on file from SUBMITTER with --py-files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
sparkConf
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))


runPySparkPiAndVerifyCompletion(
PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
@@ -89,20 +92,18 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
sparkConf
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))

runPySparkPiAndVerifyCompletion(PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION, Seq.empty[String])
}

test("Run SparkR Job on file locally") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
sparkConf
@@ -115,7 +116,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Run SparkR Job on file from SUBMITTER") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
sparkConf
@@ -128,14 +129,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Simple submission test with the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
runSparkPiAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
@@ -162,14 +163,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use container-local resources without the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

test("Dynamic executor scaling basic test") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
createShuffleServiceDaemonSet()
@@ -190,7 +191,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use remote resources without the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
sparkConf.setJars(Seq(
s"$assetServerUri/${EXAMPLES_JAR_FILE.getName}",
@@ -200,7 +201,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Mix remote resources with submitted ones.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
sparkConf.setJars(Seq(
@@ -210,7 +211,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use key and certificate PEM files for TLS.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
val keyAndCertificate = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
launchStagingServer(
SSLOptions(enabled = true),
@@ -222,7 +223,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use client key and client cert file when requesting executors") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
sparkConf.setJars(Seq(
CONTAINER_LOCAL_MAIN_APP_RESOURCE,
CONTAINER_LOCAL_HELPER_JAR_PATH))
@@ -239,7 +240,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Added files should be placed in the driver's working directory.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -257,7 +258,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Setting JVM options on the driver and executors with spaces.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val driverJvmOptionsFile = storeJvmOptionsInTempFile(
Map("simpleDriverConf" -> "simpleDriverConfValue",
@@ -287,7 +288,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Submit small local files without the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -305,15 +306,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use a very long application name.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)).setAppName("long" * 40)
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

private def launchStagingServer(
resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions, keyAndCertPem)
@@ -405,7 +406,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.endVolume()
.addNewContainer()
.withName("shuffle")
.withImage("spark-shuffle:latest")
.withImage(s"spark-shuffle:${testBackend.dockerImageTag()}")
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
.withName("shuffle-dir")
@@ -441,6 +442,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}
propertiesFile
}

private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
}

private[spark] object KubernetesSuite {
@@ -32,7 +32,8 @@ import org.apache.spark.util.Utils
/**
* Launches a pod that runs the resource staging server, exposing it over a NodePort.
*/
private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesClient) {
private[spark] class ResourceStagingServerLauncher(
kubernetesClient: KubernetesClient, dockerImageTag: String) {

private val SECRETS_ROOT_DIR = "/mnt/secrets/spark-staging"
private val KEYSTORE_SECRET_KEY = "keyStore"
@@ -123,7 +124,7 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC
.endVolume()
.addNewContainer()
.withName("staging-server-container")
.withImage("spark-resource-staging-server:latest")
.withImage(s"spark-resource-staging-server:$dockerImageTag")
.withImagePullPolicy("IfNotPresent")
.withNewReadinessProbe()
.withHttpGet(probePingHttpGet)
@@ -25,7 +25,8 @@ import org.apache.spark.util.Utils
* Launches a simple HTTP server which provides jars that can be downloaded by Spark applications
* in integration tests.
*/
private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClient) {
private[spark] class StaticAssetServerLauncher(
kubernetesClient: KubernetesClient, dockerImageTag: String) {

// Returns the HTTP Base URI of the server.
def launchStaticAssetServer(): String = {
@@ -46,7 +47,7 @@ private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClien
.withNewSpec()
.addNewContainer()
.withName("static-asset-server-container")
.withImage("spark-integration-test-asset-server:latest")
.withImage(s"spark-integration-test-asset-server:$dockerImageTag")
.withImagePullPolicy("IfNotPresent")
.withNewReadinessProbe()
.withHttpGet(probePingHttpGet)
@@ -18,9 +18,8 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.GCE

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.k8s.config.resolveK8sMaster
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _
@@ -36,5 +35,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB
defaultClient
}

override def name(): String = GCE_TEST_BACKEND
override def dockerImageTag(): String = {
System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
}
}
@@ -21,19 +21,19 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager

private[spark] trait IntegrationTestBackend {
def name(): String
def initialize(): Unit
def getKubernetesClient(): DefaultKubernetesClient
def dockerImageTag(): String
def cleanUp(): Unit = {}
}

private[spark] object IntegrationTestBackendFactory {
def getTestBackend(): IntegrationTestBackend = {
Option(System.getProperty("spark.kubernetes.test.master"))
.map(new GCETestBackend(_))
.getOrElse(new MinikubeTestBackend())
.getOrElse(MinikubeTestBackend)
}
}
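
For reference, a minimal usage sketch of this factory, mirroring the `KubernetesSuite.beforeAll`/`afterAll` flow shown earlier:

    import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory

    // Sketch: typical backend lifecycle in an integration suite.
    val backend = IntegrationTestBackendFactory.getTestBackend()
    backend.initialize()                        // verifies the cluster is available and builds images if no tag was supplied
    val client = backend.getKubernetesClient()  // fabric8 DefaultKubernetesClient bound to that cluster
    val tag = backend.dockerImageTag()          // tag applied to spark-driver, spark-executor, and the other test images
    // ... run tests against `client` ...
    backend.cleanUp()                           // deletes the built images when the tag was auto-generated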
@@ -26,65 +26,33 @@ import org.apache.spark.util.Utils

// TODO support windows
private[spark] object Minikube extends Logging {
private val MINIKUBE_EXECUTABLE_DEST = if (Utils.isMac) {
Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile
} else if (Utils.isWindows) {
throw new IllegalStateException("Executing Minikube based integration tests not yet " +
" available on Windows.")
} else {
Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile
}

private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " +
s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}"

private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60

def startMinikube(): Unit = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.RUNNING) {
executeMinikube("start", "--memory", "6000", "--cpus", "8")
} else {
logInfo("Minikube is already started.")
}
}

def getMinikubeIp: String = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
val outputs = executeMinikube("ip")
.filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
outputs.head
}

def getMinikubeStatus: MinikubeStatus.Value = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
val statusString = executeMinikube("status")
.filter(_.contains("minikube: "))
.filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}

def getDockerEnv: Map[String, String] = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
executeMinikube("docker-env", "--shell", "bash")
.filter(_.startsWith("export"))
.map(_.replaceFirst("export ", "").split('='))
.map(arr => (arr(0), arr(1).replaceAllLiterally("\"", "")))
.toMap
}

def deleteMinikube(): Unit = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.NONE) {
executeMinikube("delete")
} else {
logInfo("Minikube was already not running.")
}
}

def getKubernetesClient: DefaultKubernetesClient = synchronized {
val kubernetesMaster = s"https://${getMinikubeIp}:8443"
val userHome = System.getProperty("user.home")
@@ -103,13 +71,8 @@ private[spark] object Minikube extends Logging {
}

private def executeMinikube(action: String, args: String*): Seq[String] = {
if (!MINIKUBE_EXECUTABLE_DEST.canExecute) {
if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) {
throw new IllegalStateException("Failed to make the Minikube binary executable.")
}
}
ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args,
MINIKUBE_STARTUP_TIMEOUT_SECONDS)
ProcessUtils.executeProcess(
Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
}
}

@@ -16,30 +16,45 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube

import java.util.UUID

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.k8s.config.KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager

private[spark] object MinikubeTestBackend extends IntegrationTestBackend {

private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _
private val userProvidedDockerImageTag = Option(
System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
private val resolvedDockerImageTag =
userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
private val dockerManager = new KubernetesSuiteDockerManager(
Minikube.getDockerEnv, resolvedDockerImageTag)

override def initialize(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
val minikubeStatus = Minikube.getMinikubeStatus
require(minikubeStatus == MinikubeStatus.RUNNING,
s"Minikube must be running before integration tests can execute. Current status" +
s" is: $minikubeStatus")
if (userProvidedDockerImageTag.isEmpty) {
dockerManager.buildSparkDockerImages()
}
defaultClient = Minikube.getKubernetesClient
}

override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def cleanUp(): Unit = {
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
super.cleanUp()
if (userProvidedDockerImageTag.isEmpty) {
dockerManager.deleteImages()
}
}

override def name(): String = MINIKUBE_TEST_BACKEND
override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def dockerImageTag(): String = resolvedDockerImageTag
}

This file was deleted.

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest.docker

import java.io.{File, PrintWriter}
import java.net.URI
import java.nio.file.Paths

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler}
import com.spotify.docker.client.DockerClient.{ListContainersParam, ListImagesParam, RemoveContainerParam}
import com.spotify.docker.client.messages.Container
import org.apache.http.client.utils.URIBuilder
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite
import org.apache.spark.internal.Logging
import org.apache.spark.util.{RedirectThread, Utils}

private[spark] class KubernetesSuiteDockerManager(
dockerEnv: Map[String, String], dockerTag: String) extends Logging {

private val DOCKER_BUILD_PATH = Paths.get("target", "docker")
// Dockerfile paths must be relative to the build path.
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile"
private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile"
private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile"
private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile"
private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile"
private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile"
private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile"
private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile"
private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile"
private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile"
private val STATIC_ASSET_SERVER_DOCKER_FILE =
"dockerfiles/integration-test-asset-server/Dockerfile"
private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST",
throw new IllegalStateException("DOCKER_HOST env not found."))

private val originalDockerUri = URI.create(dockerHost)
private val httpsDockerUri = new URIBuilder()
.setHost(originalDockerUri.getHost)
.setPort(originalDockerUri.getPort)
.setScheme("https")
.build()

private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH",
throw new IllegalStateException("DOCKER_CERT_PATH env not found."))

private val dockerClient = new DefaultDockerClient.Builder()
.uri(httpsDockerUri)
.dockerCertificates(DockerCertificates
.builder()
.dockerCertPath(Paths.get(dockerCerts))
.build().get())
.build()

def buildSparkDockerImages(): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
// Building Python distribution environment
val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON")
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("/usr/bin/python")
val builder = new ProcessBuilder(
Seq(pythonExec, "setup.py", "sdist").asJava)
builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python"))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
val exitCode = process.waitFor()
if (exitCode != 0) {
logInfo(s"exitCode: $exitCode")
}
buildImage("spark-base", BASE_DOCKER_FILE)
buildImage("spark-driver", DRIVER_DOCKER_FILE)
buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE)
buildImage("spark-driver-r", DRIVERR_DOCKER_FILE)
buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE)
buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE)
buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE)
}

def deleteImages(): Unit = {
removeRunningContainers()
deleteImage("spark-driver")
deleteImage("spark-driver-py")
deleteImage("spark-driver-r")
deleteImage("spark-executor")
deleteImage("spark-executor-py")
deleteImage("spark-executor-r")
deleteImage("spark-shuffle")
deleteImage("spark-resource-staging-server")
deleteImage("spark-init")
deleteImage("spark-integration-test-asset-server")
deleteImage("spark-base")
}

private def buildImage(name: String, dockerFile: String): Unit = {
log.info(s"Building Docker image - $name:$dockerTag")
val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve(
s"$dockerFile-$dockerTag").toAbsolutePath.toString)
dockerFileWithBaseTag.deleteOnExit()
try {
val originalDockerFileText = Files.readLines(
DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala
val dockerFileTextWithProperBaseImage = originalDockerFileText.map(
_.replace("FROM spark-base", s"FROM spark-base:$dockerTag"))
Utils.tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter =>
Utils.tryWithResource(new PrintWriter(fileWriter)) { printWriter =>
for (line <- dockerFileTextWithProperBaseImage) {
// scalastyle:off println
printWriter.println(line)
// scalastyle:on println
}
}
}
dockerClient.build(
DOCKER_BUILD_PATH,
s"$name:$dockerTag",
s"$dockerFile-$dockerTag",
new LoggingBuildHandler())
} finally {
dockerFileWithBaseTag.delete()
}
}

/**
* Forces all containers running an image with the configured tag to halt and be removed.
*/
private def removeRunningContainers(): Unit = {
val imageIds = dockerClient.listImages(ListImagesParam.allImages())
.asScala
.filter(image => image.repoTags().asScala.exists(_.endsWith(s":$dockerTag")))
.map(_.id())
.toSet
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
val runningContainersWithImageTag = stopRunningContainers(imageIds)
require(
runningContainersWithImageTag.isEmpty,
s"${runningContainersWithImageTag.size} containers found still running" +
s" with the image tag $dockerTag")
}
dockerClient.listContainers(ListContainersParam.allContainers())
.asScala
.filter(container => imageIds.contains(container.imageId()))
.foreach(container => dockerClient.removeContainer(
container.id(), RemoveContainerParam.forceKill(true)))
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
val containersWithImageTag = dockerClient.listContainers(ListContainersParam.allContainers())
.asScala
.filter(container => imageIds.contains(container.imageId()))
require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" +
s" found with image tag $dockerTag.")
}

}

private def stopRunningContainers(imageIds: Set[String]): Iterable[Container] = {
val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds)
if (runningContainersWithImageTag.nonEmpty) {
log.info(s"Found ${runningContainersWithImageTag.size} containers running with" +
s" an image with the tag $dockerTag. Attempting to remove these containers," +
s" and then will stall for 2 seconds.")
runningContainersWithImageTag.foreach { container =>
dockerClient.stopContainer(container.id(), 5)
}
}
runningContainersWithImageTag
}

private def getRunningContainersWithImageIds(imageIds: Set[String]): Iterable[Container] = {
dockerClient
.listContainers(
ListContainersParam.allContainers(),
ListContainersParam.withStatusRunning())
.asScala
.filter(container => imageIds.contains(container.imageId()))
}

private def deleteImage(name: String): Unit = {
try {
dockerClient.removeImage(s"$name:$dockerTag")
} catch {
case e: RuntimeException =>
logWarning(s"Failed to delete image $name:$dockerTag. There may be images leaking in the" +
s" docker environment which are now stale and unused.", e)
}
}
}

This file was deleted.