diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 81aa31d79ba8..5166543933b3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -729,9 +729,9 @@ private[spark] object SparkConf extends Logging { EXECUTOR_MEMORY_OVERHEAD.key -> Seq( AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3")), KEYTAB.key -> Seq( - AlternateConfig("spark.yarn.keytab", "2.5")), + AlternateConfig("spark.yarn.keytab", "3.0")), PRINCIPAL.key -> Seq( - AlternateConfig("spark.yarn.principal", "2.5")) + AlternateConfig("spark.yarn.principal", "3.0")) ) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9544475ff042..80a4f8408746 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.api.java import java.{lang => jl} import java.lang.{Iterable => JIterable} -import java.util.{Comparator, List => JList} +import java.util.{Comparator, Iterator => JIterator, List => JList} import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -34,7 +34,8 @@ import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} +import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction, + Function2 => JFunction2, PairFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} import org.apache.spark.rdd.RDD.rddToPairRDDFunctions @@ -674,8 +675,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. 
*/ - def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = { - def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala + def flatMapValues[U](f: FlatMapFunction[V, U]): JavaPairRDD[K, U] = { + def fn: (V) => Iterator[U] = (x: V) => f.call(x).asScala implicit val ctag: ClassTag[U] = fakeClassTag fromRDD(rdd.flatMapValues(fn)) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 61b379f28680..13fa6d011705 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -335,7 +335,7 @@ private[spark] class SparkSubmit extends Logging { val targetDir = Utils.createTempDir() // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) { + if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) { if (args.principal != null) { if (args.keytab != null) { require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") @@ -646,7 +646,8 @@ private[spark] class SparkSubmit extends Logging { } } - if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) { + if ((clusterManager == MESOS || clusterManager == KUBERNETES) + && UserGroupInformation.isSecurityEnabled) { setRMPrincipal(sparkConf) } @@ -762,8 +763,8 @@ private[spark] class SparkSubmit extends Logging { } // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with - // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we - // must trick it into thinking we're YARN. + // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes + // mode, we must trick it into thinking we're YARN. private def setRMPrincipal(sparkConf: SparkConf): Unit = { val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 080ba12c2f0d..49f00cb10179 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -34,35 +34,21 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin @tailrec private def parse(args: List[String]): Unit = { - if (args.length == 1) { - setLogDirectory(args.head) - } else { - args match { - case ("--dir" | "-d") :: value :: tail => - setLogDirectory(value) - parse(tail) + args match { + case ("--help" | "-h") :: tail => + printUsageAndExit(0) - case ("--help" | "-h") :: tail => - printUsageAndExit(0) + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) - case ("--properties-file") :: value :: tail => - propertiesFile = value - parse(tail) + case Nil => - case Nil => - - case _ => - printUsageAndExit(1) - } + case _ => + printUsageAndExit(1) } } - private def setLogDirectory(value: String): Unit = { - logWarning("Setting log directory through the command line is deprecated as of " + - "Spark 1.1.0. 
Please set this through spark.history.fs.logDirectory instead.") - conf.set("spark.history.fs.logDirectory", value) - } - // This mutates the SparkConf, so all accesses to it must be made after this line Utils.loadDefaultSparkProperties(conf, propertiesFile) @@ -73,8 +59,6 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin |Usage: HistoryServer [options] | |Options: - | DIR Deprecated; set spark.history.fs.logDirectory directly - | --dir DIR (-d DIR) Deprecated; set spark.history.fs.logDirectory directly | --properties-file FILE Path to a custom Spark properties file. | Default is conf/spark-defaults.conf. | diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 226c23733c87..4c6b0c1227b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -118,6 +118,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { case e: HaltReplayException => // Just stop replay. case _: EOFException if maybeTruncated => + case _: IOException if maybeTruncated => + logWarning(s"Failed to read Spark event log: $sourceName") case ioe: IOException => throw ioe case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 65fa38387b9e..2fc0259c39d0 100644 --- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils * * @param pageSize the number of rows in a page */ -private[ui] abstract class PagedDataSource[T](val pageSize: Int) { +private[spark] abstract class PagedDataSource[T](val pageSize: Int) { if (pageSize <= 0) { throw new IllegalArgumentException("Page size must be positive") @@ -72,7 +72,7 @@ private[ui] case class PageData[T](totalPage: Int, data: Seq[T]) /** * A paged table that will generate a HTML table for a specified page and also the page navigation. 
*/ -private[ui] trait PagedTable[T] { +private[spark] trait PagedTable[T] { def tableId: String diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala index 7a36b5f02dc4..bb389cdb39df 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -200,11 +200,12 @@ private[spark] object Benchmark { def getProcessorName(): String = { val cpu = if (SystemUtils.IS_OS_MAC_OSX) { Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string")) + .stripLineEnd } else if (SystemUtils.IS_OS_LINUX) { Try { val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo")) - .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "") + .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "") }.getOrElse("Unknown processor") } else { System.getenv("PROCESSOR_IDENTIFIER") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala index de321db845a6..37954826af90 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala @@ -40,18 +40,6 @@ class HistoryServerArgumentsSuite extends SparkFunSuite { assert(conf.get("spark.testing") === "true") } - test("Directory Arguments Parsing --dir or -d") { - val argStrings = Array("--dir", "src/test/resources/spark-events1") - val hsa = new HistoryServerArguments(conf, argStrings) - assert(conf.get("spark.history.fs.logDirectory") === "src/test/resources/spark-events1") - } - - test("Directory Param can also be set directly") { - val argStrings = Array("src/test/resources/spark-events2") - val hsa = new HistoryServerArguments(conf, argStrings) - assert(conf.get("spark.history.fs.logDirectory") === "src/test/resources/spark-events2") - } - test("Properties File Arguments Parsing --properties-file") { val tmpDir = Utils.createTempDir() val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir) diff --git a/dev/run-tests-jenkins.py b/dev/run-tests-jenkins.py index eca88f2391bf..77d751f49b9f 100755 --- a/dev/run-tests-jenkins.py +++ b/dev/run-tests-jenkins.py @@ -39,7 +39,8 @@ def print_err(msg): def post_message_to_github(msg, ghprb_pull_id): print("Attempting to post to Github...") - url = "https://api.github.com/repos/apache/spark/issues/" + ghprb_pull_id + "/comments" + api_url = os.getenv("GITHUB_API_BASE", "https://api.github.com/repos/apache/spark") + url = api_url + "/issues/" + ghprb_pull_id + "/comments" github_oauth_key = os.environ["GITHUB_OAUTH_KEY"] posted_message = json.dumps({"body": msg}) @@ -176,7 +177,8 @@ def main(): build_display_name = os.environ["BUILD_DISPLAY_NAME"] build_url = os.environ["BUILD_URL"] - commit_url = "https://github.com/apache/spark/commit/" + ghprb_actual_commit + project_url = os.getenv("SPARK_PROJECT_URL", "https://github.com/apache/spark") + commit_url = project_url + "/commit/" + ghprb_actual_commit # GitHub doesn't auto-link short hashes when submitted via the API, unfortunately. 
:(
    short_commit_hash = ghprb_actual_commit[0:7]
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 55830d38a9e2..b2775d2ec65f 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -260,3 +260,31 @@ For SBT, specify a complete scala version using (e.g. 2.12.6):
     ./build/sbt -Dscala.version=2.12.6
 
 Otherwise, the sbt-pom-reader plugin will use the `scala.version` specified in the spark-parent pom.
+
+## Running Jenkins tests with GitHub Enterprise
+
+To run tests with Jenkins:
+
+    ./dev/run-tests-jenkins
+
+If you use an individual repository or a repository on GitHub Enterprise, export the environment variables below before running the above command.
+
+### Related environment variables
+
| Variable Name | Default | Meaning |
|---|---|---|
| SPARK_PROJECT_URL | https://github.com/apache/spark | The Spark project URL of GitHub Enterprise. |
| GITHUB_API_BASE | https://api.github.com/repos/apache/spark | The Spark project API server URL of GitHub Enterprise. |
Kerberos-related Kubernetes properties added by this patch:

| Property Name | Default |
|---|---|
| spark.kubernetes.kerberos.krb5.path | (none) |
| spark.kubernetes.kerberos.krb5.configMapName | (none) |
| spark.kubernetes.hadoop.configMapName | (none) |
| spark.kubernetes.kerberos.tokenSecret.name | (none) |
| spark.kubernetes.kerberos.tokenSecret.itemKey | (none) |

Byte-size values accept the unit suffixes k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.
+| Property Name | Default | Meaning |
+|---|---|---|
+| spark.yarn.am.resource.{resource-type} | (none) | Amount of resource to use for the YARN Application Master in client mode. In cluster mode, use spark.yarn.driver.resource.<resource-type> instead. Please note that this feature can be used only with YARN 3.0+. For reference, see the YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html. Example: to request GPU resources from YARN, use spark.yarn.am.resource.yarn.io/gpu. |
+| spark.yarn.driver.resource.{resource-type} | (none) | Amount of resource to use for the YARN Application Master in cluster mode. Example: spark.yarn.driver.resource.yarn.io/gpu. |
+| spark.yarn.executor.resource.{resource-type} | (none) | Amount of resource to use per executor process. Example: spark.yarn.executor.resource.yarn.io/gpu. |
+| spark.yarn.am.cores | 1 | Number of cores to use for the YARN Application Master in client mode. |
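As a usage illustration (not part of this patch), the same GPU request can also be set programmatically on a SparkConf; the resource name yarn.io/gpu is taken from the examples above, while the amount "1" is an assumed value:

```scala
import org.apache.spark.SparkConf

object YarnGpuRequestExample {
  // Illustrative only: request one GPU for the AM (client mode) and one per executor.
  // "yarn.io/gpu" must be a resource type known to the YARN 3.0+ cluster.
  val conf: SparkConf = new SparkConf()
    .set("spark.yarn.am.resource.yarn.io/gpu", "1")
    .set("spark.yarn.executor.resource.yarn.io/gpu", "1")
}
```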
+ * - The secret containing a DT, either previously specified or built on the fly
+ * - The name of the secret where the DT will be stored
+ * - The data item-key on the secret which correlates with where the current DT data is stored
+ * - The Job User's username
+ */
+private[spark] case class KerberosConfigSpec(
+ dtSecret: Option[Secret],
+ dtSecretName: String,
+ dtSecretItemKey: String,
+ jobUserName: String)
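A minimal construction sketch for this case class (not part of the patch), assuming it is compiled alongside KerberosConfigSpec and using fabric8's SecretBuilder; every name and the token payload below are hypothetical:

```scala
import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}

object KerberosConfigSpecSketch {
  // Case 1: point at a secret that already holds a delegation token, so nothing is created.
  val preExistingSecretSpec = KerberosConfigSpec(
    dtSecret = None,
    dtSecretName = "existing-dt-secret",   // hypothetical secret name
    dtSecretItemKey = "hadoop-tokens",     // hypothetical data item key
    jobUserName = "spark-user")

  // Case 2: build a secret on the fly that carries (base64-encoded) token data.
  private val freshSecret: Secret = new SecretBuilder()
    .withNewMetadata()
      .withName("newly-built-dt-secret")
      .endMetadata()
    .addToData("hadoop-tokens", "BASE64_ENCODED_TOKENS")
    .build()

  val builtOnTheFlySpec = KerberosConfigSpec(
    dtSecret = Some(freshSecret),
    dtSecretName = "newly-built-dt-secret",
    dtSecretItemKey = "hadoop-tokens",
    jobUserName = "spark-user")
}
```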
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
new file mode 100644
index 000000000000..135e2c482bbb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+
+/**
+ * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
+ * on behalf of the Kubernetes submission client. The new credentials
+ * (called Tokens when they are serialized) are stored in Secrets accessible
+ * to the driver and executors. When new Tokens are received, they overwrite the current Secrets.
+ */
+private[spark] class KubernetesHadoopDelegationTokenManager(
+ tokenManager: HadoopDelegationTokenManager) extends Logging {
+
+ // HadoopUGI Util methods
+ def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
+ def getShortUserName: String = getCurrentUser.getShortUserName
+ def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
+ def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
+ def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+ def serializeCreds(creds: Credentials): Array[Byte] = SparkHadoopUtil.get.serialize(creds)
+ def nextRT(rt: Long, conf: SparkConf): Long = SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)
+
+ def getDelegationTokens(
+ creds: Credentials,
+ conf: SparkConf,
+ hadoopConf: Configuration): (Array[Byte], Long) = {
+ try {
+ val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+ logDebug(s"Initialized tokens")
+ (serializeCreds(creds), nextRT(rt, conf))
+ } catch {
+ case e: Exception =>
+ logError(s"Failed to fetch Hadoop delegation tokens $e")
+ throw e
+ }
+ }
+}
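For orientation (not part of the patch), a self-contained sketch of the pattern getDelegationTokens builds on: obtain HDFS delegation tokens into a Credentials object and serialize them, here with plain Hadoop APIs instead of Spark's HadoopDelegationTokenManager and SparkHadoopUtil:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object DelegationTokenSketch {
  // Fetch delegation tokens for the default FileSystem and return them in the
  // token-storage byte format that is later placed into a Kubernetes secret.
  def fetchAndSerialize(hadoopConf: Configuration): Array[Byte] = {
    val creds = new Credentials()
    val renewer = UserGroupInformation.getCurrentUser.getShortUserName
    FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)

    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    try {
      creds.writeTokenStorageToStream(out)
    } finally {
      out.close()
    }
    buffer.toByteArray
  }
}
```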
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index af3903ac5da5..c658756cc165 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.hadoop.security.UserGroupInformation
import scala.collection.mutable
import scala.util.control.NonFatal
@@ -45,7 +46,8 @@ private[spark] case class ClientArguments(
mainAppResource: Option[MainAppResource],
mainClass: String,
driverArgs: Array[String],
- maybePyFiles: Option[String])
+ maybePyFiles: Option[String],
+ hadoopConfigDir: Option[String])
private[spark] object ClientArguments {
@@ -79,7 +81,8 @@ private[spark] object ClientArguments {
mainAppResource,
mainClass.get,
driverArgs.toArray,
- maybePyFiles)
+ maybePyFiles,
+ sys.env.get(ENV_HADOOP_CONF_DIR))
}
}
@@ -222,7 +225,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs,
- clientArguments.maybePyFiles)
+ clientArguments.maybePyFiles,
+ clientArguments.hadoopConfigDir)
val builder = new KubernetesDriverBuilder
val namespace = kubernetesConf.namespace()
// The master URL has been checked for validity already in SparkSubmit.
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 8f3f18ffadc3..b0b53321abd2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep}
+import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
private[spark] class KubernetesDriverBuilder(
@@ -51,7 +51,11 @@ private[spark] class KubernetesDriverBuilder(
provideJavaStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> JavaDriverFeatureStep) =
- new JavaDriverFeatureStep(_)) {
+ new JavaDriverFeatureStep(_),
+ provideHadoopGlobalStep: (
+ KubernetesConf[KubernetesDriverSpecificConf]
+ => KerberosConfDriverFeatureStep) =
+ new KerberosConfDriverFeatureStep(_)) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -80,8 +84,14 @@ private[spark] class KubernetesDriverBuilder(
provideRStep(kubernetesConf)}
.getOrElse(provideJavaStep(kubernetesConf))
- val allFeatures = (baseFeatures :+ bindingsStep) ++
- secretFeature ++ envSecretFeature ++ volumesFeature
+ val maybeHadoopConfigStep =
+ kubernetesConf.hadoopConfSpec.map { _ =>
+ provideHadoopGlobalStep(kubernetesConf)}
+
+ val allFeatures: Seq[KubernetesFeatureConfigStep] =
+ (baseFeatures :+ bindingsStep) ++
+ secretFeature ++ envSecretFeature ++ volumesFeature ++
+ maybeHadoopConfigStep.toSeq
var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 364b6fb36772..6199a8ae3043 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,8 +17,8 @@
package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
@@ -35,10 +35,26 @@ private[spark] class KubernetesExecutorBuilder(
new LocalDirsFeatureStep(_),
provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountVolumesFeatureStep) =
- new MountVolumesFeatureStep(_)) {
+ new MountVolumesFeatureStep(_),
+ provideHadoopConfStep: (
+ KubernetesConf[KubernetesExecutorSpecificConf]
+ => HadoopConfExecutorFeatureStep) =
+ new HadoopConfExecutorFeatureStep(_),
+ provideKerberosConfStep: (
+ KubernetesConf[KubernetesExecutorSpecificConf]
+ => KerberosConfExecutorFeatureStep) =
+ new KerberosConfExecutorFeatureStep(_),
+ provideHadoopSparkUserStep: (
+ KubernetesConf[KubernetesExecutorSpecificConf]
+ => HadoopSparkUserExecutorFeatureStep) =
+ new HadoopSparkUserExecutorFeatureStep(_)) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
+ val sparkConf = kubernetesConf.sparkConf
+ val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
+ val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
+ val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
@@ -51,7 +67,23 @@ private[spark] class KubernetesExecutorBuilder(
Seq(provideVolumesStep(kubernetesConf))
} else Nil
- val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
+ val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ =>
+ val maybeKerberosStep =
+ if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) {
+ provideKerberosConfStep(kubernetesConf)
+ } else {
+ provideHadoopSparkUserStep(kubernetesConf)
+ }
+ Seq(provideHadoopConfStep(kubernetesConf)) :+
+ maybeKerberosStep
+ }.getOrElse(Seq.empty[KubernetesFeatureConfigStep])
+
+ val allFeatures: Seq[KubernetesFeatureConfigStep] =
+ baseFeatures ++
+ secretFeature ++
+ secretEnvFeature ++
+ volumesFeature ++
+ maybeHadoopConfFeatureSteps
var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
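For readability (not part of the patch), the Hadoop-related step selection above boils down to the following standalone sketch; the step objects are stand-ins for the real feature-step classes:

```scala
object ExecutorHadoopStepSelectionSketch {
  sealed trait Step
  case object HadoopConfStep extends Step
  case object KerberosConfStep extends Step
  case object HadoopSparkUserStep extends Step

  // No Hadoop config map -> no Hadoop-related steps at all. Otherwise always add the
  // Hadoop conf step, plus either the Kerberos step (when both the delegation-token
  // secret name and its item key are set) or the plain spark-user step.
  def select(
      hadoopConfigMap: Option[String],
      dtSecretName: Option[String],
      dtSecretItemKey: Option[String]): Seq[Step] = {
    hadoopConfigMap.map { _ =>
      val kerberosOrUser =
        if (dtSecretName.isDefined && dtSecretItemKey.isDefined) KerberosConfStep
        else HadoopSparkUserStep
      Seq(HadoopConfStep, kerberosOrUser)
    }.getOrElse(Seq.empty)
  }
}
```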
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index e3c19cdb8156..bb2b94f9976e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -59,7 +59,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
- maybePyFiles = None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(conf.appId === APP_ID)
assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
@@ -81,7 +82,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppJar,
MAIN_CLASS,
APP_ARGS,
- maybePyFiles = None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
.split(",")
=== Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
@@ -93,7 +95,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
- maybePyFiles = None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
@@ -114,7 +117,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource,
MAIN_CLASS,
APP_ARGS,
- Some(inputPyFiles.mkString(",")))
+ Some(inputPyFiles.mkString(",")),
+ hadoopConfDir = None)
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
@@ -136,7 +140,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource,
MAIN_CLASS,
APP_ARGS,
- maybePyFiles = None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
@@ -158,7 +163,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource,
MAIN_CLASS,
APP_ARGS,
- None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
}
@@ -189,7 +195,8 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
- maybePyFiles = None)
+ maybePyFiles = None,
+ hadoopConfDir = None)
assert(conf.roleLabels === Map(
SPARK_APP_ID_LABEL -> APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 0968cce971c3..eebdd157da63 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -77,7 +77,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
Map.empty,
DRIVER_ENVS,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
@@ -139,7 +140,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
.set(CONTAINER_IMAGE, "spark-driver:latest")
val pythonSparkConf = new SparkConf()
.set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
- .set(CONTAINER_IMAGE, "spark-driver:latest")
+ .set(CONTAINER_IMAGE, "spark-driver-py:latest")
val javaKubernetesConf = KubernetesConf(
javaSparkConf,
KubernetesDriverSpecificConf(
@@ -155,7 +156,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
Map.empty,
DRIVER_ENVS,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
+
val pythonKubernetesConf = KubernetesConf(
pythonSparkConf,
KubernetesDriverSpecificConf(
@@ -171,12 +174,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
Map.empty,
DRIVER_ENVS,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
val basePod = SparkPod.initialPod()
val configuredJavaPod = javaFeatureStep.configurePod(basePod)
val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
+ assert(configuredJavaPod.container.getImage === "spark-driver:latest")
+ assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
}
test("Additional system properties resolve jars and set cluster-mode confs.") {
@@ -198,7 +204,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
Map.empty,
DRIVER_ENVS,
Nil,
- allFiles)
+ allFiles,
+ hadoopConfSpec = None)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 63b237b9dfe4..41f34bd45cd5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -91,7 +91,8 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())
// The executor pod name and default labels.
@@ -131,7 +132,8 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@@ -152,7 +154,8 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Map("qux" -> "quux"),
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor,
@@ -179,7 +182,8 @@ class BasicExecutorFeatureStepSuite
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor + executorMemory = 1408 + 42 = 1450
assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 7e916b385440..8675ceb48cf6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -62,7 +62,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -94,7 +95,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -133,7 +135,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
val expectedSparkConf = Map(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 8b91e93eecd8..5c3e80150151 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -68,7 +68,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -100,7 +101,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
@@ -122,7 +124,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]))
+ Seq.empty[String],
+ hadoopConfSpec = None))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
.head
@@ -153,7 +156,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]),
+ Seq.empty[String],
+ hadoopConfSpec = None),
clock)
val driverService = configurationStep
.getAdditionalKubernetesResources()
@@ -181,7 +185,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]),
+ Seq.empty[String],
+ hadoopConfSpec = None),
clock)
fail("The driver bind address should not be allowed.")
} catch {
@@ -207,7 +212,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String]),
+ Seq.empty[String],
+ hadoopConfSpec = None),
clock)
fail("The driver host address should not be allowed.")
} catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 85c6cb282d2b..43796b77efdc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -46,7 +46,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
envVarsToKeys,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val step = new EnvSecretsFeatureStep(kubernetesConf)
val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index acdd07bc594b..3a4e60547d7f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -48,7 +48,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
}
test("Resolve to default local dir if neither env nor configuration are set") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index dad610c443ac..18e3d773f690 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
val step = new MountSecretsFeatureStep(kubernetesConf)
val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index d309aa94ec11..0d0a5fb951f6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -36,7 +36,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Nil,
- sparkFiles = Nil)
+ sparkFiles = Nil,
+ hadoopConfSpec = None)
test("Mounts hostPath volumes") {
val volumeConf = KubernetesVolumeSpec(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
index bf552aeb8b90..9172e0c3dc40 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -43,7 +43,8 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Nil,
- sparkFiles = Seq.empty[String])
+ sparkFiles = Seq.empty[String],
+ hadoopConfSpec = None)
val step = new JavaDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
index c14af1d3b0f0..2bcc6465b79d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -53,7 +53,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Nil,
- sparkFiles = Seq.empty[String])
+ sparkFiles = Seq.empty[String],
+ hadoopConfSpec = None)
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
@@ -90,7 +91,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Nil,
- sparkFiles = Seq.empty[String])
+ sparkFiles = Seq.empty[String],
+ hadoopConfSpec = None)
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
val args = driverContainerwithPySpark
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
index ace0faa8629c..17af6011a17d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
@@ -47,7 +47,8 @@ class RDriverFeatureStepSuite extends SparkFunSuite {
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Seq.empty,
- sparkFiles = Seq.empty[String])
+ sparkFiles = Seq.empty[String],
+ hadoopConfSpec = None)
val step = new RDriverFeatureStep(kubernetesConf)
val driverContainerwithR = step.configurePod(baseDriverPod).container
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 4d8e79189ff3..ae13df39b7a7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -142,7 +142,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 4117c5487a41..051d7b6994f5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
class KubernetesDriverBuilderSuite extends SparkFunSuite {
@@ -30,9 +29,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val SECRETS_STEP_TYPE = "mount-secrets"
private val JAVA_STEP_TYPE = "java-bindings"
- private val PYSPARK_STEP_TYPE = "pyspark-bindings"
private val R_STEP_TYPE = "r-bindings"
+ private val PYSPARK_STEP_TYPE = "pyspark-bindings"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
+ private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -62,6 +62,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
+ private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep])
+
private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
@@ -76,7 +79,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
_ => mountVolumesStep,
_ => pythonStep,
_ => rStep,
- _ => javaStep)
+ _ => javaStep,
+ _ => hadoopGlobalStep)
test("Apply fundamental steps all the time.") {
val conf = KubernetesConf(
@@ -94,7 +98,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -120,7 +125,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map("EnvName" -> "SecretName:secretKey"),
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -148,7 +154,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -174,7 +181,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -205,7 +213,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
volumeSpec :: Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -232,7 +241,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -242,8 +252,71 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
R_STEP_TYPE)
}
+ test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") {
+ val conf = KubernetesConf(
+ new SparkConf(false),
+ KubernetesDriverSpecificConf(
+ None,
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ hadoopConfSpec = Some(
+ HadoopConfSpec(
+ Some("/var/hadoop-conf"),
+ None)))
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ JAVA_STEP_TYPE,
+ HADOOP_GLOBAL_STEP_TYPE)
+ }
+
+ test("Apply HadoopSteps if HADOOP_CONF ConfigMap is defined.") {
+ val conf = KubernetesConf(
+ new SparkConf(false),
+ KubernetesDriverSpecificConf(
+ None,
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ hadoopConfSpec = Some(
+ HadoopConfSpec(
+ None,
+ Some("pre-defined-configMapName"))))
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ JAVA_STEP_TYPE,
+ HADOOP_GLOBAL_STEP_TYPE)
+ }
+
+
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
- : Unit = {
+ : Unit = {
assert(resolvedSpec.systemProperties.size === stepTypes.size)
stepTypes.foreach { stepType =>
assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 0e617b002101..b336774838bc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -162,6 +162,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
} else {
val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
val executorSpecificConf = k8sConf.roleSpecificConf
+ // TODO: HADOOP_CONF_DIR
val expectedK8sConf = KubernetesConf.createExecutorConf(
conf,
executorSpecificConf.executorId,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 44fe4a24e110..b572dac2bf62 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
@@ -27,6 +28,9 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val SECRETS_STEP_TYPE = "mount-secrets"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
+ private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step"
+ private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user"
+ private val KERBEROS_CONF_STEP_TYPE = "kerberos-step"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -37,6 +41,12 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
+ private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep])
+ private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep])
+ private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep])
private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
@@ -45,7 +55,10 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
_ => mountSecretsStep,
_ => envSecretsStep,
_ => localDirsStep,
- _ => mountVolumesStep)
+ _ => mountVolumesStep,
+ _ => hadoopConfStep,
+ _ => kerberosConf,
+ _ => hadoopSparkUser)
test("Basic steps are consistently applied.") {
val conf = KubernetesConf(
@@ -60,7 +73,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
}
@@ -78,7 +92,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map("secret-name" -> "secret-key"),
Map.empty,
Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -105,7 +120,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
volumeSpec :: Nil,
- Seq.empty[String])
+ Seq.empty[String],
+ None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
@@ -113,6 +129,64 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
MOUNT_VOLUMES_STEP_TYPE)
}
+ test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") {
+ // HADOOP_DELEGATION_TOKEN
+ val HADOOP_CREDS_PREFIX = "spark.security.credentials."
+ val HADOOPFS_PROVIDER = s"$HADOOP_CREDS_PREFIX.hadoopfs.enabled"
+ val conf = KubernetesConf(
+ new SparkConf(false)
+ .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
+ .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")
+ .set(KERBEROS_SPARK_USER_NAME, "spark-user")
+ .set(HADOOPFS_PROVIDER, "true"),
+ KubernetesExecutorSpecificConf(
+ "executor-id", Some(new PodBuilder().build())),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Some(HadoopConfSpec(Some("/var/hadoop-conf"), None)))
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ HADOOP_CONF_STEP_TYPE,
+ HADOOP_SPARK_USER_STEP_TYPE)
+ }
+
+ test("Apply kerberos step if DT secrets created") {
+ val conf = KubernetesConf(
+ new SparkConf(false)
+ .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
+ .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")
+ .set(KERBEROS_SPARK_USER_NAME, "spark-user")
+ .set(KERBEROS_DT_SECRET_NAME, "dt-secret")
+ .set(KERBEROS_DT_SECRET_KEY, "dt-key"),
+ KubernetesExecutorSpecificConf(
+ "executor-id", Some(new PodBuilder().build())),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Some(HadoopConfSpec(None, Some("pre-defined-configMapName"))))
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ HADOOP_CONF_STEP_TYPE,
+ KERBEROS_CONF_STEP_TYPE)
+ }
+
private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size)
stepTypes.foreach { stepType =>
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 1c4dcd547687..4bada0d75321 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -30,7 +30,7 @@ ARG k8s_tests=kubernetes/tests
RUN set -ex && \
apk upgrade --no-cache && \
- apk add --no-cache bash tini libc6-compat linux-pam && \
+ apk add --no-cache bash tini libc6-compat linux-pam krb5 krb5-libs && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir && \
touch /opt/spark/RELEASE && \
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 216e8fe31bec..4958b7363fee 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -83,6 +83,10 @@ elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
export PYSPARK_DRIVER_PYTHON="python3"
fi
+if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
+ SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
+fi
+
case "$SPARK_K8S_CMD" in
driver)
CMD=(
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 01bdebc000b9..67d2c8610e91 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -154,6 +154,8 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
+ ResourceRequestHelper.validateResources(sparkConf)
+
var appId: ApplicationId = null
try {
launcherBackend.connect()
@@ -234,6 +236,13 @@ private[spark] class Client(
def createApplicationSubmissionContext(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+ val amResources =
+ if (isClusterMode) {
+ sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
+ } else {
+ sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
+ }
+ logDebug(s"AM resources: $amResources")
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
appContext.setQueue(sparkConf.get(QUEUE_NAME))
@@ -256,6 +265,10 @@ private[spark] class Client(
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
+ if (amResources.nonEmpty) {
+ ResourceRequestHelper.setResourceRequests(amResources, capability)
+ }
+ logDebug(s"Created resource capability for AM request: $capability")
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
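For clarity on what ends up in amResources above: SparkConf.getAllWithPrefix strips the prefix and returns the remaining key suffix with its value, so the map keys are bare YARN resource-type names. A small sketch for the cluster-mode branch (gpu and fpga are example names only, matching the ones used in the tests later in this patch):

import org.apache.spark.SparkConf

object AmResourcesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.yarn.driver.resource.gpu", "2")
      .set("spark.yarn.driver.resource.fpga", "1")

    // Same call as in createApplicationSubmissionContext for cluster mode.
    val amResources = conf.getAllWithPrefix("spark.yarn.driver.resource.").toMap
    println(amResources) // Map(gpu -> 2, fpga -> 1)
  }
}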
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
new file mode 100644
index 000000000000..9534f3aaa243
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of the Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+ private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+ private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+
+ /**
+ * Validates sparkConf and throws a SparkException if any of the standard resources (memory or
+ * cores) is defined with the property spark.yarn.x.resource.y.
+ * We need to reject all combinations of AM / Driver / Executor and memory / CPU cores resources,
+ * as Spark has its own names for them (memory, cores), but YARN also has its own names:
+ * (memory, memory-mb, mb) and (cores, vcores, cpu-vcores).
+ * We need to disable every possible way YARN could receive these resource definitions.
+ */
+ def validateResources(sparkConf: SparkConf): Unit = {
+ val resourceDefinitions = Seq[(String, String)](
+ (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+ (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+ (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+ (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "mb"),
+ (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "mb"),
+ (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb"),
+ (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory-mb"),
+ (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory-mb"),
+ (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb"),
+ (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+ (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+ (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"),
+ (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "vcores"),
+ (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "vcores"),
+ (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"),
+ (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
+ (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
+ (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"))
+ val errorMessage = new mutable.StringBuilder()
+
+ resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+ if (sparkConf.contains(resourceRequest)) {
+ errorMessage.append(s"Error: Do not use $resourceRequest, " +
+ s"please use $sparkName instead!\n")
+ }
+ }
+
+ if (errorMessage.nonEmpty) {
+ throw new SparkException(errorMessage.toString())
+ }
+ }
+
+ /**
+ * Sets the resource amounts, with their corresponding units, on the passed resource object.
+ * @param resources resource values to set
+ * @param resource resource object to update
+ */
+ def setResourceRequests(
+ resources: Map[String, String],
+ resource: Resource): Unit = {
+ require(resource != null, "Resource parameter should not be null!")
+
+ logDebug(s"Custom resources requested: $resources")
+ if (!isYarnResourceTypesAvailable()) {
+ if (resources.nonEmpty) {
+ logWarning("Ignoring custom resource requests because " +
+ "the version of YARN does not support it!")
+ }
+ return
+ }
+
+ val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS)
+ val setResourceInformationMethod =
+ resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass)
+ resources.foreach { case (name, rawAmount) =>
+ try {
+ val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+ val amount = amountPart.toLong
+ val unit = unitPart match {
+ case "g" => "G"
+ case "t" => "T"
+ case "p" => "P"
+ case _ => unitPart
+ }
+ logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit")
+ val resourceInformation = createResourceInformation(name, amount, unit, resInfoClass)
+ setResourceInformationMethod.invoke(
+ resource, name, resourceInformation.asInstanceOf[AnyRef])
+ } catch {
+ case _: MatchError =>
+ throw new IllegalArgumentException(s"Resource request for '$name' ('$rawAmount') " +
+ s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
+ case e: InvocationTargetException if e.getCause != null => throw e.getCause
+ }
+ }
+ }
+
+ private def createResourceInformation(
+ resourceName: String,
+ amount: Long,
+ unit: String,
+ resInfoClass: Class[_]): Any = {
+ val resourceInformation =
+ if (unit.nonEmpty) {
+ val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
+ classOf[String], classOf[String], JLong.TYPE)
+ resInfoNewInstanceMethod.invoke(null, resourceName, unit, amount.asInstanceOf[JLong])
+ } else {
+ val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
+ classOf[String], JLong.TYPE)
+ resInfoNewInstanceMethod.invoke(null, resourceName, amount.asInstanceOf[JLong])
+ }
+ resourceInformation
+ }
+
+ /**
+ * Checks whether Hadoop 2.x or 3 is used as a dependency.
+ * With Hadoop 3 and later, the ResourceInformation class
+ * should be available on the classpath.
+ */
+ def isYarnResourceTypesAvailable(): Boolean = {
+ Try(Utils.classForName(RESOURCE_INFO_CLASS)).isSuccess
+ }
+}
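To make the setResourceRequests parsing concrete: each raw value is split into a numeric amount and an optional unit, lower-case g/t/p units are upper-cased, and the pair is then handed to ResourceInformation.newInstance via reflection. A self-contained sketch of just that parsing step (the regex is the one from the helper, everything else is simplified for illustration):

object AmountAndUnitSketch {
  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r

  def parse(raw: String): (Long, String) = raw match {
    case AMOUNT_AND_UNIT_REGEX(amount, unit) =>
      val normalized = unit match {
        case "g" => "G"
        case "t" => "T"
        case "p" => "P"
        case _ => unit
      }
      (amount.toLong, normalized)
    case _ =>
      throw new IllegalArgumentException(s"'$raw' does not match $AMOUNT_AND_UNIT_REGEX")
  }

  def main(args: Array[String]): Unit = {
    println(parse("2g"))   // (2,G)   -> newInstance(name, "G", 2L)
    println(parse("123"))  // (123,)  -> newInstance(name, 123L)
    println(parse("10G"))  // (10,G)
  }
}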
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a7551de7c08..ebdcf45603ce 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -140,10 +140,18 @@ private[yarn] class YarnAllocator(
}
// Number of cores per executor.
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
- // Resource capability requested for each executors
- private[yarn] val resource = Resource.newInstance(
- executorMemory + memoryOverhead + pysparkWorkerMemory,
- executorCores)
+
+ private val executorResourceRequests =
+ sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap
+
+ // Resource capability requested for each executor
+ private[yarn] val resource: Resource = {
+ val resource = Resource.newInstance(
+ executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+ ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
+ logDebug(s"Created resource capability: $resource")
+ resource
+ }
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
@@ -288,9 +296,16 @@ private[yarn] class YarnAllocator(
s"executorsStarting: ${numExecutorsStarting.get}")
if (missing > 0) {
- logInfo(s"Will request $missing executor container(s), each with " +
- s"${resource.getVirtualCores} core(s) and " +
- s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
+ if (log.isInfoEnabled()) {
+ var requestContainerMessage = s"Will request $missing executor container(s), each with " +
+ s"${resource.getVirtualCores} core(s) and " +
+ s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
+ if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
+ executorResourceRequests.nonEmpty) {
+ requestContainerMessage ++= s" with custom resources: " + resource.toString
+ }
+ logInfo(requestContainerMessage)
+ }
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
@@ -456,13 +471,20 @@ private[yarn] class YarnAllocator(
// memory, but use the asked vcore count for matching, effectively disabling matching on vcore
// count.
val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
- resource.getVirtualCores)
+ resource.getVirtualCores)
+
+ ResourceRequestHelper.setResourceRequests(executorResourceRequests, matchingResource)
+
+ logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
+ s"priority: ${allocatedContainer.getPriority}, " +
+ s"location: $location, resource: $matchingResource")
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
matchingResource)
// Match the allocation to a request
if (!matchingRequests.isEmpty) {
val containerRequest = matchingRequests.get(0).iterator.next
+ logDebug(s"Removing container request via AM client: $containerRequest")
amClient.removeContainerRequest(containerRequest)
containersToUse += allocatedContainer
} else {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ab8273bd6321..f2ed555edc1d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -345,4 +345,8 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
+ private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
+ private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."
+
}
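From a user's point of view the three prefixes above are open-ended: whatever follows the prefix is passed to YARN as a resource-type name, while suffixes that collide with Spark's own memory and cores settings are rejected by validateResources. A configuration sketch (gpu is an example name and must also be defined on the YARN cluster side):

import org.apache.spark.SparkConf

object YarnResourceConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.yarn.am.resource.gpu", "1")        // AM, client mode
      .set("spark.yarn.driver.resource.gpu", "1")    // driver, cluster mode
      .set("spark.yarn.executor.resource.gpu", "2")  // each executor

    // By contrast, suffixes such as memory, memory-mb, mb, cores, vcores or cpu-vcores
    // are rejected by ResourceRequestHelper.validateResources with a SparkException.
    println(conf.getAll.toMap)
  }
}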
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 26013a109c42..533cb2b0f0bd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,6 +23,7 @@ import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -199,6 +200,20 @@ class ClientSuite extends SparkFunSuite with Matchers {
appContext.getMaxAppAttempts should be (42)
}
+ test("resource request (client mode)") {
+ val sparkConf = new SparkConf().set("spark.submit.deployMode", "client")
+ .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "2")
+ .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "3")
+ testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 2), ("gpu", 3)))
+ }
+
+ test("resource request (cluster mode)") {
+ val sparkConf = new SparkConf().set("spark.submit.deployMode", "cluster")
+ .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "4")
+ .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "gpu", "5")
+ testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 4), ("gpu", 5)))
+ }
+
test("spark.yarn.jars with multiple paths and globs") {
val libs = Utils.createTempDir()
val single = Utils.createTempDir()
@@ -433,4 +448,30 @@ class ClientSuite extends SparkFunSuite with Matchers {
classpath(env)
}
+ private def testResourceRequest(
+ sparkConf: SparkConf,
+ resources: List[String],
+ expectedResources: Seq[(String, Long)]): Unit = {
+ assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ ResourceRequestTestHelper.initializeResourceTypes(resources)
+
+ val args = new ClientArguments(Array())
+
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, sparkConf)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getAMContainerSpec should be (containerLaunchContext)
+ appContext.getApplicationType should be ("SPARK")
+
+ expectedResources.foreach { case (name, value) =>
+ ResourceRequestTestHelper.getResourceTypeValue(appContext.getResource, name) should be (value)
+ }
+ }
+
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
new file mode 100644
index 000000000000..60059987ba3f
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
+
+ private val CUSTOM_RES_1 = "custom-resource-type-1"
+ private val CUSTOM_RES_2 = "custom-resource-type-2"
+ private val MEMORY = "memory"
+ private val CORES = "cores"
+ private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+ private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+ private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+ private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+ private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+ private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+ test("resource request value does not match pattern") {
+ verifySetResourceRequestsException(List(CUSTOM_RES_1),
+ Map(CUSTOM_RES_1 -> "**@#"), CUSTOM_RES_1)
+ }
+
+ test("resource request just unit defined") {
+ verifySetResourceRequestsException(List(), Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
+ }
+
+ test("resource request with null value should not be allowed") {
+ verifySetResourceRequestsException(List(), null, Map(CUSTOM_RES_1 -> "123"),
+ "requirement failed: Resource parameter should not be null!")
+ }
+
+ test("resource request with valid value and invalid unit") {
+ verifySetResourceRequestsException(List(CUSTOM_RES_1), createResource,
+ Map(CUSTOM_RES_1 -> "123ppp"), "")
+ }
+
+ test("resource request with valid value and without unit") {
+ verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
+ Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "")))
+ }
+
+ test("resource request with valid value and unit") {
+ verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
+ Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 2, "G")))
+ }
+
+ test("two resource requests with valid values and units") {
+ verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2),
+ Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G"),
+ Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
+ CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G")))
+ }
+
+ test("empty SparkConf should be valid") {
+ val sparkConf = new SparkConf()
+ ResourceRequestHelper.validateResources(sparkConf)
+ }
+
+ test("just normal resources are defined") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(DRIVER_MEMORY.key, "3G")
+ sparkConf.set(DRIVER_CORES.key, "4")
+ sparkConf.set(EXECUTOR_MEMORY.key, "4G")
+ sparkConf.set(EXECUTOR_CORES.key, "2")
+ ResourceRequestHelper.validateResources(sparkConf)
+ }
+
+ test("memory defined with new config for executor") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_EXECUTOR_MEMORY, "30G")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_MEMORY)
+ }
+
+ test("memory defined with new config for executor 2") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb", "30G")
+ verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb")
+ }
+
+ test("memory defined with new config for executor 3") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb", "30G")
+ verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb")
+ }
+
+ test("cores defined with new config for executor") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_EXECUTOR_CORES, "5")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_CORES)
+ }
+
+ test("cores defined with new config for executor 2") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores", "5")
+ verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores")
+ }
+
+ test("memory defined with new config, client mode") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_AM_MEMORY, "1G")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_MEMORY)
+ }
+
+ test("memory defined with new config for driver, cluster mode") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "1G")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_MEMORY)
+ }
+
+ test("cores defined with new config, client mode") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_AM_CORES, "3")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_CORES)
+ }
+
+ test("cores defined with new config for driver, cluster mode") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(NEW_CONFIG_DRIVER_CORES, "1G")
+ verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_CORES)
+ }
+
+ test("various duplicated definitions") {
+ val sparkConf = new SparkConf()
+ sparkConf.set(DRIVER_MEMORY.key, "2G")
+ sparkConf.set(DRIVER_CORES.key, "2")
+ sparkConf.set(EXECUTOR_MEMORY.key, "2G")
+ sparkConf.set(EXECUTOR_CORES.key, "4")
+ sparkConf.set(AM_MEMORY.key, "3G")
+ sparkConf.set(NEW_CONFIG_EXECUTOR_MEMORY, "3G")
+ sparkConf.set(NEW_CONFIG_AM_MEMORY, "2G")
+ sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "2G")
+
+ val thrown = intercept[SparkException] {
+ ResourceRequestHelper.validateResources(sparkConf)
+ }
+ thrown.getMessage should (
+ include(NEW_CONFIG_EXECUTOR_MEMORY) and
+ include(NEW_CONFIG_AM_MEMORY) and
+ include(NEW_CONFIG_DRIVER_MEMORY))
+ }
+
+ private def verifySetResourceRequestsSuccessful(
+ definedResourceTypes: List[String],
+ resourceRequests: Map[String, String],
+ expectedResources: Map[String, ResourceInformation]): Unit = {
+ assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
+
+ val resource = createResource()
+ ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
+
+ expectedResources.foreach { case (name, ri) =>
+ val resourceInfo = ResourceRequestTestHelper.getResourceInformationByName(resource, name)
+ assert(resourceInfo === ri)
+ }
+ }
+
+ private def verifySetResourceRequestsException(
+ definedResourceTypes: List[String],
+ resourceRequests: Map[String, String],
+ message: String): Unit = {
+ val resource = createResource()
+ verifySetResourceRequestsException(definedResourceTypes, resource, resourceRequests, message)
+ }
+
+ private def verifySetResourceRequestsException(
+ definedResourceTypes: List[String],
+ resource: Resource,
+ resourceRequests: Map[String, String],
+ message: String) = {
+ assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
+ val thrown = intercept[IllegalArgumentException] {
+ ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
+ }
+ if (!message.isEmpty) {
+ thrown.getMessage should include (message)
+ }
+ }
+
+ private def verifyValidateResourcesException(sparkConf: SparkConf, message: String) = {
+ val thrown = intercept[SparkException] {
+ ResourceRequestHelper.validateResources(sparkConf)
+ }
+ thrown.getMessage should include (message)
+ }
+
+ private def createResource(): Resource = {
+ val resource = Records.newRecord(classOf[Resource])
+ resource.setMemory(512)
+ resource.setVirtualCores(2)
+ resource
+ }
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
new file mode 100644
index 000000000000..c46f3c5faff9
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.util.Utils
+
+object ResourceRequestTestHelper {
+ def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+ if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+ throw new IllegalStateException("This method should not be invoked " +
+ "since YARN resource types is not available because of old Hadoop version!" )
+ }
+
+ val allResourceTypes = new ListBuffer[AnyRef]
+ // ResourceUtils.reinitializeResources() is the YARN way
+ // to define the resource types available to these tests.
+ // This method must receive the standard resources, named memory-mb and vcores.
+ // Omitting the standard resources, or specifying them under different names
+ // (e.g. memory), makes YARN throw various exceptions because it relies on the
+ // standard resources always being defined.
+ val defaultResourceTypes = List(
+ createResourceTypeInfo("memory-mb"),
+ createResourceTypeInfo("vcores"))
+ val customResourceTypes = resourceTypes.map(createResourceTypeInfo)
+ allResourceTypes ++= defaultResourceTypes
+ allResourceTypes ++= customResourceTypes
+
+ val resourceUtilsClass =
+ Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+ val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+ classOf[java.util.List[AnyRef]])
+ reinitializeResourcesMethod.invoke(null, allResourceTypes.asJava)
+ }
+
+ private def createResourceTypeInfo(resourceName: String): AnyRef = {
+ val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+ val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+ resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+ }
+
+ def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+ val resourceInformation = getResourceInformation(res, name)
+ invokeMethod(resourceInformation, "getValue")
+ }
+
+ def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+ val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+ val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String]
+ val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long]
+ val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String]
+ ResourceInformation(name, value, units)
+ }
+
+ private def getResourceInformation(res: Resource, name: String): AnyRef = {
+ if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+ throw new IllegalStateException("assertResourceTypeValue() should not be invoked " +
+ "since yarn resource types is not available because of old Hadoop version!")
+ }
+
+ val getResourceInformationMethod = res.getClass.getMethod("getResourceInformation",
+ classOf[String])
+ val resourceInformation = getResourceInformationMethod.invoke(res, name)
+ resourceInformation
+ }
+
+ private def invokeMethod(resourceInformation: AnyRef, methodName: String): AnyRef = {
+ val getValueMethod = resourceInformation.getClass.getMethod(methodName)
+ getValueMethod.invoke(resourceInformation)
+ }
+
+ case class ResourceInformation(name: String, value: Long, units: String)
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3f783baed110..35299166d981 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.ArgumentCaptor
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
@@ -86,7 +87,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(
maxExecutors: Int = 5,
- rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
+ rmClient: AMRMClient[ContainerRequest] = rmClient,
+ additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
val args = Array(
"--jar", "somejar.jar",
"--class", "SomeClass")
@@ -95,6 +97,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
.set("spark.executor.instances", maxExecutors.toString)
.set("spark.executor.cores", "5")
.set("spark.executor.memory", "2048")
+
+ for ((name, value) <- additionalConfigs) {
+ sparkConfClone.set(name, value)
+ }
+
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -108,12 +115,12 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock)
}
- def createContainer(host: String): Container = {
+ def createContainer(host: String, resource: Resource = containerResource): Container = {
// When YARN 2.6+ is required, avoid deprecation by using version with long second arg
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
- Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
+ Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
}
test("single container allocated") {
@@ -134,6 +141,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
size should be (0)
}
+ test("custom resource requested from yarn") {
+ assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
+
+ val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+ val handler = createAllocator(1, mockAmClient,
+ Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
+
+ handler.updateResourceRequests()
+ val container = createContainer("host1", handler.resource)
+ handler.handleAllocatedContainers(Array(container))
+
+ // get amount of memory and vcores from resource, so effectively skipping their validation
+ val expectedResources = Resource.newInstance(handler.resource.getMemory(),
+ handler.resource.getVirtualCores)
+ ResourceRequestHelper.setResourceRequests(Map("gpu" -> "2G"), expectedResources)
+ val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
+
+ verify(mockAmClient).addContainerRequest(captor.capture())
+ val containerRequest: ContainerRequest = captor.getValue
+ assert(containerRequest.getCapability === expectedResources)
+ }
+
test("container should not be created if requested number if met") {
// request a single container and receive it
val handler = createAllocator(1)
diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
index 38a43b98c399..71dace47767c 100755
--- a/sbin/start-history-server.sh
+++ b/sbin/start-history-server.sh
@@ -28,7 +28,22 @@ if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.history.HistoryServer"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+ echo "Usage: ./sbin/start-history-server.sh [options]"
+ pattern="Usage:"
+ pattern+="\|Using Spark's default log4j profile:"
+ pattern+="\|Started daemon with process name"
+ pattern+="\|Registered signal handler for"
+
+ "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+ exit 1
+fi
+
. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"
-exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 "$@"
+exec "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 "$@"
diff --git a/sbin/start-master.sh b/sbin/start-master.sh
index 97ee32159b6d..b6a566e4daf4 100755
--- a/sbin/start-master.sh
+++ b/sbin/start-master.sh
@@ -31,7 +31,8 @@ if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-master.sh [options]"
pattern="Usage:"
pattern+="\|Using Spark's default log4j profile:"
- pattern+="\|Registered signal handlers for"
+ pattern+="\|Started daemon with process name"
+ pattern+="\|Registered signal handler for"
"${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
exit 1
diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 8c268b885915..247c9e20395e 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -43,7 +43,8 @@ if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-slave.sh [options] Error while rendering execution table:
+ *
* @group expr_ops
* @since 1.4.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 55e538f49fed..5d0feecd2cc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -47,10 +47,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Specifies the behavior when data or table already exists. Options include:
- * - `SaveMode.Overwrite`: overwrite the existing data.
- * - `SaveMode.Append`: append the data.
- * - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
- * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+ * <ul>
+ * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
+ * <li>`SaveMode.Append`: append the data.</li>
+ * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
+ * <li>`SaveMode.ErrorIfExists`: default option, throw an exception at runtime.</li>
+ * </ul>
*
* @since 1.4.0
*/
@@ -61,10 +63,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Specifies the behavior when data or table already exists. Options include:
- * - `overwrite`: overwrite the existing data.
- * - `append`: append the data.
- * - `ignore`: ignore the operation (i.e. no-op).
- * - `error` or `errorifexists`: default option, throw an exception at runtime.
+ * <ul>
+ * <li>`overwrite`: overwrite the existing data.</li>
+ * <li>`append`: append the data.</li>
+ * <li>`ignore`: ignore the operation (i.e. no-op).</li>
+ * <li>`error` or `errorifexists`: default option, throw an exception at runtime.</li>
+ * </ul>
*
* @since 1.4.0
*/
@@ -163,9 +167,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme. As an example, when we
* partition a dataset by year and then month, the directory layout would look like:
- *
- * - year=2016/month=01/
- * - year=2016/month=02/
+ * <ul>
+ * <li>year=2016/month=01/</li>
+ * <li>year=2016/month=02/</li>
+ * </ul>
*
* Partitioning is one of the most widely used techniques to optimize physical data layout.
* It provides a coarse-grained index for skipping unnecessary data reads when queries have
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index b21c50af1843..52b8c839643e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -130,8 +130,11 @@ abstract class ForeachWriter[T] extends Serializable {
* Called when stopping to process one partition of new data in the executor side. This is
* guaranteed to be called either `open` returns `true` or `false`. However,
* `close` won't be called in the following cases:
- * - JVM crashes without throwing a `Throwable`
- * - `open` throws a `Throwable`.
+ *
+ * <ul>
+ * <li>JVM crashes without throwing a `Throwable`</li>
+ * <li>`open` throws a `Throwable`.</li>
+ * </ul>
*
* @param errorOrNull the error thrown during processing data or null if there was no error.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index f99c108161f9..6b02ac2ded8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -30,12 +30,15 @@ import org.apache.spark.sql.catalyst.rules.Rule
* regarding binary compatibility and source compatibility of methods here.
*
* This current provides the following extension points:
- * - Analyzer Rules.
- * - Check Analysis Rules
- * - Optimizer Rules.
- * - Planning Strategies.
- * - Customized Parser.
- * - (External) Catalog listeners.
+ *
+ * <ul>
+ * <li>Analyzer Rules.</li>
+ * <li>Check Analysis Rules</li>
+ * <li>Optimizer Rules.</li>
+ * <li>Planning Strategies.</li>
+ * <li>Customized Parser.</li>
+ * <li>(External) Catalog listeners.</li>
+ * </ul>
*
* The extensions can be used by calling withExtension on the [[SparkSession.Builder]], for
* example:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 4cd2e788ade0..09effe087e19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -452,8 +452,15 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
+ val stopCheck = if (parent.needStopCheck) {
+ s"""
+ |if (shouldStop()) {
+ | $nextIndex = $value + ${step}L;
+ | $numOutput.add($localIdx + 1);
+ | $inputMetrics.incRecordsRead($localIdx + 1);
+ | return;
+ |}
+ """.stripMargin
} else {
"// shouldStop check is eliminated"
}
@@ -506,8 +513,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
- | $numOutput.add($nextBatchTodo);
- | $inputMetrics.incRecordsRead($nextBatchTodo);
| $batchEnd += $nextBatchTodo * ${step}L;
| }
|
@@ -515,9 +520,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $nextIndex;
| ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $stopCheck
| }
| $nextIndex = $batchEnd;
+ | $numOutput.add($localEnd);
+ | $inputMetrics.incRecordsRead($localEnd);
| $taskContext.killTaskIfInterrupted();
| }
""".stripMargin
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1a8fbaca53f5..3b6588587c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -149,30 +149,30 @@ object InMemoryRelation {
tableName: Option[String],
logicalPlan: LogicalPlan): InMemoryRelation = {
val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)()
- new InMemoryRelation(child.output, cacheBuilder)(
- statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+ new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)(
+ statsOfPlanToCache = logicalPlan.stats)
}
def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
- new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder)(
- statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
+ new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)(
+ statsOfPlanToCache = logicalPlan.stats)
}
}
case class InMemoryRelation(
output: Seq[Attribute],
- @transient cacheBuilder: CachedRDDBuilder)(
- statsOfPlanToCache: Statistics,
- override val outputOrdering: Seq[SortOrder])
+ @transient cacheBuilder: CachedRDDBuilder,
+ override val outputOrdering: Seq[SortOrder])(
+ statsOfPlanToCache: Statistics)
extends logical.LeafNode with MultiInstanceRelation {
override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
override def doCanonicalize(): logical.LogicalPlan =
copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
- cacheBuilder)(
- statsOfPlanToCache,
- outputOrdering)
+ cacheBuilder,
+ outputOrdering)(
+ statsOfPlanToCache)
override def producedAttributes: AttributeSet = outputSet
@@ -195,15 +195,15 @@ case class InMemoryRelation(
}
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
- InMemoryRelation(newOutput, cacheBuilder)(statsOfPlanToCache, outputOrdering)
+ InMemoryRelation(newOutput, cacheBuilder, outputOrdering)(statsOfPlanToCache)
}
override def newInstance(): this.type = {
new InMemoryRelation(
output.map(_.newInstance()),
- cacheBuilder)(
- statsOfPlanToCache,
- outputOrdering).asInstanceOf[this.type]
+ cacheBuilder,
+ outputOrdering)(
+ statsOfPlanToCache).asInstanceOf[this.type]
}
override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
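The constructor reshuffle above moves outputOrdering from the curried second parameter list into the primary one. In Scala, only the primary parameter list of a case class takes part in equals, hashCode and copy, which is presumably what the move is after; a toy illustration of that language rule (the classes below are invented, not the Spark ones):

object CurriedCaseClassSketch {
  case class RelA(output: Seq[String], ordering: Seq[String])(val stats: Long)
  case class RelB(output: Seq[String])(val ordering: Seq[String], val stats: Long)

  def main(args: Array[String]): Unit = {
    // ordering participates in equality only when it sits in the first parameter list
    println(RelA(Seq("a"), Seq("a ASC"))(10L) == RelA(Seq("a"), Seq("a DESC"))(20L)) // false
    println(RelB(Seq("a"))(Seq("a ASC"), 10L) == RelB(Seq("a"))(Seq("a DESC"), 20L)) // true
  }
}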
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2eca1c40a5b3..64831e5089a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -306,7 +306,8 @@ case class LoadDataCommand(
val loadPath = {
if (isLocal) {
val localFS = FileContext.getLocalFSFileContext()
- makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
+ LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(),
+ new Path(path))
} else {
val loadPath = new Path(path)
// Follow Hive's behavior:
@@ -323,7 +324,7 @@ case class LoadDataCommand(
// by considering the wild card scenario in mind.as per old logic query param is
// been considered while creating URI instance and if path contains wild card char '?'
// the remaining charecters after '?' will be removed while forming URI instance
- makeQualified(defaultFS, uriPath, loadPath)
+ LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
}
}
val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -363,7 +364,9 @@ case class LoadDataCommand(
CommandUtils.updateTableStats(sparkSession, targetTable)
Seq.empty[Row]
}
+}
+object LoadDataCommand {
/**
* Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
*
@@ -372,7 +375,7 @@ case class LoadDataCommand(
* @param path Path instance based on the path string specified by the user.
* @return qualified path object
*/
- private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
+ private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
if (pathUri.getScheme == null || pathUri.getAuthority == null &&
defaultUri.getAuthority != null) {
@@ -383,7 +386,7 @@ case class LoadDataCommand(
pathUri.getAuthority
}
try {
- val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
+ val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment)
new Path(newUri)
} catch {
case e: URISyntaxException =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index dd3c154259c7..ffea33c08ef9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -85,7 +85,7 @@ class FileScanRDD(
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
- private def updateBytesRead(): Unit = {
+ private def incTaskInputMetricsBytesRead(): Unit = {
inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}
@@ -106,15 +106,16 @@ class FileScanRDD(
// don't need to run this `if` for every record.
val preNumRecordsRead = inputMetrics.recordsRead
if (nextElement.isInstanceOf[ColumnarBatch]) {
+ incTaskInputMetricsBytesRead()
inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
} else {
+ // Updating the bytes-read metric for every record is too costly, so only do it per interval.
+ if (inputMetrics.recordsRead %
+ SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ incTaskInputMetricsBytesRead()
+ }
inputMetrics.incRecordsRead(1)
}
- // The records may be incremented by more than 1 at a time.
- if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
- inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
- updateBytesRead()
- }
nextElement
}
@@ -201,7 +202,7 @@ class FileScanRDD(
}
override def close(): Unit = {
- updateBytesRead()
+ incTaskInputMetricsBytesRead()
InputFileBlockHolder.unset()
}
}
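The rewrite above makes the refresh policy explicit: task-level bytesRead is updated once per columnar batch, and for row-based reads once every UPDATE_INPUT_METRICS_INTERVAL_RECORDS records, instead of comparing interval buckets after every increment. The underlying pattern, stripped of the Spark types (the interval value below is made up):

object PeriodicMetricUpdateSketch {
  val UpdateInterval = 1000L // illustrative; Spark uses SparkHadoopUtil's constant
  var recordsRead = 0L

  def refreshBytesRead(): Unit = {
    // e.g. query file-system statistics; too costly to do for every record
  }

  def onRecord(): Unit = {
    if (recordsRead % UpdateInterval == 0) {
      refreshBytesRead() // runs for record 0, 1000, 2000, ...
    }
    recordsRead += 1
  }

  def main(args: Array[String]): Unit = {
    (1 to 2500).foreach(_ => onRecord())
    println(recordsRead) // 2500 records read, refreshBytesRead ran 3 times
  }
}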
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index b912f8add3af..0a7473c491b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -139,23 +139,25 @@ object CSVUtils {
*/
@throws[IllegalArgumentException]
def toChar(str: String): Char = {
- if (str.charAt(0) == '\\') {
- str.charAt(1)
- match {
- case 't' => '\t'
- case 'r' => '\r'
- case 'b' => '\b'
- case 'f' => '\f'
- case '\"' => '\"' // In case user changes quote char and uses \" as delimiter in options
- case '\'' => '\''
- case 'u' if str == """\u0000""" => '\u0000'
- case _ =>
- throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
- }
- } else if (str.length == 1) {
- str.charAt(0)
- } else {
- throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
+ (str: Seq[Char]) match {
+ case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
+ case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." +
+ " It has special meaning as beginning of an escape sequence." +
+ " To get the backslash character, pass a string with two backslashes as the delimiter.")
+ case Seq(c) => c
+ case Seq('\\', 't') => '\t'
+ case Seq('\\', 'r') => '\r'
+ case Seq('\\', 'b') => '\b'
+ case Seq('\\', 'f') => '\f'
+ // In case user changes quote char and uses \" as delimiter in options
+ case Seq('\\', '\"') => '\"'
+ case Seq('\\', '\'') => '\''
+ case Seq('\\', '\\') => '\\'
+ case _ if str == """\u0000""" => '\u0000'
+ case Seq('\\', _) =>
+ throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
+ case _ =>
+ throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
}
}
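The new pattern match spells out the delimiter rules: an empty string and a lone backslash are rejected, two backslashes now yield a literal backslash, and any other single character passes through. A reduced standalone sketch of those new cases (the real method also handles \t, \r, \b, \f, \", \' and \u0000):

object DelimiterSketch {
  def toCharSketch(str: String): Char = (str: Seq[Char]) match {
    case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
    case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited.")
    case Seq(c) => c
    case Seq('\\', '\\') => '\\'
    case _ => throw new IllegalArgumentException(s"Unsupported delimiter: $str")
  }

  def main(args: Array[String]): Unit = {
    println(toCharSketch(","))      // ,
    println(toCharSketch("""\\""")) // a single backslash
  }
}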
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 1b2d8a821b36..1a25cd2a49e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.execution.ui
+import java.net.URLEncoder
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.commons.lang3.StringEscapeUtils
+import scala.xml.{Node, NodeSeq, Unparsed}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.util.Utils
private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
@@ -55,8 +56,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
val _content = mutable.ListBuffer[Node]()
if (running.nonEmpty) {
- val runningPageTable = new RunningExecutionTable(
- parent, currentTime, running.sortBy(_.submissionTime).reverse).toNodeSeq(request)
+ val runningPageTable =
+ executionsTable(request, "running", running, currentTime, true, true, true)
_content ++=
- Running Queries:
+ Running Queries:
{running.size}
}
@@ -129,7 +130,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
{
if (completed.nonEmpty) {
+ {Utils.exceptionString(e)}
+
+
+
+ {header}
+ {Unparsed(arrow)}
+
+
+
} else {
- None
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$executionTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$executionTag.pageSize=$pageSize" +
+ s"#$tableHeaderId")
+
+
+
+ {header}
+
+
+ } else {
+
+ {header}
+
+ }
}
- }.toSeq
+ }
+ }
+
+ {headerRow}
+
+ }
+
+ override def row(executionTableRow: ExecutionTableRowData): Seq[Node] = {
+ val executionUIData = executionTableRow.executionUIData
+ val submissionTime = executionUIData.submissionTime
+ val duration = executionTableRow.duration
+
+ def jobLinks(jobData: Seq[Int]): Seq[Node] = {
+ jobData.map { jobId =>
+ [{jobId.toString}]
+ }
}
@@ -188,7 +371,7 @@ private[ui] abstract class ExecutionTable(
{executionUIData.executionId.toString}
}
- private def descriptionCell(
- request: HttpServletRequest,
- execution: SQLExecutionUIData): Seq[Node] = {
+ private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
val details = if (execution.details != null && execution.details.nonEmpty) {
-
- {descriptionCell(request, executionUIData)}
+ {descriptionCell(executionUIData)}
{UIUtils.formatDate(submissionTime)}
@@ -198,27 +381,26 @@ private[ui] abstract class ExecutionTable(
{if (showRunningJobs) {
- {jobLinks(JobExecutionStatus.RUNNING)}
+ {jobLinks(executionTableRow.runningJobData)}
}}
{if (showSucceededJobs) {
- {jobLinks(JobExecutionStatus.SUCCEEDED)}
+ {jobLinks(executionTableRow.completedJobData)}
}}
{if (showFailedJobs) {
- {jobLinks(JobExecutionStatus.FAILED)}
+ {jobLinks(executionTableRow.failedJobData)}
}}