From 8669c21e1dc97e660a13b1cad598d1dbe8e44731 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 23 Aug 2018 17:39:07 -0700 Subject: [PATCH 1/8] Consolidated Scala 2.11 and 2.12 branches --- .../org/apache/spark/repl/SparkILoop.scala | 143 ------------------ .../org/apache/spark/repl/SparkILoop.scala | 41 ++++- 2 files changed, 37 insertions(+), 147 deletions(-) delete mode 100644 repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala rename repl/{scala-2.11 => }/src/main/scala/org/apache/spark/repl/SparkILoop.scala (85%) diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala deleted file mode 100644 index ffb2e5f5db7e2..0000000000000 --- a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import java.io.BufferedReader - -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{ILoop, JPrintWriter} -import scala.tools.nsc.util.stringFromStream -import scala.util.Properties.{javaVersion, javaVmName, versionString} - -/** - * A Spark-specific interactive shell. - */ -class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) - extends ILoop(in0, out) { - def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) - def this() = this(None, new JPrintWriter(Console.out, true)) - - val initializationCommands: Seq[String] = Seq( - """ - @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { - org.apache.spark.repl.Main.sparkSession - } else { - org.apache.spark.repl.Main.createSparkSession() - } - @transient val sc = { - val _sc = spark.sparkContext - if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { - val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) - if (proxyUrl != null) { - println( - s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") - } else { - println(s"Spark Context Web UI is available at Spark Master Public URL") - } - } else { - _sc.uiWebUrl.foreach { - webUrl => println(s"Spark context Web UI available at ${webUrl}") - } - } - println("Spark context available as 'sc' " + - s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") - println("Spark session available as 'spark'.") - _sc - } - """, - "import org.apache.spark.SparkContext._", - "import spark.implicits._", - "import spark.sql", - "import org.apache.spark.sql.functions._" - ) - - def initializeSpark() { - intp.beQuietDuring { - savingReplayStack { // remove the commands from session history. 
- initializationCommands.foreach(command) - } - } - } - - /** Print a welcome message */ - override def printWelcome() { - import org.apache.spark.SPARK_VERSION - echo("""Welcome to - ____ __ - / __/__ ___ _____/ /__ - _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version %s - /_/ - """.format(SPARK_VERSION)) - val welcomeMsg = "Using Scala %s (%s, Java %s)".format( - versionString, javaVmName, javaVersion) - echo(welcomeMsg) - echo("Type in expressions to have them evaluated.") - echo("Type :help for more information.") - } - - /** Available commands */ - override def commands: List[LoopCommand] = standardCommands - - /** - * We override `createInterpreter` because we need to initialize Spark *before* the REPL - * sees any files, so that the Spark context is visible in those files. This is a bit of a - * hack, but there isn't another hook available to us at this point. - */ - override def createInterpreter(): Unit = { - super.createInterpreter() - initializeSpark() - } - - override def resetCommand(line: String): Unit = { - super.resetCommand(line) - initializeSpark() - echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") - } - - override def replay(): Unit = { - initializeSpark() - super.replay() - } - -} - -object SparkILoop { - - /** - * Creates an interpreter loop with default settings and feeds - * the given code to it as input. - */ - def run(code: String, sets: Settings = new Settings): String = { - import java.io.{ BufferedReader, StringReader, OutputStreamWriter } - - stringFromStream { ostream => - Console.withOut(ostream) { - val input = new BufferedReader(new StringReader(code)) - val output = new JPrintWriter(new OutputStreamWriter(ostream), true) - val repl = new SparkILoop(input, output) - - if (sets.classpath.isDefault) { - sets.classpath.value = sys.props("java.class.path") - } - repl process sets - } - } - } - def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString) -} diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala similarity index 85% rename from repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala rename to repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 94265267b1f97..5e8e9ca23e146 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -24,7 +24,6 @@ import scala.Predef.{println => _, _} // scalastyle:on println import scala.concurrent.Future import scala.reflect.classTag -import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader import scala.reflect.io.File import scala.tools.nsc.{GenericRunnerSettings, Properties} import scala.tools.nsc.Settings @@ -33,7 +32,9 @@ import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, JPri import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, SplashReader} import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain import scala.tools.nsc.util.stringFromStream -import scala.util.Properties.{javaVersion, javaVmName, versionString} +import scala.util.Properties.{javaVersion, javaVmName, versionNumberString, versionString} + +import org.apache.spark.util.Utils /** * A Spark-specific interactive shell. 
@@ -43,10 +44,26 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) def this() = this(None, new JPrintWriter(Console.out, true)) + // TODO: Remove the entire override when the support of Scala 2.11 is ended + // Scala 2.11 has a bug of finding imported types in class constructors, extends clause + // which is fixed in Scala 2.12 but never be back-ported into Scala 2.11.x + // As a result, we copied the fixes into `SparkILoopInterpreter`. See SPARK-22393 for detail. override def createInterpreter(): Unit = { - intp = new SparkILoopInterpreter(settings, out) + if (isScala2_11) { + if (addedClasspath != "") { + settings.classpath append addedClasspath + } + intp = Utils.classForName("org.apache.spark.repl.SparkILoopInterpreter") + .getDeclaredConstructor(Seq(classOf[Settings], classOf[JPrintWriter]): _*) + .newInstance(Seq(settings, out): _*) + .asInstanceOf[IMain] + } else { + super.createInterpreter() + } } + private val isScala2_11 = versionNumberString.startsWith("2.11") + val initializationCommands: Seq[String] = Seq( """ @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { @@ -124,6 +141,22 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) super.replay() } + /** + * TODO: Remove `runClosure` when the support of Scala 2.11 is ended + * In Scala 2.12, we don't need to use `savingContextLoader` to execute the `body`. + * See `SI-8521 No blind save of context class loader` for detail. + */ + private def runClosure(body: () => Boolean): Boolean = { + if (isScala2_11) { + Utils.classForName("scala.reflect.internal.util.ScalaClassLoader$") + .getDeclaredMethod("savingContextLoader", classOf[() => Boolean]) + .invoke(null, body) + .asInstanceOf[Boolean] + } else { + body.apply() + } + } + /** * The following code is mostly a copy of `process` implementation in `ILoop.scala` in Scala * @@ -138,7 +171,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) * We should remove this duplication once Scala provides a way to load our custom initialization * code, and also customize the ordering of printing welcome message. */ - override def process(settings: Settings): Boolean = savingContextLoader { + override def process(settings: Settings): Boolean = runClosure { () => def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) From 3808f02fdc2d914f7a022d00884034d8d8ceb19f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 27 Aug 2018 11:26:29 +0000 Subject: [PATCH 2/8] Get static loader object and invoke method on it. 
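This fixes the reflective call added in the previous commit: `savingContextLoader` is an instance method on the Scala object `ScalaClassLoader`, so invoking it with a `null` receiver fails. A Scala `object` compiles to a class whose single instance lives in a static `MODULE$` field, which has to be fetched first. A minimal, self-contained sketch of that pattern, using a hypothetical `Greeter` object rather than anything from this patch:

    // A top-level `object Greeter` compiles to a class named `Greeter$`
    // whose singleton instance sits in the static field `MODULE$`.
    object Greeter { def greet(name: String): String = s"hi, $name" }

    val cls = Class.forName("Greeter$")
    val module = cls.getDeclaredField("MODULE$").get(null)
    val greeting = cls
      .getDeclaredMethod("greet", classOf[String])
      .invoke(module, "spark")
      .asInstanceOf[String]   // "hi, spark"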
--- repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 5e8e9ca23e146..055317bee1bef 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -148,9 +148,13 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) */ private def runClosure(body: () => Boolean): Boolean = { if (isScala2_11) { + val loader = Utils.classForName("scala.reflect.internal.util.ScalaClassLoader$") + .getDeclaredField("MODULE$") + .get(null) + Utils.classForName("scala.reflect.internal.util.ScalaClassLoader$") .getDeclaredMethod("savingContextLoader", classOf[() => Boolean]) - .invoke(null, body) + .invoke(loader, body) .asInstanceOf[Boolean] } else { body.apply() From 075ca4a0c25503e4df4bc880f6ea58ead2eabcbe Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 27 Aug 2018 10:54:50 -0700 Subject: [PATCH 3/8] Changed message --- repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 055317bee1bef..cd197393261fd 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -44,9 +44,9 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) def this() = this(None, new JPrintWriter(Console.out, true)) - // TODO: Remove the entire override when the support of Scala 2.11 is ended + // TODO: Remove the following `override` when the support of Scala 2.11 is ended // Scala 2.11 has a bug of finding imported types in class constructors, extends clause - // which is fixed in Scala 2.12 but never be back-ported into Scala 2.11.x + // which is fixed in Scala 2.12 but never be back-ported into Scala 2.11.x. // As a result, we copied the fixes into `SparkILoopInterpreter`. See SPARK-22393 for detail. override def createInterpreter(): Unit = { if (isScala2_11) { From 6203f83008950a811b33bba97b99540716d27833 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 27 Aug 2018 11:30:24 -0700 Subject: [PATCH 4/8] remove 2.12 pom hack --- repl/pom.xml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/repl/pom.xml b/repl/pom.xml index 861bbd7c49654..553d5eb79a256 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -167,14 +167,4 @@ - - - scala-2.12 - - scala-2.12/src/main/scala - scala-2.12/src/test/scala - - - - From e0d424d645010108a497c057fa4ad1e198f1e3d0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 28 Aug 2018 02:27:40 +0000 Subject: [PATCH 5/8] Use Spark classloader. 
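Resolve the interpreter and loader classes through Spark's own classloader instead of the JVM-default one, via a new `Utils.classForNameFromSpark` helper that delegates to the three-argument `Class.forName` overload. Roughly the following, with the current class's loader standing in for `Utils.getSparkClassLoader` (a sketch of the idea, not the patch itself):

    // The one-argument Class.forName resolves against the caller's
    // defining loader; the three-argument overload makes it explicit.
    val loader = getClass.getClassLoader  // stand-in for getSparkClassLoader
    val cls = Class.forName(
      "org.apache.spark.repl.SparkILoopInterpreter", true, loader)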
---
 core/src/main/scala/org/apache/spark/util/Utils.scala      | 6 ++++++
 repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 6 +++---
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e6646bd073c6b..6b16adb95f6a1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -240,6 +240,12 @@ private[spark] object Utils extends Logging {
     // scalastyle:on classforname
   }
 
+  // scalastyle:off classforname
+  def classForNameFromSpark(className: String): Class[_] = {
+    Class.forName(className, true, getSparkClassLoader)
+    // scalastyle:on classforname
+  }
+
   /**
    * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
    */
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index cd197393261fd..54175264a3134 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -53,7 +53,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
       if (addedClasspath != "") {
         settings.classpath append addedClasspath
       }
-      intp = Utils.classForName("org.apache.spark.repl.SparkILoopInterpreter")
+      intp = Utils.classForNameFromSpark("org.apache.spark.repl.SparkILoopInterpreter")
         .getDeclaredConstructor(Seq(classOf[Settings], classOf[JPrintWriter]): _*)
         .newInstance(Seq(settings, out): _*)
         .asInstanceOf[IMain]
@@ -148,11 +148,11 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    */
   private def runClosure(body: () => Boolean): Boolean = {
     if (isScala2_11) {
-      val loader = Utils.classForName("scala.reflect.internal.util.ScalaClassLoader$")
+      val loader = Utils.classForNameFromSpark("scala.reflect.internal.util.ScalaClassLoader$")
         .getDeclaredField("MODULE$")
         .get(null)
 
-      Utils.classForName("scala.reflect.internal.util.ScalaClassLoader$")
+      Utils.classForNameFromSpark("scala.reflect.internal.util.ScalaClassLoader$")
         .getDeclaredMethod("savingContextLoader", classOf[() => Boolean])
         .invoke(loader, body)
         .asInstanceOf[Boolean]

From af4d9847bac225008e7359a3ea5064539db612c0 Mon Sep 17 00:00:00 2001
From: DB Tsai
Date: Tue, 28 Aug 2018 13:57:33 -0700
Subject: [PATCH 6/8] Updated

---
 .../main/scala/org/apache/spark/util/Utils.scala  |  6 ------
 .../scala/org/apache/spark/repl/SparkILoop.scala  | 13 ++++++++++---
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6b16adb95f6a1..e6646bd073c6b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -240,12 +240,6 @@ private[spark] object Utils extends Logging {
     // scalastyle:on classforname
   }
 
-  // scalastyle:off classforname
-  def classForNameFromSpark(className: String): Class[_] = {
-    Class.forName(className, true, getSparkClassLoader)
-    // scalastyle:on classforname
-  }
-
   /**
    * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
    */
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 54175264a3134..db0a96d7c5fb8 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -53,10 +53,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
       if (addedClasspath != "") {
         settings.classpath append addedClasspath
       }
-      intp = Utils.classForNameFromSpark("org.apache.spark.repl.SparkILoopInterpreter")
+      // scalastyle:off classforname
+      // Have to use the default classloader to match the one used in
+      // `classOf[Settings]` and `classOf[JPrintWriter]`.
+      intp = Class.forName("org.apache.spark.repl.SparkILoopInterpreter")
         .getDeclaredConstructor(Seq(classOf[Settings], classOf[JPrintWriter]): _*)
         .newInstance(Seq(settings, out): _*)
         .asInstanceOf[IMain]
+      // scalastyle:on classforname
     } else {
       super.createInterpreter()
     }
@@ -148,14 +152,17 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    */
   private def runClosure(body: () => Boolean): Boolean = {
     if (isScala2_11) {
-      val loader = Utils.classForNameFromSpark("scala.reflect.internal.util.ScalaClassLoader$")
+      // scalastyle:off classforname
+      // Have to use the default classloader to match the one used in `classOf[() => Boolean]`.
+      val loader = Class.forName("scala.reflect.internal.util.ScalaClassLoader$")
         .getDeclaredField("MODULE$")
         .get(null)
 
-      Utils.classForNameFromSpark("scala.reflect.internal.util.ScalaClassLoader$")
+      Class.forName("scala.reflect.internal.util.ScalaClassLoader$")
         .getDeclaredMethod("savingContextLoader", classOf[() => Boolean])
         .invoke(loader, body)
         .asInstanceOf[Boolean]
+      // scalastyle:on classforname
     } else {
       body.apply()
     }

From b147dba6327b5d3b495faf0d985f4a97b43d4dd4 Mon Sep 17 00:00:00 2001
From: DB Tsai
Date: Tue, 28 Aug 2018 14:07:00 -0700
Subject: [PATCH 7/8] Remove unused import

---
 repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index db0a96d7c5fb8..7a3dfd2d2e041 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -34,8 +34,6 @@ import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain
 import scala.tools.nsc.util.stringFromStream
 import scala.util.Properties.{javaVersion, javaVmName, versionNumberString, versionString}
 
-import org.apache.spark.util.Utils
-
 /**
  * A Spark-specific interactive shell.
  */

From bab5947c3a0396a47b2ca399abea70471f4adbaf Mon Sep 17 00:00:00 2001
From: DB Tsai
Date: Tue, 28 Aug 2018 16:15:28 -0700
Subject: [PATCH 8/8] Reduce one reflection

---
 .../org/apache/spark/repl/SparkILoop.scala | 39 +++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 7a3dfd2d2e041..aa9aa2793b8b3 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -42,10 +42,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
-  // TODO: Remove the following `override` when the support of Scala 2.11 is ended
-  // Scala 2.11 has a bug of finding imported types in class constructors, extends clause
-  // which is fixed in Scala 2.12 but never be back-ported into Scala 2.11.x.
-  // As a result, we copied the fixes into `SparkILoopInterpreter`. See SPARK-22393 for detail.
+  /**
+   * TODO: Remove the following `override` when support for Scala 2.11 ends.
+   * Scala 2.11 has a bug in resolving imported types in class constructors and
+   * extends clauses, fixed in Scala 2.12 but never back-ported to 2.11.x. As a
+   * result, we copied the fixes into `SparkILoopInterpreter`. See SPARK-22393.
+   */
   override def createInterpreter(): Unit = {
     if (isScala2_11) {
       if (addedClasspath != "") {
@@ -145,24 +147,21 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
 
   /**
    * TODO: Remove `runClosure` when the support of Scala 2.11 is ended
-   * In Scala 2.12, we don't need to use `savingContextLoader` to execute the `body`.
-   * See `SI-8521 No blind save of context class loader` for detail.
    */
-  private def runClosure(body: () => Boolean): Boolean = {
+  private def runClosure(body: => Boolean): Boolean = {
     if (isScala2_11) {
-      // scalastyle:off classforname
-      // Have to use the default classloader to match the one used in `classOf[() => Boolean]`.
-      val loader = Class.forName("scala.reflect.internal.util.ScalaClassLoader$")
-        .getDeclaredField("MODULE$")
-        .get(null)
-
-      Class.forName("scala.reflect.internal.util.ScalaClassLoader$")
-        .getDeclaredMethod("savingContextLoader", classOf[() => Boolean])
-        .invoke(loader, body)
-        .asInstanceOf[Boolean]
-      // scalastyle:on classforname
+      // In Scala 2.11, `interpret` can set the current thread's context classloader
+      // but fails to restore its previous state when returning from that method.
+      // This is fixed by SI-8521 (https://github.com/scala/scala/pull/5657), which
+      // was never back-ported to Scala 2.11.x. The following is a workaround.
+      val original = Thread.currentThread().getContextClassLoader
+      try {
+        body
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
+      }
     } else {
-      body.apply()
+      body
     }
   }
 
@@ -180,7 +179,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    * We should remove this duplication once Scala provides a way to load our custom initialization
    * code, and also customize the ordering of printing welcome message.
    */
-  override def process(settings: Settings): Boolean = runClosure { () =>
+  override def process(settings: Settings): Boolean = runClosure {
 
     def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
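The last commit above removes the remaining reflection on the 2.11 path: instead of locating `savingContextLoader` at runtime, it inlines the same save-and-restore of the thread's context classloader around `body`. Stripped of the version check, the idiom patch 8 relies on is just the following (a generic sketch for illustration, not code from the patch):

    // Run `body`, then put back whatever context classloader the thread
    // had before, even if `body` replaced it or threw an exception.
    def withRestoredContextLoader[T](body: => T): T = {
      val original = Thread.currentThread().getContextClassLoader
      try body
      finally Thread.currentThread().setContextClassLoader(original)
    }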