From 89d9b86616196fde5d0b3a08fb284e6af6afe588 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 18 Jul 2017 14:41:24 +0800 Subject: [PATCH 01/13] HiveConf in SparkSQLCLIDriver doesn't respect spark.hadoop.some.hive.variables --- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 7 +++++++ .../scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index a29d7a7565ee..2c2627a1609d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -404,6 +404,13 @@ private[spark] object HiveUtils extends Logging { propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + sys.props.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + propMap.put(key.substring("spark.hadoop.".length), value) + } + } + propMap.toMap } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 667a7ddd8bb6..8bb7ea027d35 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "") } } + + test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") { + sys.props.put("spark.hadoop.foo", "bar") + Seq(true, false) foreach { useInMemoryDerby => + val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby) + intercept[NoSuchElementException](hiveConf("spark.hadoop.foo") === "bar") + assert(hiveConf("foo") === "bar") + } + } } From 1ac4cb97307fa8f647c46dd6476cd7d50369b692 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Mon, 31 Jul 2017 09:47:10 +0800 Subject: [PATCH 02/13] nit: code review --- .../test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 8bb7ea027d35..2ebb1de428fd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -38,7 +38,7 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton sys.props.put("spark.hadoop.foo", "bar") Seq(true, false) foreach { useInMemoryDerby => val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby) - intercept[NoSuchElementException](hiveConf("spark.hadoop.foo") === "bar") + assert(!hiveConf.contains("spark.hadoop.foo")) assert(hiveConf("foo") === "bar") } } From 9388f5e2a9003d696a846549c9d6417120f7a1d4 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Mon, 31 Jul 2017 22:14:54 +0800 Subject: [PATCH 03/13] apply --hiveconf to hadoopconf and add ut in cliSuite --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 13 ++++++++++--- .../spark/sql/hive/thriftserver/CliSuite.scala | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 33e18a8da60f..839c1cd668ec 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { private val prompt = "spark-sql" private val continuedPrompt = "".padTo(prompt.length, ' ') private var transport: TSocket = _ + private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop." installSignalHandler() @@ -134,6 +135,14 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Hive 1.2 + not supported in CLI throw new RuntimeException("Remote operations not supported") } + // Respect the configurations set by --hiveconf from the command line + // (based on Hive's CliDriver). + val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala + hiveConfFromCmd.foreach { kv => + // If the same property is configured by spark.hadoop.xxx, we ignore it and + // obey settings from spark properties + sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + kv.getKey, kv.getValue) + } val cli = new SparkSQLCLIDriver cli.setHiveVariables(oproc.getHiveVariables) @@ -159,9 +168,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Respect the configurations set by --hiveconf from the command line // (based on Hive's CliDriver). - val it = sessionState.getOverriddenConfigurations.entrySet().iterator() - while (it.hasNext) { - val kv = it.next() + hiveConfFromCmd.foreach{ kv => SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index d3cec11bd756..2c8f440a648b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -283,4 +283,19 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SET conf3;" -> "conftest" ) } + + test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") { + runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath) + } + + test("SPARK-21451: Apply spark.hadoop.* configurations") { + val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451") + runCliWithin( + 1.minute, + Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))( + "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath) + + tmpDir.delete() + } + } From 9ba8f53137e51bb1ad1a811e30820af97f9f9c03 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Mon, 31 Jul 2017 22:50:23 +0800 Subject: [PATCH 04/13] style check failed --- .../scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 2c8f440a648b..933fd7369380 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -294,8 +294,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { 1.minute, Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))( "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath) - tmpDir.delete() } - } From a0329cac9e1f02484924e3ca48a89b04c9dde996 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Wed, 2 Aug 2017 18:41:05 +0800 Subject: [PATCH 05/13] add doc --- docs/configuration.md | 56 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 91b5befd1b1e..5371e7a872ed 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2335,5 +2335,59 @@ The location of these configuration files varies across Hadoop versions, but a common location is inside of `/etc/hadoop/conf`. Some tools create configurations on-the-fly, but offer a mechanisms to download copies of them. -To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh` +To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh` to a location containing the configuration files. + +# Custom Hadoop/Hive Configuration + +If your Spark Application interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's ClassPath. + +In most cases, you may have more than one applications running and rely on some different Hadoop/Hive +client side configurations. You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, +`hive-site.xml` in Spark's ClassPath for each application, but it is not very convenient and these +files are best to be shared with common properties to avoid hard-coding certain configurations. + +The better choice is to use spark hadoop properties in the form of `spark.hadoop.*`. +They can be considered as same as normal spark properties which can be set in `$SPARK_HOME/conf/spark-defalut.conf` + +In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For +instance. Spark allows you to simply create an empty conf and set spark/spark hadoop properties. + +{% highlight scala %} +val conf = new SparkConf().set("spark.hadoop.abc.def","xyz") +val sc = new SparkContext(conf) +{% endhighlight %} + +Also, you can modify or add configurations at runtime: +{% highlight bash %} +./bin/spark-submit \ + --name "My app" \ + --master local[4] \ + --conf spark.eventLog.enabled=false \ + --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ + --conf spark.hadoop.abc.def=xyz + myApp.jar +{% endhighlight %} + +## Typical Hadoop/Hive Configurations + + + + + + + + + + + + + +
+<tr>
+  <td><code>spark.hadoop.<br />mapreduce.fileoutputcommitter.algorithm.version</code></td>
+  <td>1</td>
+  <td>
+    The file output committer algorithm version, valid algorithm version number: 1 or 2.
+    Version 2 may have better performance, but version 1 may handle failures better in certain situations,
+    as per MAPREDUCE-4815.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.hadoop.<br />fs.hdfs.impl.disable.cache</code></td>
+  <td>false</td>
+  <td>
+    Don't cache 'hdfs' filesystem instances. Set true if HDFS Token Expiry in long-running spark applicaitons.HDFS-9276.
+  </td>
+</tr>
+</table>
\ No newline at end of file From 6dba90a426b15e92da2f29eae61f76eb1a0ba011 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Thu, 3 Aug 2017 10:09:41 +0800 Subject: [PATCH 06/13] keep consistence in sqlconf --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 839c1cd668ec..ce65a2bd4eeb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -138,10 +138,12 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Respect the configurations set by --hiveconf from the command line // (based on Hive's CliDriver). val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala - hiveConfFromCmd.foreach { kv => + val newHiveConf = hiveConfFromCmd.map { kv => // If the same property is configured by spark.hadoop.xxx, we ignore it and // obey settings from spark properties - sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + kv.getKey, kv.getValue) + val k = kv.getKey + val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + kv.getKey, kv.getValue) + (k, v) } val cli = new SparkSQLCLIDriver @@ -166,10 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Execute -i init files (always in silent mode) cli.processInitFiles(sessionState) - // Respect the configurations set by --hiveconf from the command line - // (based on Hive's CliDriver). - hiveConfFromCmd.foreach{ kv => - SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue) + newHiveConf.foreach{ kv => + SparkSQLEnv.sqlContext.setConf(kv._1, kv._2) } if (sessionState.execString != null) { From 10d624cdeed4b14668e042bc5ad2591eea9817bb Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Thu, 3 Aug 2017 10:11:12 +0800 Subject: [PATCH 07/13] typo --- .../apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index ce65a2bd4eeb..88c36dd2091d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -142,7 +142,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { // If the same property is configured by spark.hadoop.xxx, we ignore it and // obey settings from spark properties val k = kv.getKey - val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + kv.getKey, kv.getValue) + val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue) (k, v) } From c629cc49b6af146860b3d7cecdbe4760f347e8c8 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Thu, 3 Aug 2017 15:49:38 +0800 Subject: [PATCH 08/13] doc review --- docs/configuration.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 5371e7a872ed..b01b3e9aa90c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2326,7 +2326,7 @@ from this directory. 
# Inheriting Hadoop Cluster Configuration If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that -should be included on Spark's classpath: +should be included on Spark's class path: * `hdfs-site.xml`, which provides default behaviors for the HDFS client. * `core-site.xml`, which sets the default filesystem name. @@ -2340,19 +2340,19 @@ to a location containing the configuration files. # Custom Hadoop/Hive Configuration -If your Spark Application interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive -configuration files in Spark's ClassPath. +If your Spark applications interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's class path. -In most cases, you may have more than one applications running and rely on some different Hadoop/Hive -client side configurations. You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, -`hive-site.xml` in Spark's ClassPath for each application, but it is not very convenient and these +Multiple running applications might require different Hadoop/Hive client side configurations. +You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in +Spark's class path for each application, but it is not very convenient and these files are best to be shared with common properties to avoid hard-coding certain configurations. The better choice is to use spark hadoop properties in the form of `spark.hadoop.*`. They can be considered as same as normal spark properties which can be set in `$SPARK_HOME/conf/spark-defalut.conf` In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For -instance. Spark allows you to simply create an empty conf and set spark/spark hadoop properties. +instance, Spark allows you to simply create an empty conf and set spark/spark hadoop properties. {% highlight scala %} val conf = new SparkConf().set("spark.hadoop.abc.def","xyz") @@ -2366,7 +2366,7 @@ Also, you can modify or add configurations at runtime: --master local[4] \ --conf spark.eventLog.enabled=false \ --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ - --conf spark.hadoop.abc.def=xyz + --conf spark.hadoop.abc.def=xyz \ myApp.jar {% endhighlight %} @@ -2387,7 +2387,9 @@ Also, you can modify or add configurations at runtime: spark.hadoop.
fs.hdfs.impl.disable.cache
false - Don't cache 'hdfs' filesystem instances. Set true if HDFS Token Expiry in long-running spark applicaitons.HDFS-9276. + When true, return a fresh HDFS filesystem instance, bypassing the HDFS cache mechanism. + This is to prevent the DFSClient from using an old cached token to connect to the NameNode, + which might fails long-running Spark applications. see HDFS-9276. \ No newline at end of file From 5043eb69b41d1d0263e8814da27a934491bc936c Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Fri, 4 Aug 2017 09:59:34 +0800 Subject: [PATCH 09/13] doc review again --- docs/configuration.md | 36 ++++++------------------------------ 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index b01b3e9aa90c..f019f849c20a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2326,7 +2326,7 @@ from this directory. # Inheriting Hadoop Cluster Configuration If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that -should be included on Spark's class path: +should be included on Spark's classpath: * `hdfs-site.xml`, which provides default behaviors for the HDFS client. * `core-site.xml`, which sets the default filesystem name. @@ -2340,13 +2340,13 @@ to a location containing the configuration files. # Custom Hadoop/Hive Configuration -If your Spark applications interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive -configuration files in Spark's class path. +If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's classpath. Multiple running applications might require different Hadoop/Hive client side configurations. You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in -Spark's class path for each application, but it is not very convenient and these -files are best to be shared with common properties to avoid hard-coding certain configurations. +Spark's classpath for each application. In a Spark cluster running on YARN, these configuration +files are set cluster-wide, and cannot safely be changed by the application. The better choice is to use spark hadoop properties in the form of `spark.hadoop.*`. They can be considered as same as normal spark properties which can be set in `$SPARK_HOME/conf/spark-defalut.conf` @@ -2368,28 +2368,4 @@ Also, you can modify or add configurations at runtime: --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ --conf spark.hadoop.abc.def=xyz \ myApp.jar -{% endhighlight %} - -## Typical Hadoop/Hive Configurations - - - - - - - - - - - - - -
-<tr>
-  <td><code>spark.hadoop.<br />mapreduce.fileoutputcommitter.algorithm.version</code></td>
-  <td>1</td>
-  <td>
-    The file output committer algorithm version, valid algorithm version number: 1 or 2.
-    Version 2 may have better performance, but version 1 may handle failures better in certain situations,
-    as per MAPREDUCE-4815.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.hadoop.<br />fs.hdfs.impl.disable.cache</code></td>
-  <td>false</td>
-  <td>
-    When true, return a fresh HDFS filesystem instance, bypassing the HDFS cache mechanism.
-    This is to prevent the DFSClient from using an old cached token to connect to the NameNode,
-    which might fails long-running Spark applications. see HDFS-9276.
-  </td>
-</tr>
-</table>
\ No newline at end of file +{% endhighlight %} \ No newline at end of file From ee47742666a5e2d14fa4fd3e89df924542737305 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Fri, 4 Aug 2017 15:30:00 +0800 Subject: [PATCH 10/13] util method --- .../apache/spark/deploy/SparkHadoopUtil.scala | 26 ++++++++++++++----- .../hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 8 ++---- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6afe58bff522..47c53ebd2146 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,6 +24,7 @@ import java.util.{Arrays, Comparator, Date, Locale} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.HashMap import scala.util.control.NonFatal import com.google.common.primitives.Longs @@ -99,17 +100,30 @@ class SparkHadoopUtil extends Logging { hadoopConf.set("fs.s3a.session.token", sessionToken) } } - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } - } + appendSparkHadoopConfigs(conf, hadoopConf) val bufferSize = conf.get("spark.buffer.size", "65536") hadoopConf.set("io.file.buffer.size", bufferSize) } } + def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + conf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } + } + } + + def appendSparkHadoopConfigs(propMap: HashMap[String, String]): Unit = { + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + sys.props.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + propMap.put(key.substring("spark.hadoop.".length), value) + } + } + } + /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop * subsystems. 
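For readers who want the new helper in isolation: below is a minimal, self-contained sketch of the behaviour the `appendSparkHadoopConfigs` overloads introduced above implement, i.e. copying every `spark.hadoop.foo=bar` entry into a target map as `foo=bar`. The object and method names and the sample keys here are hypothetical illustrations, not part of the patch.

{% highlight scala %}
import scala.collection.mutable

object PrefixStripDemo {
  private val Prefix = "spark.hadoop."

  // Copy every "spark.hadoop.foo=bar" entry from src into dest as "foo=bar",
  // mirroring the map-based appendSparkHadoopConfigs overload above.
  def stripSparkHadoopPrefix(
      src: Map[String, String],
      dest: mutable.HashMap[String, String]): Unit = {
    for ((key, value) <- src if key.startsWith(Prefix)) {
      dest.put(key.substring(Prefix.length), value)
    }
  }

  def main(args: Array[String]): Unit = {
    val demoProps = Map(
      "spark.hadoop.hive.exec.scratchdir" -> "/tmp/scratch", // copied as "hive.exec.scratchdir"
      "spark.app.name" -> "demo")                            // ignored: no "spark.hadoop." prefix
    val hadoopProps = mutable.HashMap.empty[String, String]
    stripSparkHadoopPrefix(demoProps, hadoopProps)
    assert(hadoopProps("hive.exec.scratchdir") == "/tmp/scratch")
    assert(!hadoopProps.contains("spark.app.name"))
  }
}
{% endhighlight %}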
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 88c36dd2091d..1d815c7b1752 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -168,7 +168,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Execute -i init files (always in silent mode) cli.processInitFiles(sessionState) - newHiveConf.foreach{ kv => + newHiveConf.foreach { kv => SparkSQLEnv.sqlContext.setConf(kv._1, kv._2) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 2c2627a1609d..b88d7b6d4ec7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -404,12 +405,7 @@ private[spark] object HiveUtils extends Logging { propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - sys.props.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - propMap.put(key.substring("spark.hadoop.".length), value) - } - } + SparkHadoopUtil.get.appendSparkHadoopConfigs(propMap) propMap.toMap } From a2b23f3f7206458b7e61c92db85ead9c6f03f6ef Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Fri, 4 Aug 2017 15:39:42 +0800 Subject: [PATCH 11/13] style --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index b88d7b6d4ec7..ce2fcdd30cf5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -35,8 +35,8 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTable From 55729fa343e285f3711154dc0adcb4585a5e6f1f Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Sat, 5 Aug 2017 11:30:12 +0800 Subject: [PATCH 12/13] review, comments --- .../apache/spark/deploy/SparkHadoopUtil.scala | 29 +++++++++++-------- .../org/apache/spark/sql/hive/HiveUtils.scala | 7 ++++- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 47c53ebd2146..a0826ee463f4 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,6 +22,7 @@ import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} +import scala.collection.immutable.Map import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.HashMap @@ -75,7 +76,6 @@ class SparkHadoopUtil extends Logging { } } - /** * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop * configuration. @@ -106,21 +106,26 @@ class SparkHadoopUtil extends Logging { } } + /** + * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop + * configuration without the spark.hadoop. prefix. + */ def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } + // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" + conf.getAll.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + hadoopConf.set(key.substring("spark.hadoop.".length), value) } } - def appendSparkHadoopConfigs(propMap: HashMap[String, String]): Unit = { - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - sys.props.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - propMap.put(key.substring("spark.hadoop.".length), value) - } + /** + * Appends spark.hadoop.* configurations from a Map to another without the spark.hadoop. prefix. + */ + def appendSparkHadoopConfigs( + srcMap: Map[String, String], + destMap: HashMap[String, String]): Unit = { + // Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar" + srcMap.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + destMap.put(key.substring("spark.hadoop.".length), value) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index ce2fcdd30cf5..13d592f71148 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -405,7 +405,12 @@ private[spark] object HiveUtils extends Logging { propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") - SparkHadoopUtil.get.appendSparkHadoopConfigs(propMap) + // SPARK-21451: Spark will gather all `spark.hadoop.*` properties from a `SparkConf` to a + // Hadoop Configuration internally, as long as it happens after SparkContext initialized. + // Some instances such as `CliSessionState` used in `SparkSQLCliDriver` may also rely on these + // Configuration. But it happens before SparkContext initialized, we need to take them from + // system properties in the form of regular hadoop configurations. 
+ SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap) propMap.toMap } From 46a955d7d4a172dde53a504b70947bedce8c22d5 Mon Sep 17 00:00:00 2001 From: hzyaoqin Date: Sat, 5 Aug 2017 15:13:53 +0800 Subject: [PATCH 13/13] review, fix ut --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index a0826ee463f4..c620e94ef079 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -112,7 +112,7 @@ class SparkHadoopUtil extends Logging { */ def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { hadoopConf.set(key.substring("spark.hadoop.".length), value) } } @@ -124,7 +124,7 @@ class SparkHadoopUtil extends Logging { srcMap: Map[String, String], destMap: HashMap[String, String]): Unit = { // Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar" - srcMap.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) { destMap.put(key.substring("spark.hadoop.".length), value) } }
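Taken together, the series makes the CLI forward each `--hiveconf key=value` into the `spark.hadoop.key` system property via `sys.props.getOrElseUpdate`, so an explicit `spark.hadoop.*` setting from Spark properties wins when both name the same key, and the resolved pairs are then applied to `SparkSQLEnv.sqlContext`. A minimal sketch of that precedence rule follows; the object name, helper, and sample keys/values are hypothetical, and a plain mutable map stands in for the real `sys.props`.

{% highlight scala %}
import scala.collection.mutable

object HiveconfPrecedenceDemo {
  private val Prefix = "spark.hadoop."

  // For every --hiveconf entry, getOrElseUpdate only fills the spark.hadoop.* slot
  // if nothing set it already, so Spark properties take precedence over --hiveconf.
  def resolve(
      sparkProps: mutable.Map[String, String],
      hiveconf: Map[String, String]): Map[String, String] = {
    hiveconf.map { case (k, v) => k -> sparkProps.getOrElseUpdate(Prefix + k, v) }
  }

  def main(args: Array[String]): Unit = {
    val props = mutable.Map("spark.hadoop.hive.exec.scratchdir" -> "/from-spark-conf")
    val fromCmdLine = Map(
      "hive.exec.scratchdir"  -> "/from-hiveconf", // loses: spark.hadoop.* already set
      "hive.cli.print.header" -> "true")           // wins: no competing spark property
    val effective = resolve(props, fromCmdLine)
    assert(effective("hive.exec.scratchdir") == "/from-spark-conf")
    assert(effective("hive.cli.print.header") == "true")
    assert(props("spark.hadoop.hive.cli.print.header") == "true") // later visible to newTemporaryConfiguration
  }
}
{% endhighlight %}

The effective pairs are what the driver pushes into `SparkSQLEnv.sqlContext.setConf`, which is why the CliSuite tests expect `spark.sql.warehouse.dir` to reflect both `--hiveconf` and `spark.hadoop.*` values.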