diff --git a/.travis.yml b/.travis.yml index eec3b7cb2c1..4554c2c108c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,9 +33,13 @@ addons: matrix: include: - # Test all modules + # Test all modules with scala 2.10 - jdk: "oraclejdk7" - env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Dscala-2.10 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" + + # Test all modules with scala 2.11 + - jdk: "oraclejdk7" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Dscala-2.11 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/README.md b/README.md index eee427cee3a..a1bfaca24b2 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,14 @@ And browse [localhost:8080](localhost:8080) in your browser. For configuration details check __`./conf`__ subdirectory. +### Building for Scala 2.11 + +To produce a Zeppelin package compiled with Scala 2.11, use the -Pscala-2.11 profile: + +``` +mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark -Pscala-2.11 -DskipTests clean install +``` + ### Package To package the final distribution including the compressed archive, run: diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 5ca86709791..dc071e3f86a 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -38,14 +38,11 @@ 3.0.1 1.0.5.4 1.3.0 - 2.10.4 - 2.10 3.3.2 1.7.1 16.0.1 - 2.2.4 4.12 3.2.4-Zeppelin 1.7.0 @@ -173,6 +170,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile diff --git a/flink/pom.xml b/flink/pom.xml index 226dc12a100..7355141fa66 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -37,8 +37,6 @@ 1.0.3 2.3.7 - 2.10 - 2.10.4 2.0.1 @@ -73,68 +71,71 @@ org.apache.flink - flink-clients_${flink.scala.binary.version} + flink-clients_${scala.binary.version} ${flink.version} org.apache.flink - flink-runtime_${flink.scala.binary.version} + flink-runtime_${scala.binary.version} ${flink.version} org.apache.flink - flink-scala_${flink.scala.binary.version} + flink-scala_${scala.binary.version} ${flink.version} org.apache.flink - flink-scala-shell_${flink.scala.binary.version} + flink-scala-shell_${scala.binary.version} ${flink.version} com.typesafe.akka - akka-actor_${flink.scala.binary.version} + akka-actor_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-remote_${flink.scala.binary.version} + akka-remote_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-slf4j_${flink.scala.binary.version} + akka-slf4j_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-testkit_${flink.scala.binary.version} + akka-testkit_${scala.binary.version} ${flink.akka.version} org.scala-lang scala-library - ${flink.scala.version} + ${scala.version} + provided org.scala-lang scala-compiler - ${flink.scala.version} + ${scala.version} + provided org.scala-lang scala-reflect - ${flink.scala.version} + ${scala.version} + provided @@ -169,7 +170,7 @@ net.alchim31.maven scala-maven-plugin - 3.1.4 + 3.2.2 @@ -199,7 +200,7 @@ org.scalamacros - paradise_${flink.scala.version} + paradise_${scala.version} ${scala.macros.version} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 68591d79754..d3229cf09a8 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,6 +17,7 @@ */ package org.apache.zeppelin.flink; +import java.lang.reflect.InvocationTargetException; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; @@ -24,10 +25,7 @@ import java.io.PrintWriter; import java.net.URL; import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; @@ -45,6 +43,8 @@ import scala.Console; import scala.None; import scala.Some; +import scala.collection.JavaConversions; +import scala.collection.immutable.Nil; import scala.runtime.AbstractFunction0; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.IMain; @@ -94,7 +94,7 @@ public void open() { // prepare bindings imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); + Map binder = (Map) getLastObject(); // import libraries imain.interpret("import scala.tools.nsc.io._"); @@ -103,7 +103,10 @@ public void open() { imain.interpret("import org.apache.flink.api.scala._"); imain.interpret("import org.apache.flink.api.common.functions._"); - imain.bindValue("env", env); + + binder.put("env", env); + imain.interpret("val env = _binder.get(\"env\").asInstanceOf[" + + env.getClass().getName() + "]"); } private boolean localMode() { @@ -192,16 +195,11 @@ private List classPath(ClassLoader cl) { return paths; } - public Object getValue(String name) { - IMain imain = flinkIloop.intp(); - Object ret = imain.valueOfTerm(name); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); - } else { - return ret; - } + public Object getLastObject() { + Object obj = imain.lastRequest().lineRep().call( + "$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; } @Override diff --git a/ignite/pom.xml b/ignite/pom.xml index a3312a65953..66e6765aa7e 100644 --- a/ignite/pom.xml +++ b/ignite/pom.xml @@ -33,9 +33,7 @@ http://zeppelin.apache.org - 1.6.0 - 2.10 - 2.10.4 + 1.5.0.final @@ -73,19 +71,19 @@ org.scala-lang scala-library - ${ignite.scala.version} + ${scala.version} org.scala-lang scala-compiler - ${ignite.scala.version} + ${scala.version} org.scala-lang scala-reflect - ${ignite.scala.version} + ${scala.version} diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java index fa5a079ae19..3f240183567 100644 --- a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java @@ -44,6 +44,7 @@ import scala.Console; import scala.None; import scala.Some; +import scala.collection.JavaConversions; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.IMain; import scala.tools.nsc.interpreter.Results.Result; @@ -174,16 +175,11 @@ private List classPath(ClassLoader cl) { return paths; } - public Object getValue(String name) { - Object val = imain.valueOfTerm(name); - - if (val instanceof None) { - return null; - } else if (val instanceof Some) { - return ((Some) val).get(); - } else { - return val; - } + public Object getLastObject() { + Object obj = imain.lastRequest().lineRep().call( + "$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; } private Ignite getIgnite() { @@ -222,7 +218,7 @@ private Ignite getIgnite() { private void initIgnite() { imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map binder = (Map) getValue("_binder"); + Map binder = (Map) getLastObject(); if (getIgnite() != null) { binder.put("ignite", ignite); diff --git a/pom.xml b/pom.xml index 607fce91874..2058cfa8090 100755 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -79,6 +79,11 @@ + 2.10.5 + 2.10 + 2.2.4 + 1.12.5 + 1.7.10 1.2.17 0.9.2 @@ -93,7 +98,6 @@ - org.slf4j slf4j-api @@ -136,7 +140,6 @@ 2.5 - com.google.code.gson gson @@ -155,14 +158,12 @@ 1.5 - commons-io commons-io 2.4 - commons-collections commons-collections @@ -181,7 +182,6 @@ ${guava.version} - junit junit @@ -388,12 +388,25 @@ - + @@ -412,7 +425,7 @@ .github/* .gitignore .repository/ - .Rhistory + .Rhistory **/*.diff **/*.patch **/*.avsc @@ -635,6 +648,28 @@ + + scala-2.10 + + !scala-2.11 + + + 2.10.5 + 2.10 + + + + + scala-2.11 + + scala-2.11 + + + 2.11.7 + 2.11 + + + vendor-repo @@ -710,7 +745,6 @@ false - diff --git a/r/pom.xml b/r/pom.xml index e6c0403381d..b14424b0d21 100644 --- a/r/pom.xml +++ b/r/pom.xml @@ -36,8 +36,6 @@ .sh / 1.4.1 - 2.10.4 - 2.10 @@ -118,13 +116,13 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.4 + ${scalatest.version} test org.scalacheck scalacheck_${scala.binary.version} - 1.12.5 + ${scalacheck.version} test @@ -376,4 +374,31 @@ + + + + + scala-2.10 + + !scala-2.11 + + + 1.6.1 + src/main/scala-2.10 + src/test/scala-2.10 + + + + + scala-2.11 + + scala-2.11 + + + 1.6.1 + src/main/scala-2.11 + src/test/scala/scala-2.11 + + + diff --git a/scalding/pom.xml b/scalding/pom.xml index e287f7b5c39..c561732551d 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -34,7 +34,6 @@ http://zeppelin.apache.org - 2.11.8 2.6.0 0.16.1-RC1 @@ -74,43 +73,43 @@ com.twitter - scalding-core_2.11 + scalding-core_${scala.binary.version} ${scalding.version} com.twitter - scalding-args_2.11 + scalding-args_${scala.binary.version} ${scalding.version} com.twitter - scalding-date_2.11 + scalding-date_${scala.binary.version} ${scalding.version} com.twitter - scalding-commons_2.11 + scalding-commons_${scala.binary.version} ${scalding.version} com.twitter - scalding-avro_2.11 + scalding-avro_${scala.binary.version} ${scalding.version} com.twitter - scalding-parquet_2.11 + scalding-parquet_${scala.binary.version} ${scalding.version} com.twitter - scalding-repl_2.11 + scalding-repl_${scala.binary.version} ${scalding.version} @@ -199,6 +198,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml index cb101b64df1..c2edf68b347 100644 --- a/spark-dependencies/pom.xml +++ b/spark-dependencies/pom.xml @@ -37,8 +37,6 @@ 1.4.1 - 2.10.4 - 2.10 2.3.0 ${hadoop.version} @@ -346,6 +344,14 @@ + + scala-2.11 + + 1.6.1 + http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz + + + spark-1.1 diff --git a/spark/pom.xml b/spark/pom.xml index 3a8a0feb530..712691e923e 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -39,8 +39,6 @@ 1.10.19 1.6.4 1.6.2 - 2.10.4 - 2.10 @@ -54,11 +52,11 @@ slf4j-log4j12 - + ${project.groupId} @@ -250,7 +248,7 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.4 + ${scalatest.version} test @@ -412,6 +410,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile @@ -440,7 +439,6 @@ - sparkr diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java index 28c588551f4..5dc5d03d6d3 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java @@ -21,21 +21,22 @@ import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; -import java.lang.reflect.Type; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import org.apache.spark.repl.SparkILoop; -import org.apache.spark.repl.SparkIMain; -import org.apache.spark.repl.SparkJLineCompletion; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.WrappedInterpreter; @@ -51,9 +52,12 @@ import scala.None; import scala.Some; import scala.collection.convert.WrapAsJava$; +import scala.collection.JavaConversions; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.Completion.Candidates; import scala.tools.nsc.interpreter.Completion.ScalaCompleter; +import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -64,10 +68,17 @@ * */ public class DepInterpreter extends Interpreter { - private SparkIMain intp; + /** + * intp - org.apache.spark.repl.SparkIMain (scala 2.10) + * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) + */ + private Object intp; private ByteArrayOutputStream out; private SparkDependencyContext depc; - private SparkJLineCompletion completor; + /** + * completor - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) + */ + private Object completor; private SparkILoop interpreter; static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class); @@ -103,7 +114,7 @@ public static String getSystemDefault( @Override public void close() { if (intp != null) { - intp.close(); + Utils.invokeMethod(intp, "close"); } } @@ -149,31 +160,53 @@ private void createIMain() { b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - interpreter = new SparkILoop(null, new PrintWriter(out)); + interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out)); interpreter.settings_$eq(settings); interpreter.createInterpreter(); - intp = interpreter.intp(); - intp.setContextClassLoader(); - intp.initializeSynchronous(); + intp = Utils.invokeMethod(interpreter, "intp"); + + if (Utils.isScala2_10()) { + Utils.invokeMethod(intp, "setContextClassLoader"); + Utils.invokeMethod(intp, "initializeSynchronous"); + } depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"), getProperty("zeppelin.dep.additionalRemoteRepository")); - completor = new SparkJLineCompletion(intp); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map binder = (Map) getValue("_binder"); + if (Utils.isScala2_10()) { + completor = Utils.instantiateClass( + "org.apache.spark.repl.SparkJLineCompletion", + new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")}, + new Object[]{intp}); + } + interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + Map binder; + if (Utils.isScala2_10()) { + binder = (Map) getValue("_binder"); + } else { + binder = (Map) getLastObject(); + } binder.put("depc", depc); - intp.interpret("@transient val z = " + interpret("@transient val z = " + "_binder.get(\"depc\")" + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]"); } + private Results.Result interpret(String line) { + return (Results.Result) Utils.invokeMethod( + intp, + "interpret", + new Class[] {String.class}, + new Object[] {line}); + } + public Object getValue(String name) { - Object ret = intp.valueOfTerm(name); + Object ret = Utils.invokeMethod( + intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); if (ret instanceof None) { return null; } else if (ret instanceof Some) { @@ -183,6 +216,13 @@ public Object getValue(String name) { } } + public Object getLastObject() { + IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest"); + Object obj = r.lineRep().call("$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { PrintStream printStream = new PrintStream(out); @@ -198,7 +238,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { "restart Zeppelin/Interpreter" ); } - scala.tools.nsc.interpreter.Results.Result ret = intp.interpret(st); + scala.tools.nsc.interpreter.Results.Result ret = interpret(st); Code code = getResultCode(ret); try { @@ -245,17 +285,21 @@ public int getProgress(InterpreterContext context) { @Override public List completion(String buf, int cursor) { - ScalaCompleter c = completor.completer(); - Candidates ret = c.complete(buf, cursor); + if (Utils.isScala2_10()) { + ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completor, "completer"); + Candidates ret = c.complete(buf, cursor); - List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); - List completions = new LinkedList(); + List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); + List completions = new LinkedList(); - for (String candidate : candidates) { - completions.add(new InterpreterCompletion(candidate, candidate)); - } + for (String candidate : candidates) { + completions.add(new InterpreterCompletion(candidate, candidate)); + } - return completions; + return completions; + } else { + return new LinkedList(); + } } private List currentClassPath() { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 43462ad8880..98fb8346ee0 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -48,8 +48,6 @@ import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 53bfc02b370..0407e6e1133 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -19,10 +19,15 @@ import java.io.File; import java.io.PrintWriter; -import java.lang.reflect.*; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; import java.util.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; @@ -33,10 +38,9 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; -import org.apache.spark.repl.SparkCommandLine; + +import org.apache.spark.SecurityManager; import org.apache.spark.repl.SparkILoop; -import org.apache.spark.repl.SparkIMain; -import org.apache.spark.repl.SparkJLineCompletion; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; @@ -45,7 +49,6 @@ import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterUtils; @@ -72,9 +75,12 @@ import scala.collection.mutable.HashMap; import scala.collection.mutable.HashSet; import scala.reflect.io.AbstractFile; +import scala.tools.nsc.Global; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.Completion.Candidates; import scala.tools.nsc.interpreter.Completion.ScalaCompleter; +import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results; import scala.tools.nsc.settings.MutableSettings; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -88,7 +94,11 @@ public class SparkInterpreter extends Interpreter { private ZeppelinContext z; private SparkILoop interpreter; - private SparkIMain intp; + /** + * intp - org.apache.spark.repl.SparkIMain (scala 2.10) + * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) + */ + private Object intp; private static SparkContext sc; private static SQLContext sqlc; private static SparkEnv env; @@ -99,10 +109,16 @@ public class SparkInterpreter extends Interpreter { private SparkOutputStream out; private SparkDependencyResolver dep; - private SparkJLineCompletion completor; + + /** + * completor - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) + */ + private Object completor; private Map binder; private SparkVersion sparkVersion; + private File outputDir; // class outputdir for scala 2.11 + private HttpServer classServer; // classserver for scala 2.11 public SparkInterpreter(Properties property) { @@ -209,12 +225,15 @@ public SQLContext getSQLContext() { } } + public SparkDependencyResolver getDependencyResolver() { if (dep == null) { - dep = new SparkDependencyResolver(intp, - sc, - getProperty("zeppelin.dep.localrepo"), - getProperty("zeppelin.dep.additionalRemoteRepository")); + dep = new SparkDependencyResolver( + (Global) Utils.invokeMethod(intp, "global"), + (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"), + sc, + getProperty("zeppelin.dep.localrepo"), + getProperty("zeppelin.dep.additionalRemoteRepository")); } return dep; } @@ -235,13 +254,20 @@ public SparkContext createSparkContext() { logger.info("------ Create new SparkContext {} -------", getProperty("master")); String execUri = System.getenv("SPARK_EXECUTOR_URI"); - String[] jars = SparkILoop.getAddedJars(); + String[] jars = null; + + if (Utils.isScala2_10()) { + jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars"); + } else { + jars = (String[]) Utils.invokeStaticMethod( + findClass("org.apache.spark.repl.Main"), "getAddedJars"); + } String classServerUri = null; try { // in case of spark 1.1x, spark 1.2x - Method classServer = interpreter.intp().getClass().getMethod("classServer"); - HttpServer httpServer = (HttpServer) classServer.invoke(interpreter.intp()); + Method classServer = intp.getClass().getMethod("classServer"); + HttpServer httpServer = (HttpServer) classServer.invoke(intp); classServerUri = httpServer.uri(); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -250,8 +276,8 @@ public SparkContext createSparkContext() { if (classServerUri == null) { try { // for spark 1.3x - Method classServer = interpreter.intp().getClass().getMethod("classServerUri"); - classServerUri = (String) classServer.invoke(interpreter.intp()); + Method classServer = intp.getClass().getMethod("classServerUri"); + classServerUri = (String) classServer.invoke(intp); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { // continue instead of: throw new InterpreterException(e); @@ -261,6 +287,13 @@ public SparkContext createSparkContext() { } } + + if (Utils.isScala2_11()) { + classServer = createHttpServer(outputDir); + classServer.start(); + classServerUri = classServer.uri(); + } + SparkConf conf = new SparkConf() .setMaster(getProperty("master")) @@ -392,17 +425,49 @@ public void open() { * getClass.getClassLoader >> } >> in.setContextClassLoader() */ Settings settings = new Settings(); - if (getProperty("args") != null) { - String[] argsArray = getProperty("args").split(" "); - LinkedList argList = new LinkedList(); - for (String arg : argsArray) { - argList.add(arg); + + // process args + String args = getProperty("args"); + if (args == null) { + args = ""; + } + + String[] argsArray = args.split(" "); + LinkedList argList = new LinkedList(); + for (String arg : argsArray) { + argList.add(arg); + } + + if (Utils.isScala2_10()) { + scala.collection.immutable.List list = + JavaConversions.asScalaBuffer(argList).toList(); + + Object sparkCommandLine = Utils.instantiateClass( + "org.apache.spark.repl.SparkCommandLine", + new Class[]{ scala.collection.immutable.List.class }, + new Object[]{ list }); + + settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings"); + } else { + String sparkReplClassDir = getProperty("spark.repl.classdir"); + if (sparkReplClassDir == null) { + sparkReplClassDir = System.getProperty("spark.repl.classdir"); + } + if (sparkReplClassDir == null) { + sparkReplClassDir = System.getProperty("java.io.tmpdir"); } - SparkCommandLine command = - new SparkCommandLine(scala.collection.JavaConversions.asScalaBuffer( - argList).toList()); - settings = command.settings(); + outputDir = createTempDir(sparkReplClassDir); + + argList.add("-Yrepl-class-based"); + argList.add("-Yrepl-outdir"); + argList.add(outputDir.getAbsolutePath()); + + + scala.collection.immutable.List list = + JavaConversions.asScalaBuffer(argList).toList(); + + settings.processArguments(list, true); } // set classpath for scala compiler @@ -481,36 +546,41 @@ public void open() { synchronized (sharedInterpreterLock) { /* create scala repl */ if (printREPLOutput()) { - this.interpreter = new SparkILoop(null, new PrintWriter(out)); + this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out)); } else { - this.interpreter = new SparkILoop(null, new PrintWriter(Console.out(), false)); + this.interpreter = new SparkILoop((java.io.BufferedReader) null, + new PrintWriter(Console.out(), false)); } interpreter.settings_$eq(settings); interpreter.createInterpreter(); - intp = interpreter.intp(); - intp.setContextClassLoader(); - intp.initializeSynchronous(); - - if (classOutputDir == null) { - classOutputDir = settings.outputDirs().getSingleOutput().get(); - } else { - // change SparkIMain class output dir - settings.outputDirs().setSingleOutput(classOutputDir); - ClassLoader cl = intp.classLoader(); + intp = Utils.invokeMethod(interpreter, "intp"); + Utils.invokeMethod(intp, "setContextClassLoader"); + Utils.invokeMethod(intp, "initializeSynchronous"); - try { - Field rootField = cl.getClass().getSuperclass().getDeclaredField("root"); - rootField.setAccessible(true); - rootField.set(cl, classOutputDir); - } catch (NoSuchFieldException | IllegalAccessException e) { - logger.error(e.getMessage(), e); + if (Utils.isScala2_10()) { + if (classOutputDir == null) { + classOutputDir = settings.outputDirs().getSingleOutput().get(); + } else { + // change SparkIMain class output dir + settings.outputDirs().setSingleOutput(classOutputDir); + ClassLoader cl = (ClassLoader) Utils.invokeMethod(intp, "classLoader"); + try { + Field rootField = cl.getClass().getSuperclass().getDeclaredField("root"); + rootField.setAccessible(true); + rootField.set(cl, classOutputDir); + } catch (NoSuchFieldException | IllegalAccessException e) { + logger.error(e.getMessage(), e); + } } - } - completor = new SparkJLineCompletion(intp); + completor = Utils.instantiateClass( + "SparkJLineCompletion", + new Class[]{findClass("org.apache.spark.repl.SparkIMain")}, + new Object[]{intp}); + } sc = getSparkContext(); if (sc.getPoolForName("fair").isEmpty()) { @@ -530,31 +600,34 @@ public void open() { z = new ZeppelinContext(sc, sqlc, null, dep, Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); + interpret("@transient val _binder = new java.util.HashMap[String, Object]()"); + Map binder; + if (Utils.isScala2_10()) { + binder = (Map) getValue("_binder"); + } else { + binder = (Map) getLastObject(); + } binder.put("sc", sc); binder.put("sqlc", sqlc); binder.put("z", z); - binder.put("intp", intp); - intp.interpret("@transient val intp = _binder.get(\"intp\").asInstanceOf[org.apache.spark" + - ".repl.SparkIMain]"); - intp.interpret("@transient val z = " + + interpret("@transient val z = " + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); - intp.interpret("@transient val sc = " + interpret("@transient val sc = " + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); - intp.interpret("@transient val sqlc = " + interpret("@transient val sqlc = " + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("@transient val sqlContext = " + interpret("@transient val sqlContext = " + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("import org.apache.spark.SparkContext._"); + interpret("import org.apache.spark.SparkContext._"); if (importImplicit()) { if (sparkVersion.oldSqlContextImplicits()) { - intp.interpret("import sqlContext._"); + interpret("import sqlContext._"); } else { - intp.interpret("import sqlContext.implicits._"); - intp.interpret("import sqlContext.sql"); - intp.interpret("import org.apache.spark.sql.functions._"); + interpret("import sqlContext.implicits._"); + interpret("import sqlContext.sql"); + interpret("import org.apache.spark.sql.functions._"); } } } @@ -570,18 +643,20 @@ public void open() { Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")"); */ - try { - if (sparkVersion.oldLoadFilesMethodName()) { - Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class); - loadFiles.invoke(this.interpreter, settings); - } else { - Method loadFiles = this.interpreter.getClass().getMethod( - "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); - loadFiles.invoke(this.interpreter, settings); + if (Utils.isScala2_10()) { + try { + if (sparkVersion.oldLoadFilesMethodName()) { + Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class); + loadFiles.invoke(this.interpreter, settings); + } else { + Method loadFiles = this.interpreter.getClass().getMethod( + "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); + loadFiles.invoke(this.interpreter, settings); + } + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new InterpreterException(e); } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); } // add jar from DepInterpreter @@ -625,6 +700,14 @@ public void open() { numReferenceOfSparkContext.incrementAndGet(); } + private Results.Result interpret(String line) { + return (Results.Result) Utils.invokeMethod( + intp, + "interpret", + new Class[] {String.class}, + new Object[] {line}); + } + private List currentClassPath() { List paths = classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); @@ -664,17 +747,22 @@ public List completion(String buf, int cursor) { completionText = ""; cursor = completionText.length(); } - ScalaCompleter c = completor.completer(); - Candidates ret = c.complete(completionText, cursor); + if (Utils.isScala2_10()) { + ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completor, "completor"); + Candidates ret = c.complete(completionText, cursor); - List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); - List completions = new LinkedList(); + List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); + List completions = new LinkedList(); - for (String candidate : candidates) { - completions.add(new InterpreterCompletion(candidate, candidate)); + for (String candidate : candidates) { + completions.add(new InterpreterCompletion(candidate, candidate)); + } + + return completions; + } else { + return new LinkedList(); } - return completions; } private String getCompletionTargetString(String text, int cursor) { @@ -718,9 +806,15 @@ private String getCompletionTargetString(String text, int cursor) { return resultCompletionText; } + /* + * this method doesn't work in scala 2.11 + * Somehow intp.valueOfTerm returns scala.None always with -Yrepl-class-based option + */ public Object getValue(String name) { - Object ret = intp.valueOfTerm(name); - if (ret instanceof None) { + Object ret = Utils.invokeMethod( + intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); + + if (ret instanceof None || ret instanceof scala.None$) { return null; } else if (ret instanceof Some) { return ((Some) ret).get(); @@ -729,6 +823,13 @@ public Object getValue(String name) { } } + public Object getLastObject() { + IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest"); + Object obj = r.lineRep().call("$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; + } + String getJobGroup(InterpreterContext context){ return "zeppelin-" + context.getParagraphId(); } @@ -808,7 +909,7 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte scala.tools.nsc.interpreter.Results.Result res = null; try { - res = intp.interpret(incomplete + s); + res = interpret(incomplete + s); } catch (Exception e) { sc.clearJobGroup(); out.setInterpreterOutput(null); @@ -831,8 +932,8 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte // make sure code does not finish with comment if (r == Code.INCOMPLETE) { - scala.tools.nsc.interpreter.Results.Result res; - res = intp.interpret(incomplete + "\nprint(\"\")"); + scala.tools.nsc.interpreter.Results.Result res = null; + res = interpret(incomplete + "\nprint(\"\")"); r = getResultCode(res); } @@ -849,7 +950,7 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte } private void putLatestVarInResourcePool(InterpreterContext context) { - String varName = intp.mostRecentVar(); + String varName = (String) Utils.invokeMethod(intp, "mostRecentVar"); if (varName == null || varName.isEmpty()) { return; } @@ -998,9 +1099,13 @@ public void close() { if (numReferenceOfSparkContext.decrementAndGet() == 0) { sc.stop(); sc = null; + if (classServer != null) { + classServer.stop(); + classServer = null; + } } - intp.close(); + Utils.invokeMethod(intp, "close"); } @Override @@ -1025,4 +1130,67 @@ public ZeppelinContext getZeppelinContext() { public SparkVersion getSparkVersion() { return sparkVersion; } + + + + private Class findClass(String name) { + try { + return this.getClass().forName(name); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + private File createTempDir(String dir) { + File file = null; + + // try Utils.createTempDir() + file = (File) Utils.invokeStaticMethod( + Utils.findClass("org.apache.spark.util.Utils"), + "createTempDir", + new Class[]{String.class, String.class}, + new Object[]{dir, "spark"}); + + // fallback to old method + if (file == null) { + file = (File) Utils.invokeStaticMethod( + Utils.findClass("org.apache.spark.util.Utils"), + "createTempDir", + new Class[]{String.class}, + new Object[]{dir}); + } + + return file; + } + + private HttpServer createHttpServer(File outputDir) { + SparkConf conf = new SparkConf(); + try { + // try to create HttpServer + Constructor constructor = getClass().getClassLoader() + .loadClass(HttpServer.class.getName()) + .getConstructor(new Class[]{ + SparkConf.class, File.class, SecurityManager.class, int.class, String.class}); + + return (HttpServer) constructor.newInstance(new Object[] { + conf, outputDir, new SecurityManager(conf), 0, "HTTP Server"}); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + // fallback to old constructor + Constructor constructor = null; + try { + constructor = getClass().getClassLoader() + .loadClass(HttpServer.class.getName()) + .getConstructor(new Class[]{ + File.class, SecurityManager.class, int.class, String.class}); + return (HttpServer) constructor.newInstance(new Object[] { + outputDir, new SecurityManager(conf), 0, "HTTP Server"}); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e1) { + logger.error(e1.getMessage(), e1); + return null; + } + } + } } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index a3636a29c1b..fc8923c4172 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -27,9 +27,7 @@ import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index 2fa716b449e..17f2de7be24 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -32,10 +32,12 @@ public class SparkVersion { public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0"); public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0"); public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0"); - public static final SparkVersion SPARK_1_7_0 = SparkVersion.fromVersionString("1.7.0"); + + public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0"); + public static final SparkVersion SPARK_2_1_0 = SparkVersion.fromVersionString("2.1.0"); public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0; - public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_7_0; + public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_2_1_0; private int version; private String versionString; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java new file mode 100644 index 00000000000..940e202473d --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Utility and helper functions for the Spark Interpreter + */ +class Utils { + public static Logger logger = LoggerFactory.getLogger(Utils.class); + + static Object invokeMethod(Object o, String name) { + return invokeMethod(o, name, new Class[]{}, new Object[]{}); + } + + static Object invokeMethod(Object o, String name, Class[] argTypes, Object[] params) { + try { + return o.getClass().getMethod(name, argTypes).invoke(o, params); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + static Object invokeStaticMethod(Class c, String name, Class[] argTypes, Object[] params) { + try { + return c.getMethod(name, argTypes).invoke(null, params); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + static Object invokeStaticMethod(Class c, String name) { + return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); + } + + static Class findClass(String name) { + try { + return Utils.class.forName(name); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + static Object instantiateClass(String name, Class[] argTypes, Object[] params) { + try { + Constructor constructor = Utils.class.getClassLoader() + .loadClass(name).getConstructor(argTypes); + return constructor.newInstance(params); + } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + // function works after intp is initialized + static boolean isScala2_10() { + try { + Utils.class.forName("org.apache.spark.repl.SparkIMain"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + static boolean isScala2_11() { + return !isScala2_10(); + } +} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java index e4881d373be..c4047977861 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java @@ -29,7 +29,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkContext; -import org.apache.spark.repl.SparkIMain; import org.apache.zeppelin.dep.AbstractDependencyResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +59,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); private Global global; - private SparkIMain intp; + private ClassLoader runtimeClassLoader; private SparkContext sc; private final String[] exclusions = new String[] {"org.scala-lang:scala-library", @@ -71,11 +70,14 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { "org.apache.zeppelin:zeppelin-spark", "org.apache.zeppelin:zeppelin-server"}; - public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath, - String additionalRemoteRepository) { + public SparkDependencyResolver(Global global, + ClassLoader runtimeClassLoader, + SparkContext sc, + String localRepoPath, + String additionalRemoteRepository) { super(localRepoPath); - this.intp = intp; - this.global = intp.global(); + this.global = global; + this.runtimeClassLoader = runtimeClassLoader; this.sc = sc; addRepoFromProperty(additionalRemoteRepository); } @@ -127,24 +129,22 @@ private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException, private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException { - ClassLoader cl = intp.classLoader().getParent(); Method addURL; - addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); + addURL = runtimeClassLoader.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); addURL.setAccessible(true); for (URL url : urls) { - addURL.invoke(cl, url); + addURL.invoke(runtimeClassLoader, url); } } private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException { - ClassLoader cl = intp.classLoader().getParent(); Method addURL; - addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); + addURL = runtimeClassLoader.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); addURL.setAccessible(true); for (URL url : urls) { - addURL.invoke(cl, url); + addURL.invoke(runtimeClassLoader, url); } } @@ -209,7 +209,7 @@ public List load(String artifact, Collection excludes, private void loadFromFs(String artifact, boolean addSparkContext) throws Exception { File jarFile = new File(artifact); - intp.global().new Run(); + global.new Run(); if (sc.version().startsWith("1.1")) { updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()}); @@ -257,7 +257,7 @@ private List loadFromMvn(String artifact, Collection excludes, + artifactResult.getArtifact().getVersion()); } - intp.global().new Run(); + global.new Run(); if (sc.version().startsWith("1.1")) { updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0])); } else { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 90f311ed5c3..815e77f9bd9 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -19,15 +19,15 @@ import static org.junit.Assert.*; +import java.io.BufferedReader; import java.io.File; import java.util.HashMap; import java.util.LinkedList; import java.util.Properties; -import org.apache.spark.HttpServer; -import org.apache.spark.SecurityManager; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; +import org.apache.spark.repl.SparkILoop; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; @@ -41,6 +41,7 @@ import org.junit.runners.MethodSorters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.tools.nsc.interpreter.IMain; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class SparkInterpreterTest { @@ -139,6 +140,7 @@ public void testBasicIntp() { assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); assertTrue(incomplete.message().length() > 0); // expecting some error // message + /* * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b")); * repl.interpret("val ver = sc.version"); @@ -182,15 +184,15 @@ public void testSparkSql(){ if (getSparkVersionNumber() <= 11) { // spark 1.2 or later does not allow create multiple SparkContext in the same jvm by default. - // create new interpreter - Properties p = new Properties(); - SparkInterpreter repl2 = new SparkInterpreter(p); - repl2.open(); - - repl.interpret("case class Man(name:String, age:Int)", context); - repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context); - assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code()); - repl2.getSparkContext().stop(); + // create new interpreter + Properties p = new Properties(); + SparkInterpreter repl2 = new SparkInterpreter(p); + repl2.open(); + + repl.interpret("case class Man(name:String, age:Int)", context); + repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context); + assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code()); + repl2.getSparkContext().stop(); } } diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java index a0271f4471d..b226a001d24 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; -import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.junit.Test; public class SparkDependencyResolverTest { diff --git a/zeppelin-display/pom.xml b/zeppelin-display/pom.xml index 4bf29e65e3a..5cec0d121f2 100644 --- a/zeppelin-display/pom.xml +++ b/zeppelin-display/pom.xml @@ -33,11 +33,6 @@ Zeppelin: Display system apis http://zeppelin.apache.org - - 2.10.4 - 2.10 - - @@ -86,16 +81,34 @@ org.scala-lang scala-library + ${scala.version} org.scalatest - scalatest_2.10 - 2.1.1 + scalatest_${scala.binary.version} + ${scalatest.version} test + + + scala-2.11 + + scala-2.11 + + + + + org.scala-lang.modules + scala-xml_${scala.binary.version} + 1.0.2 + + + + + diff --git a/zeppelin-distribution/pom.xml b/zeppelin-distribution/pom.xml index 65fc0cbbaa6..f65044e6faf 100644 --- a/zeppelin-distribution/pom.xml +++ b/zeppelin-distribution/pom.xml @@ -45,6 +45,34 @@ + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + org.scala-lang + scalap + ${scala.version} + + + + zeppelin-server @@ -84,6 +112,23 @@ + + scala-2.11 + + scala-2.11 + + + + + + org.scala-lang.modules + scala-xml_${scala.binary.version} + 1.0.2 + + + + + publish-distr diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index fd5c31cd7f5..97b4c66268b 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -43,19 +43,25 @@ org.scala-lang scala-library - 2.10.4 + ${scala.version} org.scala-lang scala-compiler - 2.10.4 + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} org.scala-lang scalap - 2.10.4 + ${scala.version} @@ -221,6 +227,19 @@ org.scala-lang scala-library + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} @@ -258,8 +277,8 @@ org.scalatest - scalatest_2.10 - 2.1.1 + scalatest_${scala.binary.version} + ${scalatest.version} test @@ -393,6 +412,23 @@ + + scala-2.11 + + scala-2.11 + + + + + + org.scala-lang.modules + scala-xml_${scala.binary.version} + 1.0.2 + + + + + using-source-tree diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index f2d4c99ca49..cb77ae745be 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -102,6 +102,30 @@ protected static void startUp() throws Exception { System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../"); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist"); LOG.info("Staring test Zeppelin up..."); + + + // exclude org.apache.zeppelin.rinterpreter.* for scala 2.11 test + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + String interpreters = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS); + String interpretersCompatibleWithScala211Test = null; + + for (String intp : interpreters.split(",")) { + if (intp.startsWith("org.apache.zeppelin.rinterpreter")) { + continue; + } + + if (interpretersCompatibleWithScala211Test == null) { + interpretersCompatibleWithScala211Test = intp; + } else { + interpretersCompatibleWithScala211Test += "," + intp; + } + } + + System.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), + interpretersCompatibleWithScala211Test); + + executor = Executors.newSingleThreadExecutor(); executor.submit(server); long s = System.currentTimeMillis(); @@ -241,6 +265,8 @@ protected static void shutDown() throws Exception { } LOG.info("Test Zeppelin terminated."); + + System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName()); } }