From 4402fc7ae33cf3bd1d5c2c8f2dcf380a90615f7c Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 13 Dec 2017 18:49:28 -0800
Subject: [PATCH] [SPARK-22779][sql] Resolve default values for fallback configs.

SQLConf allows some callers to define a custom default value for configs,
which complicates the handling of fallback config entries a bit, since
most of the default value resolution is hidden by the config code.

This change peeks into the internals of these fallback configs to figure
out the correct default value, and also returns the current, human-readable
default when showing the default value (e.g. through "set -v").

---
 .../spark/internal/config/ConfigEntry.scala   |  8 +++--
 .../apache/spark/sql/internal/SQLConf.scala   | 16 +++++++---
 .../spark/sql/internal/SQLConfSuite.scala     | 30 +++++++++++++++++++
 3 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index f1190289244e9..ede3ace4f9aac 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -139,7 +139,7 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull,
     doc, isPublic) {
-  override def defaultValueString: String = "<undefined>"
+  override def defaultValueString: String = ConfigEntry.UNDEFINED
 
   override def readFrom(reader: ConfigReader): Option[T] = {
     readString(reader).map(rawValueConverter)
@@ -149,12 +149,12 @@ private[spark] class OptionalConfigEntry[T](
 /**
  * A config entry whose default value is defined by another config entry.
  */
-private class FallbackConfigEntry[T] (
+private[spark] class FallbackConfigEntry[T] (
     key: String,
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
-    private[config] val fallback: ConfigEntry[T])
+    val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, alternatives,
     fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
@@ -167,6 +167,8 @@ private class FallbackConfigEntry[T] (
 
 private[spark] object ConfigEntry {
 
+  val UNDEFINED = "<undefined>"
+
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
   def registerEntry(entry: ConfigEntry[_]): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1121444cc938a..cf7e3ebce7411 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1379,7 +1379,7 @@ class SQLConf extends Serializable with Logging {
     Option(settings.get(key)).
       orElse {
         // Try to use the default value
-        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+        Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
       }.
       getOrElse(throw new NoSuchElementException(key))
   }
@@ -1417,14 +1417,21 @@ class SQLConf extends Serializable with Logging {
    * not set yet, return `defaultValue`.
    */
   def getConfString(key: String, defaultValue: String): String = {
-    if (defaultValue != null && defaultValue != "<undefined>") {
+    if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
       val entry = sqlConfEntries.get(key)
       if (entry != null) {
         // Only verify configs in the SQLConf object
         entry.valueConverter(defaultValue)
       }
     }
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(settings.get(key)).getOrElse {
+      // If the key is not set, need to check whether the config entry is registered and is
+      // a fallback conf, so that we can check its parent.
+      sqlConfEntries.get(key) match {
+        case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
+        case _ => defaultValue
+      }
+    }
   }
 
   /**
@@ -1440,7 +1447,8 @@ class SQLConf extends Serializable with Logging {
    */
   def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
-      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+      val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
+      (entry.key, displayValue, entry.doc)
     }.toSeq
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 8b1521bacea49..c9a6975da6be8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -280,4 +280,34 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
 
     spark.sessionState.conf.clear()
   }
+
+  test("SPARK-22779: correctly compute default value for fallback configs") {
+    val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
+      .fallbackConf(SQLConf.PARQUET_COMPRESSION)
+
+    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+      SQLConf.PARQUET_COMPRESSION.defaultValue.get)
+    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+
+    val displayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(displayValue === fallback.defaultValueString)
+
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+
+    spark.sessionState.conf.setConf(fallback, "lzo")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+
+    val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(newDisplayValue === "lzo")
+
+    SQLConf.unregister(fallback)
+  }
+
 }
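
For readers following the change outside the Spark tree, the sketch below is a minimal,
self-contained model of the resolution behavior this patch adds: when a fallback entry's
own key is unset, lookup recurses to the parent entry's key before giving up and using the
static default string. The names here (FallbackResolutionSketch, Plain, Fallback, and the
simplified getConfString signature) are hypothetical stand-ins, not Spark's actual classes.

object FallbackResolutionSketch {

  // Hypothetical stand-ins for ConfigEntry / FallbackConfigEntry; not Spark's classes.
  sealed trait Entry {
    def key: String
    def default: Option[String]
  }
  case class Plain(key: String, default: Option[String]) extends Entry
  case class Fallback(key: String, parent: Entry) extends Entry {
    // A fallback entry's static default is whatever its parent's default is.
    def default: Option[String] = parent.default
  }

  // Mirrors the recursion added to SQLConf.getConfString: an unset fallback key is
  // resolved through its parent's key before the passed-in default is used.
  def getConfString(
      settings: Map[String, String],
      entries: Map[String, Entry],
      key: String,
      defaultValue: String): String = {
    settings.getOrElse(key, {
      entries.get(key) match {
        case Some(f: Fallback) => getConfString(settings, entries, f.parent.key, defaultValue)
        case _ => defaultValue
      }
    })
  }

  def main(args: Array[String]): Unit = {
    val parquetCompression = Plain("spark.sql.parquet.compression.codec", Some("snappy"))
    val fallback = Fallback("spark.sql.example.fallback", parquetCompression)
    val entries = Map(parquetCompression.key -> parquetCompression, fallback.key -> fallback)
    val default = fallback.default.getOrElse("")

    // Nothing set: the fallback entry reports its parent's default ("snappy").
    println(getConfString(Map.empty, entries, fallback.key, default))

    // Parent set: the unset fallback entry now resolves to the parent's value ("gzip").
    val withParent = Map(parquetCompression.key -> "gzip")
    println(getConfString(withParent, entries, fallback.key, default))

    // Fallback set directly: its own value wins ("lzo").
    println(getConfString(withParent + (fallback.key -> "lzo"), entries, fallback.key, default))
  }
}

The second call is the case exercised by the new test: with only the parent set, a lookup of
the fallback key now reflects the parent's current value rather than the static default.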