2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -248,7 +248,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* - This will throw an exception if the config is not optional and the value is not set.
*/
private[spark] def get[T](entry: ConfigEntry[T]): T = {
-    entry.readFrom(this)
+    entry.readFrom(settings, getenv)
}

/**
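For orientation, here is a minimal sketch of how a typed entry is read through `SparkConf.get` after this change. It assumes compilation inside Spark's core module (`ConfigBuilder`, `ConfigEntry`, and `SparkConf.get(entry)` are `private[spark]`), and the config key is hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical entry; creating it also registers it with ConfigEntry.
val maxRetries = ConfigBuilder("spark.example.maxRetries").intConf.createWithDefault(3)

val conf = new SparkConf()
conf.set("spark.example.maxRetries", "5")

// SparkConf.get now hands the entry its raw settings map plus an env lookup
// (settings and getenv) instead of passing the SparkConf itself.
assert(conf.get(maxRetries) == 5)
```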
core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -116,20 +116,25 @@ private[spark] class TypedConfigBuilder[T](

/** Creates a [[ConfigEntry]] that has a default value. */
def createWithDefault(default: T): ConfigEntry[T] = {
-    val transformedDefault = converter(stringConverter(default))
-    val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
-      stringConverter, parent._doc, parent._public)
-    parent._onCreate.foreach(_(entry))
-    entry
+    // Treat "String" as a special case, so that both createWithDefault and createWithDefaultString
+    // behave the same w.r.t. variable expansion of default values.
+    if (default.isInstanceOf[String]) {
+      createWithDefaultString(default.asInstanceOf[String])
+    } else {
+      val transformedDefault = converter(stringConverter(default))
+      val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+        stringConverter, parent._doc, parent._public)
+      parent._onCreate.foreach(_(entry))
+      entry
+    }
}

/**
* Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
-    val typedDefault = converter(default)
-    val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+    val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
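The `String` special case above matters because defaults given as strings may themselves contain references. A hedged sketch of the two now-equivalent spellings (hypothetical keys, compiled inside Spark core):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Both end up as ConfigEntryWithDefaultString, so the "${env:...}" reference
// in the default is kept verbatim and only expanded when the entry is read.
val logDir = ConfigBuilder("spark.example.logDir")
  .stringConf
  .createWithDefault("${env:SPARK_HOME}/logs")   // routed to createWithDefaultString

val logDir2 = ConfigBuilder("spark.example.logDir2")
  .stringConf
  .createWithDefaultString("${env:SPARK_HOME}/logs")
```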
132 changes: 123 additions & 9 deletions core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -17,11 +17,35 @@

package org.apache.spark.internal.config

import java.util.{Map => JMap}

import scala.util.matching.Regex

import org.apache.spark.SparkConf

/**
* An entry contains all meta information for a configuration.
*
* Config options created using this feature support variable expansion. If the config value
* contains variable references of the form "${prefix:variableName}", the reference will be replaced
* with the value of the variable depending on the prefix. The prefix can be one of:
*
* - no prefix: if the config key starts with "spark.", looks for the value in the Spark config
* - system: looks for the value in the system properties
* - env: looks for the value in the environment
*
* So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
* configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
* environment variable.
*
* For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
* will also consider the default value when it exists.
*
* If the reference cannot be resolved, the original string will be retained.
*
* Variable expansion is also applied to the default values of config entries that have a default
* value declared as a string.
*
* @param key the key for the configuration
* @param defaultValue the default value for the configuration
* @param valueConverter how to convert a string to the value. It should throw an exception if the
@@ -42,17 +66,27 @@ private[spark] abstract class ConfigEntry[T] (
val doc: String,
val isPublic: Boolean) {

import ConfigEntry._

registerEntry(this)

def defaultValueString: String

-  def readFrom(conf: SparkConf): T
+  def readFrom(conf: JMap[String, String], getenv: String => String): T

// This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
// use readFrom().
def defaultValue: Option[T] = None

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}

protected def readAndExpand(
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String] = Set()): Option[String] = {
Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
}

}
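A sketch of the three reference forms flowing through the new `readFrom`, using a plain Java map and a stubbed env function so nothing depends on the real environment (hypothetical keys, Spark-core-internal APIs):

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.internal.config.ConfigBuilder

val settings = new JHashMap[String, String]()
settings.put("spark.example.master", "local[4]")
settings.put("spark.example.summary",
  "master=${spark.example.master} user=${system:user.name} home=${env:HOME}")

// Stub env lookup; readFrom accepts any String => String.
val getenv: String => String = Map("HOME" -> "/home/test").get(_).orNull

val summary = ConfigBuilder("spark.example.summary").stringConf.createWithDefault("")
// "spark.example.master" resolves from the settings map, "system:" from JVM
// properties, and "env:" through the provided function:
//   master=local[4] user=<jvm user.name> home=/home/test
println(summary.readFrom(settings, getenv))
```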

private class ConfigEntryWithDefault[T] (
@@ -68,12 +102,36 @@ private class ConfigEntryWithDefault[T] (

override def defaultValueString: String = stringConverter(_defaultValue)

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
+  def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
}

}

private class ConfigEntryWithDefaultString[T] (
key: String,
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))

override def defaultValueString: String = _defaultValue

def readFrom(conf: JMap[String, String], getenv: String => String): T = {
Option(conf.get(key))
.orElse(Some(_defaultValue))
.map(ConfigEntry.expand(_, conf, getenv, Set()))
.map(valueConverter)
.get
}

}
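Because `ConfigEntryWithDefaultString` stores the default verbatim, a reference inside the default resolves at read time. A minimal sketch under the same assumptions (hypothetical key, stubbed env):

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.internal.config.ConfigBuilder

val home = ConfigBuilder("spark.example.home")
  .stringConf
  .createWithDefaultString("${env:SPARK_HOME}")

// No explicit setting, so the default itself goes through expand().
val env: String => String = Map("SPARK_HOME" -> "/opt/spark").get(_).orNull
assert(home.readFrom(new JHashMap[String, String](), env) == "/opt/spark")
```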


/**
* A config entry that does not have a default value.
*/
@@ -88,7 +146,9 @@ private[spark] class OptionalConfigEntry[T](

override def defaultValueString: String = "<undefined>"

-  override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
+  override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
+    readAndExpand(conf, getenv).map(rawValueConverter)
+  }

}
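Optional entries get the same expansion but surface `None` when the key is unset. A short sketch (hypothetical key):

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.internal.config.ConfigBuilder

val label = ConfigBuilder("spark.example.label").stringConf.createOptional

val m = new JHashMap[String, String]()
assert(label.readFrom(m, _ => null).isEmpty)   // unset: None, not an error

m.put("spark.example.label", "run-${system:user.name}")
println(label.readFrom(m, _ => null))          // Some("run-<jvm user.name>")
```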

@@ -99,13 +159,67 @@ private class FallbackConfigEntry[T] (
key: String,
doc: String,
isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
+    private[config] val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

override def defaultValueString: String = s"<value of ${fallback.key}>"

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
}

}
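Fallbacks compose with expansion: when the new key is unset, the fallback entry's `readFrom` runs against the same settings and env. A sketch assuming `ConfigBuilder.fallbackConf` is the builder hook that wraps an entry in a `FallbackConfigEntry` (that hook is not shown in this diff; keys hypothetical):

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.internal.config.ConfigBuilder

val oldLimit = ConfigBuilder("spark.example.oldLimit").intConf.createWithDefault(1)
// fallbackConf is assumed here; it builds a FallbackConfigEntry over oldLimit.
val newLimit = ConfigBuilder("spark.example.newLimit").fallbackConf(oldLimit)

val m = new JHashMap[String, String]()
m.put("spark.example.oldLimit", "42")
// newLimit is unset, so the value of spark.example.oldLimit wins.
assert(newLimit.readFrom(m, _ => null) == 42)
```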

private object ConfigEntry {

private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
}

def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)

/**
* Expand the `value` according to the rules explained in ConfigEntry.
*/
def expand(
value: String,
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String]): String = {
REF_RE.replaceAllIn(value, { m =>
val prefix = m.group(1)
val name = m.group(2)
val replacement = prefix match {
case null =>
require(!usedRefs.contains(name), s"Circular reference in $value: $name")
if (name.startsWith("spark.")) {
Option(findEntry(name))
.flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
.orElse(Option(conf.get(name)))
.orElse(defaultValueString(name))
} else {
None
}
case "system" => sys.props.get(name)
case "env" => Option(getenv(name))
case _ => None
}
Regex.quoteReplacement(replacement.getOrElse(m.matched))
})
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
case _ => None
}
}

}
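Finally, a note on unresolved references: `expand` leaves them untouched rather than failing. Since the `ConfigEntry` object is package-private, a sketch like this would have to live inside `org.apache.spark.internal.config`:

```scala
package org.apache.spark.internal.config

import java.util.{HashMap => JHashMap}

// Hypothetical demo object, placed in the config package for access.
object ExpandDemo {
  def main(args: Array[String]): Unit = {
    val conf = new JHashMap[String, String]()
    // No entry, setting, system property, or env match for spark.missing:
    val out = ConfigEntry.expand("val=${spark.missing}", conf, _ => null, Set())
    assert(out == "val=${spark.missing}")  // the reference is retained verbatim
  }
}
```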