diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58bff522..c620e94ef079 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
 
+import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
@@ -74,7 +76,6 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-
   /**
    * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
    * configuration.
@@ -99,17 +100,35 @@ class SparkHadoopUtil extends Logging {
         hadoopConf.set("fs.s3a.session.token", sessionToken)
       }
     }
-      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-      conf.getAll.foreach { case (key, value) =>
-        if (key.startsWith("spark.hadoop.")) {
-          hadoopConf.set(key.substring("spark.hadoop.".length), value)
-        }
-      }
+      appendSparkHadoopConfigs(conf, hadoopConf)
       val bufferSize = conf.get("spark.buffer.size", "65536")
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
   }
 
+  /**
+   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
+   * configuration without the spark.hadoop. prefix.
+   */
+  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    // Copy any "spark.hadoop.foo=bar" Spark properties into hadoopConf as "foo=bar"
+    for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
+      hadoopConf.set(key.substring("spark.hadoop.".length), value)
+    }
+  }
+
+  /**
+   * Appends spark.hadoop.* configurations from one Map to another without the spark.hadoop. prefix.
+   */
+  def appendSparkHadoopConfigs(
+      srcMap: Map[String, String],
+      destMap: HashMap[String, String]): Unit = {
+    // Copy any "spark.hadoop.foo=bar" entries of srcMap into destMap as "foo=bar"
+    for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
+      destMap.put(key.substring("spark.hadoop.".length), value)
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
diff --git a/docs/configuration.md b/docs/configuration.md
index 91b5befd1b1e..f019f849c20a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2335,5 +2335,37 @@ The location of these configuration files varies across Hadoop versions, but
 a common location is inside of `/etc/hadoop/conf`. Some tools create
 configurations on-the-fly, but offer a mechanisms to download copies of them.
 
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
 to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark application interacts with Hadoop, Hive, or both, there are probably Hadoop/Hive
+configuration files in Spark's classpath.
+
+Multiple running applications might require different Hadoop/Hive client-side configurations.
+You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in
+Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
+files are set cluster-wide, and cannot safely be changed by the application.
+
+A better choice is to use Spark Hadoop properties in the form of `spark.hadoop.*`.
+They can be considered the same as normal Spark properties, which can be set in `$SPARK_HOME/conf/spark-defaults.conf`.
+
+In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
+instance, Spark allows you to simply create an empty conf and set Spark/Spark Hadoop properties.
+
+{% highlight scala %}
+val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
+val sc = new SparkContext(conf)
+{% endhighlight %}
+
+Also, you can modify or add configurations at runtime:
+{% highlight bash %}
+./bin/spark-submit \
+  --name "My app" \
+  --master local[4] \
+  --conf spark.eventLog.enabled=false \
+  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
+  --conf spark.hadoop.abc.def=xyz \
+  myApp.jar
+{% endhighlight %}
\ No newline at end of file
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 33e18a8da60f..1d815c7b1752 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
   private val prompt = "spark-sql"
   private val continuedPrompt = "".padTo(prompt.length, ' ')
   private var transport: TSocket = _
+  private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
 
   installSignalHandler()
 
@@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       // Hive 1.2 + not supported in CLI
       throw new RuntimeException("Remote operations not supported")
     }
+    // Respect the configurations set by --hiveconf from the command line
+    // (based on Hive's CliDriver).
+    val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala
+    val newHiveConf = hiveConfFromCmd.map { kv =>
+      // If the same property is also configured via spark.hadoop.xxx, ignore the --hiveconf
+      // value and obey the setting from the Spark property instead.
+      val k = kv.getKey
+      val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue)
+      (k, v)
+    }
 
     val cli = new SparkSQLCLIDriver
     cli.setHiveVariables(oproc.getHiveVariables)
@@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(sessionState)
 
-    // Respect the configurations set by --hiveconf from the command line
-    // (based on Hive's CliDriver).
-    val it = sessionState.getOverriddenConfigurations.entrySet().iterator()
-    while (it.hasNext) {
-      val kv = it.next()
-      SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue)
+    newHiveConf.foreach { kv =>
+      SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
     }
 
     if (sessionState.execString != null) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d3cec11bd756..933fd7369380 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "SET conf3;" -> "conftest"
     )
   }
+
+  test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
+    runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
+  }
+
+  test("SPARK-21451: Apply spark.hadoop.* configurations") {
+    val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
+    runCliWithin(
+      1.minute,
+      Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
+      "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
+    tmpDir.delete()
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a29d7a7565ee..13d592f71148 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -404,6 +405,13 @@ private[spark] object HiveUtils extends Logging {
     propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
     propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")
 
+    // SPARK-21451: Spark gathers all `spark.hadoop.*` properties from a `SparkConf` into a
+    // Hadoop Configuration internally, but only after the SparkContext has been initialized.
+    // Some instances, such as the `CliSessionState` used in `SparkSQLCLIDriver`, also rely on
+    // these configurations yet are created before the SparkContext exists, so we pick the
+    // properties up from system properties here and add them as regular Hadoop configurations.
+    SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap)
+
     propMap.toMap
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index 667a7ddd8bb6..2ebb1de428fd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
       assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
     }
   }
+
+  test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") {
+    sys.props.put("spark.hadoop.foo", "bar")
+    Seq(true, false) foreach { useInMemoryDerby =>
+      val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
+      assert(!hiveConf.contains("spark.hadoop.foo"))
+      assert(hiveConf("foo") === "bar")
+    }
+  }
 }
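
Note for reviewers (not part of the patch): the core behavior added above is a copy-and-strip of the `spark.hadoop.` prefix, applied either to a `SparkConf` or to a plain map of properties. Below is a minimal, self-contained Scala sketch of the map-to-map step, runnable without Spark on the classpath; the object name and the `stripSparkHadoopPrefix` helper are illustrative only and simply mirror what the new `appendSparkHadoopConfigs(srcMap, destMap)` overload does.

import scala.collection.mutable

object SparkHadoopPrefixDemo {
  private val SparkHadoopPrefix = "spark.hadoop."

  // Copy every "spark.hadoop.foo=bar" entry of srcMap into destMap as "foo=bar",
  // the same copy-and-strip step performed by the new overload in SparkHadoopUtil.
  def stripSparkHadoopPrefix(
      srcMap: Map[String, String],
      destMap: mutable.HashMap[String, String]): Unit = {
    for ((key, value) <- srcMap if key.startsWith(SparkHadoopPrefix)) {
      destMap.put(key.substring(SparkHadoopPrefix.length), value)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for sys.props: one spark.hadoop.* entry and one unrelated Spark property.
    val sysPropsLike = Map(
      "spark.hadoop.abc.def" -> "xyz",
      "spark.app.name" -> "demo")
    // Stand-in for the Hive/Hadoop property map being built (e.g. propMap in HiveUtils).
    val hiveConfLike = mutable.HashMap[String, String]()

    stripSparkHadoopPrefix(sysPropsLike, hiveConfLike)

    assert(hiveConfLike("abc.def") == "xyz")         // prefix stripped, value preserved
    assert(!hiveConfLike.contains("spark.app.name")) // non spark.hadoop.* keys are ignored
    println(hiveConfLike)                            // single entry: abc.def -> xyz
  }
}

End to end, the same effect should be observable through public APIs only: after `new SparkConf().set("spark.hadoop.abc.def", "xyz")`, `sc.hadoopConfiguration.get("abc.def")` should return `"xyz"`, and, per the CliSuite test in the patch, `spark-sql --conf spark.hadoop.<hive key>=<value>` should reach the Hive configuration as well.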