diff --git a/.travis.yml b/.travis.yml index eec3b7cb2c1..89e15fabc7a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,9 +33,17 @@ addons: matrix: include: - # Test all modules + # Test all modules with Spark 2.0.0-preview and Scala 2.11 - jdk: "oraclejdk7" - env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" + env: SPARK_VER="2.0.0-preview" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Dspark.version=2.0.0-preview -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Dscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" + + # Test all modules with Scala 2.10 + - jdk: "oraclejdk7" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Dscala-2.10" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" + + # Test all modules with Scala 2.11 + - jdk: "oraclejdk7" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Dscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/README.md b/README.md index eee427cee3a..5feab28399b 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,14 @@ And browse [localhost:8080](localhost:8080) in your browser. For configuration details check __`./conf`__ subdirectory. +### Building for Scala 2.11 + +To produce a Zeppelin package compiled with Scala 2.11, build with the `-Dscala-2.11` property: + +``` +mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark -Dscala-2.11 -DskipTests +``` + ### Package To package the final distribution including the compressed archive, run: diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 5ca86709791..dc071e3f86a 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -38,14 +38,11 @@ 3.0.1 1.0.5.4 1.3.0 - 2.10.4 - 2.10 3.3.2 1.7.1 16.0.1 - 2.2.4 4.12 3.2.4-Zeppelin 1.7.0 @@ -173,6 +170,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile diff --git a/flink/pom.xml b/flink/pom.xml index 226dc12a100..7355141fa66 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -37,8 +37,6 @@ 1.0.3 2.3.7 - 2.10 - 2.10.4 2.0.1 @@ -73,68 +71,71 @@ org.apache.flink - flink-clients_${flink.scala.binary.version} + flink-clients_${scala.binary.version} ${flink.version} org.apache.flink - flink-runtime_${flink.scala.binary.version} + flink-runtime_${scala.binary.version} ${flink.version} org.apache.flink - flink-scala_${flink.scala.binary.version} + flink-scala_${scala.binary.version} ${flink.version} org.apache.flink - flink-scala-shell_${flink.scala.binary.version} + flink-scala-shell_${scala.binary.version} ${flink.version} com.typesafe.akka - akka-actor_${flink.scala.binary.version} + akka-actor_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-remote_${flink.scala.binary.version} + akka-remote_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-slf4j_${flink.scala.binary.version} + akka-slf4j_${scala.binary.version} ${flink.akka.version} com.typesafe.akka - akka-testkit_${flink.scala.binary.version} + akka-testkit_${scala.binary.version} ${flink.akka.version} org.scala-lang scala-library -
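
The Travis matrix above also exercises the new `spark-2.0` profile. For reference, the equivalent local build command (a sketch assembled from that matrix entry's `PROFILE` flags, not a command documented in the README) would be:

```
mvn clean package -Pspark-2.0 -Dspark.version=2.0.0-preview -Phadoop-2.3 -Ppyspark -Dscala-2.11 -DskipTests
```
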
${flink.scala.version} + ${scala.version} + provided org.scala-lang scala-compiler - ${flink.scala.version} + ${scala.version} + provided org.scala-lang scala-reflect - ${flink.scala.version} + ${scala.version} + provided @@ -169,7 +170,7 @@ net.alchim31.maven scala-maven-plugin - 3.1.4 + 3.2.2 @@ -199,7 +200,7 @@ org.scalamacros - paradise_${flink.scala.version} + paradise_${scala.version} ${scala.macros.version} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 68591d79754..d3229cf09a8 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,6 +17,7 @@ */ package org.apache.zeppelin.flink; +import java.lang.reflect.InvocationTargetException; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; @@ -24,10 +25,7 @@ import java.io.PrintWriter; import java.net.URL; import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; @@ -45,6 +43,8 @@ import scala.Console; import scala.None; import scala.Some; +import scala.collection.JavaConversions; +import scala.collection.immutable.Nil; import scala.runtime.AbstractFunction0; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.IMain; @@ -94,7 +94,7 @@ public void open() { // prepare bindings imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); + Map binder = (Map) getLastObject(); // import libraries imain.interpret("import scala.tools.nsc.io._"); @@ -103,7 +103,10 @@ public void open() { imain.interpret("import org.apache.flink.api.scala._"); imain.interpret("import org.apache.flink.api.common.functions._"); - imain.bindValue("env", env); + + binder.put("env", env); + imain.interpret("val env = _binder.get(\"env\").asInstanceOf[" + + env.getClass().getName() + "]"); } private boolean localMode() { @@ -192,16 +195,11 @@ private List classPath(ClassLoader cl) { return paths; } - public Object getValue(String name) { - IMain imain = flinkIloop.intp(); - Object ret = imain.valueOfTerm(name); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); - } else { - return ret; - } + public Object getLastObject() { + Object obj = imain.lastRequest().lineRep().call( + "$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; } @Override diff --git a/ignite/pom.xml b/ignite/pom.xml index a3312a65953..66e6765aa7e 100644 --- a/ignite/pom.xml +++ b/ignite/pom.xml @@ -33,9 +33,7 @@ http://zeppelin.apache.org - 1.6.0 - 2.10 - 2.10.4 + 1.5.0.final @@ -73,19 +71,19 @@ org.scala-lang scala-library - ${ignite.scala.version} + ${scala.version} org.scala-lang scala-compiler - ${ignite.scala.version} + ${scala.version} org.scala-lang scala-reflect - ${ignite.scala.version} + ${scala.version} diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java index fa5a079ae19..3f240183567 100644 --- a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java @@ -44,6 +44,7 @@ import scala.Console; 
import scala.None; import scala.Some; +import scala.collection.JavaConversions; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.IMain; import scala.tools.nsc.interpreter.Results.Result; @@ -174,16 +175,11 @@ private List classPath(ClassLoader cl) { return paths; } - public Object getValue(String name) { - Object val = imain.valueOfTerm(name); - - if (val instanceof None) { - return null; - } else if (val instanceof Some) { - return ((Some) val).get(); - } else { - return val; - } + public Object getLastObject() { + Object obj = imain.lastRequest().lineRep().call( + "$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; } private Ignite getIgnite() { @@ -222,7 +218,7 @@ private Ignite getIgnite() { private void initIgnite() { imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map binder = (Map) getValue("_binder"); + Map binder = (Map) getLastObject(); if (getIgnite() != null) { binder.put("ignite", ignite); diff --git a/pom.xml b/pom.xml index 607fce91874..560df065b2c 100755 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -79,6 +79,11 @@ + 2.10.5 + 2.10 + 2.2.4 + 1.12.5 + 1.7.10 1.2.17 0.9.2 @@ -93,7 +98,6 @@ - org.slf4j slf4j-api @@ -136,7 +140,6 @@ 2.5 - com.google.code.gson gson @@ -155,14 +158,12 @@ 1.5 - commons-io commons-io 2.4 - commons-collections commons-collections @@ -181,7 +182,6 @@ ${guava.version} - junit junit @@ -412,7 +412,7 @@ .github/* .gitignore .repository/ - .Rhistory + .Rhistory **/*.diff **/*.patch **/*.avsc @@ -635,6 +635,30 @@ + + scala-2.10 + + !scala-2.11 + + + 2.10.5 + 2.10 + + + + + + scala-2.11 + + scala-2.11 + + + 2.11.7 + 2.11 + + + vendor-repo @@ -710,7 +734,6 @@ false - diff --git a/r/pom.xml b/r/pom.xml index e6c0403381d..b14424b0d21 100644 --- a/r/pom.xml +++ b/r/pom.xml @@ -36,8 +36,6 @@ .sh / 1.4.1 - 2.10.4 - 2.10 @@ -118,13 +116,13 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.4 + ${scalatest.version} test org.scalacheck scalacheck_${scala.binary.version} - 1.12.5 + ${scalacheck.version} test @@ -376,4 +374,31 @@ + + + + + scala-2.10 + + !scala-2.11 + + + 1.6.1 + src/main/scala-2.10 + src/test/scala-2.10 + + + + + scala-2.11 + + scala-2.11 + + + 1.6.1 + src/main/scala-2.11 + src/test/scala-2.11 + + + diff --git a/scalding/pom.xml b/scalding/pom.xml index e287f7b5c39..c561732551d 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -34,7 +34,6 @@ http://zeppelin.apache.org - 2.11.8 2.6.0 0.16.1-RC1 @@ -74,43 +73,43 @@ com.twitter - scalding-core_2.11 + scalding-core_${scala.binary.version} ${scalding.version} com.twitter - scalding-args_2.11 + scalding-args_${scala.binary.version} ${scalding.version} com.twitter - scalding-date_2.11 + scalding-date_${scala.binary.version} ${scalding.version} com.twitter - scalding-commons_2.11 + scalding-commons_${scala.binary.version} ${scalding.version} com.twitter - scalding-avro_2.11 + scalding-avro_${scala.binary.version} ${scalding.version} com.twitter - scalding-parquet_2.11 + scalding-parquet_${scala.binary.version} ${scalding.version} com.twitter - scalding-repl_2.11 + scalding-repl_${scala.binary.version} ${scalding.version} @@ -199,6 +198,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml index cb101b64df1..dc5d7f5edfe 100644 --- a/spark-dependencies/pom.xml +++ b/spark-dependencies/pom.xml @@ -37,8 +37,6 @@ 1.4.1 - 2.10.4 -
2.10 2.3.0 ${hadoop.version} @@ -285,11 +283,11 @@ ${spark.version} - + org.apache.spark @@ -346,6 +344,14 @@ + + scala-2.11 + + 1.6.1 + http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz + + + spark-1.1 @@ -511,9 +517,6 @@ spark-1.6 - - true - 1.6.1 0.9 @@ -523,6 +526,19 @@ + + spark-2.0 + + true + + + 2.0.0 + 2.5.0 + 0.10.1 + 2.11.8 + + + hadoop-0.23 ${project.groupId} @@ -243,7 +241,7 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.4 + ${scalatest.version} test @@ -405,6 +403,7 @@ org.scala-tools maven-scala-plugin + 2.15.2 compile @@ -433,7 +432,6 @@ - sparkr diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java index 28c588551f4..72550d15c0d 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java @@ -21,21 +21,22 @@ import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; -import java.lang.reflect.Type; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import org.apache.spark.repl.SparkILoop; -import org.apache.spark.repl.SparkIMain; -import org.apache.spark.repl.SparkJLineCompletion; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.WrappedInterpreter; @@ -51,9 +52,12 @@ import scala.None; import scala.Some; import scala.collection.convert.WrapAsJava$; +import scala.collection.JavaConversions; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.Completion.Candidates; import scala.tools.nsc.interpreter.Completion.ScalaCompleter; +import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -64,10 +68,17 @@ * */ public class DepInterpreter extends Interpreter { - private SparkIMain intp; + /** + * intp - org.apache.spark.repl.SparkIMain (scala 2.10) + * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) + */ + private Object intp; private ByteArrayOutputStream out; private SparkDependencyContext depc; - private SparkJLineCompletion completor; + /** + * completor - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) + */ + private Object completor; private SparkILoop interpreter; static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class); @@ -103,7 +114,7 @@ public static String getSystemDefault( @Override public void close() { if (intp != null) { - intp.close(); + invokeMethod(intp, "close"); } } @@ -149,31 +160,52 @@ private void createIMain() { b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - interpreter = new SparkILoop(null, new PrintWriter(out)); + interpreter = new SparkILoop((java.io.BufferedReader) null, new 
PrintWriter(out)); interpreter.settings_$eq(settings); interpreter.createInterpreter(); - intp = interpreter.intp(); - intp.setContextClassLoader(); - intp.initializeSynchronous(); + intp = invokeMethod(interpreter, "intp"); + + if (isScala2_10()) { + invokeMethod(intp, "setContextClassLoader"); + invokeMethod(intp, "initializeSynchronous"); + } depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"), getProperty("zeppelin.dep.additionalRemoteRepository")); - completor = new SparkJLineCompletion(intp); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - Map binder = (Map) getValue("_binder"); + if (isScala2_10()) { + completor = instantiateClass( + "org.apache.spark.repl.SparkJLineCompletion", + new Class[]{findClass("org.apache.spark.repl.SparkIMain")}, + new Object[]{intp}); + } + interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + Map binder; + if (isScala2_10()) { + binder = (Map) getValue("_binder"); + } else { + binder = (Map) getLastObject(); + } binder.put("depc", depc); - intp.interpret("@transient val z = " + interpret("@transient val z = " + "_binder.get(\"depc\")" + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]"); } + private Results.Result interpret(String line) { + return (Results.Result) invokeMethod( + intp, + "interpret", + new Class[] {String.class}, + new Object[] {line}); + } + public Object getValue(String name) { - Object ret = intp.valueOfTerm(name); + Object ret = invokeMethod(intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); if (ret instanceof None) { return null; } else if (ret instanceof Some) { @@ -183,6 +215,13 @@ public Object getValue(String name) { } } + public Object getLastObject() { + IMain.Request r = (IMain.Request) invokeMethod(intp, "lastRequest"); + Object obj = r.lineRep().call("$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { PrintStream printStream = new PrintStream(out); @@ -198,7 +237,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { "restart Zeppelin/Interpreter" ); } - scala.tools.nsc.interpreter.Results.Result ret = intp.interpret(st); + scala.tools.nsc.interpreter.Results.Result ret = interpret(st); Code code = getResultCode(ret); try { @@ -245,17 +284,21 @@ public int getProgress(InterpreterContext context) { @Override public List completion(String buf, int cursor) { - ScalaCompleter c = completor.completer(); - Candidates ret = c.complete(buf, cursor); + if (isScala2_10()) { + ScalaCompleter c = (ScalaCompleter) invokeMethod(completor, "completer"); + Candidates ret = c.complete(buf, cursor); - List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); - List completions = new LinkedList(); + List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); + List completions = new LinkedList(); - for (String candidate : candidates) { - completions.add(new InterpreterCompletion(candidate, candidate)); - } + for (String candidate : candidates) { + completions.add(new InterpreterCompletion(candidate, candidate)); + } - return completions; + return completions; + } else { + return new LinkedList(); + } } private List currentClassPath() { @@ -313,4 +356,84 @@ public Scheduler getScheduler() { return null; } } + + private Object invokeMethod(Object o, String name) { + return invokeMethod(o, name, new Class[]{}, new Object[]{}); + } + + private Object 
invokeMethod(Object o, String name, Class [] argTypes, Object [] params) { + try { + return o.getClass().getMethod(name, argTypes).invoke(o, params); + } catch (NoSuchMethodException e) { + logger.error(e.getMessage(), e); + } catch (InvocationTargetException e) { + logger.error(e.getMessage(), e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + + private Object invokeStaticMethod(Class c, String name) { + return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); + } + + private Object invokeStaticMethod(Class c, String name, Class [] argTypes, Object [] params) { + try { + return c.getMethod(name, argTypes).invoke(null, params); + } catch (NoSuchMethodException e) { + logger.error(e.getMessage(), e); + } catch (InvocationTargetException e) { + logger.error(e.getMessage(), e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + + private Object instantiateClass(String name, Class [] argTypes, Object [] params) { + try { + Constructor constructor = getClass().getClassLoader() + .loadClass(name).getConstructor(argTypes); + return constructor.newInstance(params); + } catch (NoSuchMethodException e) { + logger.error(e.getMessage(), e); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage(), e); + } catch (InstantiationException e) { + logger.error(e.getMessage(), e); + } catch (InvocationTargetException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + // function works after intp is initialized + private boolean isScala2_10() { + try { + this.getClass().forName("org.apache.spark.repl.SparkIMain"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + private boolean isScala2_11() { + return !isScala2_10(); + } + + + private Class findClass(String name) { + try { + return this.getClass().forName(name); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + } } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 43462ad8880..f63f3d4edf8 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -48,8 +48,6 @@ import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; @@ -530,6 +528,15 @@ public JavaSparkContext getJavaSparkContext() { } } + public Object getSparkSession() { + SparkInterpreter intp = getSparkInterpreter(); + if (intp == null) { + return null; + } else { + return intp.getSparkSession(); + } + } + public SparkConf getSparkConf() { JavaSparkContext sc = getJavaSparkContext(); if (sc == null) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 53bfc02b370..9fae8bc680c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++
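
Throughout this patch, DepInterpreter (above) and SparkInterpreter (below) hold the REPL as a plain `Object` and reach it through `invokeMethod`, so one binary can drive either `org.apache.spark.repl.SparkIMain` (Scala 2.10) or `scala.tools.nsc.interpreter.IMain` (Scala 2.11). A condensed, self-contained sketch of that pattern (the `ReplShim` class name is invented for illustration):

```java
import java.lang.reflect.InvocationTargetException;

// Sketch of the version-detection plus reflective-dispatch pattern used by this
// patch. Assumes exactly one of the two REPL implementations is on the classpath.
class ReplShim {
  private final Object intp; // SparkIMain (Scala 2.10) or IMain (Scala 2.11)

  ReplShim(Object intp) {
    this.intp = intp;
  }

  static boolean isScala2_10() {
    try {
      // Present only in the Scala 2.10 build of the Spark REPL.
      Class.forName("org.apache.spark.repl.SparkIMain");
      return true;
    } catch (ClassNotFoundException e) {
      return false; // fall back to scala.tools.nsc.interpreter.IMain
    }
  }

  Object interpret(String line) {
    try {
      // Both SparkIMain and IMain expose interpret(String), so one reflective
      // call path covers both Scala versions.
      return intp.getClass().getMethod("interpret", String.class).invoke(intp, line);
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
      return null; // the patch logs the failure and returns null here
    }
  }
}
```
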
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -19,33 +19,34 @@ import java.io.File; import java.io.PrintWriter; -import java.lang.reflect.*; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; import java.util.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import org.apache.spark.HttpServer; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; -import org.apache.spark.repl.SparkCommandLine; + +import org.apache.spark.SecurityManager; import org.apache.spark.repl.SparkILoop; -import org.apache.spark.repl.SparkIMain; -import org.apache.spark.repl.SparkJLineCompletion; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.jobs.JobProgressListener; +import org.apache.spark.util.Utils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterUtils; @@ -72,9 +73,12 @@ import scala.collection.mutable.HashMap; import scala.collection.mutable.HashSet; import scala.reflect.io.AbstractFile; +import scala.tools.nsc.Global; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.Completion.Candidates; import scala.tools.nsc.interpreter.Completion.ScalaCompleter; +import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results; import scala.tools.nsc.settings.MutableSettings; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -88,10 +92,16 @@ public class SparkInterpreter extends Interpreter { private ZeppelinContext z; private SparkILoop interpreter; - private SparkIMain intp; + /** + * intp - org.apache.spark.repl.SparkIMain (scala 2.10) + * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) + */ + private Object intp; + private SparkConf conf; private static SparkContext sc; private static SQLContext sqlc; private static SparkEnv env; + private static Object sparkSession; // spark 2.x private static JobProgressListener sparkListener; private static AbstractFile classOutputDir; private static Integer sharedInterpreterLock = new Integer(0); @@ -99,10 +109,16 @@ public class SparkInterpreter extends Interpreter { private SparkOutputStream out; private SparkDependencyResolver dep; - private SparkJLineCompletion completor; + + /** + * completor - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) + */ + private Object completor; private Map binder; private SparkVersion sparkVersion; + private File outputDir; // class outputdir for scala 2.11 + private Object classServer; // classserver for scala 2.11 on spark 1.x public SparkInterpreter(Properties property) { @@ -175,7 +191,32 @@ static JobProgressListener setupListeners(SparkContext context) { } private boolean useHiveContext() { - return 
java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); + return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")) || + conf.get("spark.sql.catalogImplementation", "hive").toLowerCase().equals("hive"); + } + + /** + * See org.apache.spark.sql.SparkSession.hiveClassesArePresent + * @return + */ + private boolean hiveClassesArePresent() { + try { + this.getClass().forName("org.apache.spark.sql.hive.HiveSessionState"); + this.getClass().forName("org.apache.spark.sql.hive.HiveSharedState"); + this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf"); + return true; + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return false; + } + } + + public Object getSparkSession() { + synchronized (sharedInterpreterLock) { + if (sparkSession == null) { + createSparkSession(); + } + return sparkSession; + } } private boolean importImplicit() { @@ -183,38 +224,62 @@ private boolean importImplicit() { } public SQLContext getSQLContext() { - synchronized (sharedInterpreterLock) { + if (isSpark2()) { + return getSQLContext_2(); + } else { + return getSQLContext_1(); + } + } + + /** + * Get SQLContext for spark 2.x + */ + private SQLContext getSQLContext_2() { + if (sqlc == null) { + sqlc = (SQLContext) invokeMethod(sparkSession, "wrapped"); if (sqlc == null) { - if (useHiveContext()) { - String name = "org.apache.spark.sql.hive.HiveContext"; - Constructor hc; - try { - hc = getClass().getClassLoader().loadClass(name) - .getConstructor(SparkContext.class); - sqlc = (SQLContext) hc.newInstance(getSparkContext()); - } catch (NoSuchMethodException | SecurityException - | ClassNotFoundException | InstantiationException - | IllegalAccessException | IllegalArgumentException - | InvocationTargetException e) { - logger.warn("Can't create HiveContext. Fallback to SQLContext", e); - // when hive dependency is not loaded, it'll fail. - // in this case SQLContext can be used. - sqlc = new SQLContext(getSparkContext()); - } - } else { + sqlc = (SQLContext) invokeMethod(sparkSession, "sqlContext"); + } + } + return sqlc; + } + + /** + * Get SQLContext for spark 1.x + */ + private SQLContext getSQLContext_1() { + if (sqlc == null) { + if (useHiveContext()) { + String name = "org.apache.spark.sql.hive.HiveContext"; + Constructor hc; + try { + hc = getClass().getClassLoader().loadClass(name) + .getConstructor(SparkContext.class); + sqlc = (SQLContext) hc.newInstance(getSparkContext()); + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + logger.warn("Can't create HiveContext. Fallback to SQLContext", e); + // when hive dependency is not loaded, it'll fail. + // in this case SQLContext can be used. 
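
For readability, here is a non-reflective sketch of what the builder calls in `createSparkSession()` (further below) amount to when compiled directly against Spark 2.x; the `SessionSketch` wrapper is invented for illustration, and the patch itself stays reflective so the same jar also loads on Spark 1.x, where `SparkSession` does not exist:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

class SessionSketch {
  // Mirrors the reflective sequence: builder(), config(conf),
  // enableHiveSupport() or the in-memory catalog fallback, then getOrCreate().
  static SparkSession buildSession(SparkConf conf, boolean useHive, boolean hiveClassesPresent) {
    SparkSession.Builder builder = SparkSession.builder().config(conf);
    if (useHive && hiveClassesPresent) {
      builder.enableHiveSupport();
    } else {
      // Same fallback the patch applies when Hive classes are missing.
      builder.config("spark.sql.catalogImplementation", "in-memory");
    }
    return builder.getOrCreate();
  }
}
```
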
sqlc = new SQLContext(getSparkContext()); } + } else { + sqlc = new SQLContext(getSparkContext()); } - return sqlc; } + return sqlc; } public SparkDependencyResolver getDependencyResolver() { if (dep == null) { - dep = new SparkDependencyResolver(intp, - sc, - getProperty("zeppelin.dep.localrepo"), - getProperty("zeppelin.dep.additionalRemoteRepository")); + dep = new SparkDependencyResolver( + (Global) invokeMethod(intp, "global"), + (ClassLoader) invokeMethod(invokeMethod(intp, "classLoader"), "getParent"), + sc, + getProperty("zeppelin.dep.localrepo"), + getProperty("zeppelin.dep.additionalRemoteRepository")); } return dep; } @@ -231,18 +296,97 @@ private DepInterpreter getDepInterpreter() { return (DepInterpreter) p; } + /** + * Create SparkSession (Spark 2.x). + */ + public Object createSparkSession() { + logger.info("------ Create new SparkSession {} -------", getProperty("master")); + String execUri = System.getenv("SPARK_EXECUTOR_URI"); + conf.setAppName(getProperty("spark.app.name")); + + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()); + + if (execUri != null) { + conf.set("spark.executor.uri", execUri); + } + + if (System.getenv("SPARK_HOME") != null) { + conf.setSparkHome(System.getenv("SPARK_HOME")); + } + + conf.set("spark.scheduler.mode", "FAIR"); + conf.setMaster(getProperty("master")); + + Properties intpProperty = getProperty(); + + for (Object k : intpProperty.keySet()) { + String key = (String) k; + String val = toString(intpProperty.get(key)); + if (!key.startsWith("spark.") || !val.trim().isEmpty()) { + logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val)); + conf.set(key, val); + } + } + + Class SparkSession = getClass("org.apache.spark.sql.SparkSession"); + Object builder = invokeStaticMethod(SparkSession, "builder"); + invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf }); + + if (useHiveContext()) { + if (hiveClassesArePresent()) { + invokeMethod(builder, "enableHiveSupport"); + sparkSession = invokeMethod(builder, "getOrCreate"); + logger.info("Created Spark session with Hive support"); + } else { + invokeMethod(builder, "config", + new Class[]{ String.class, String.class}, + new Object[]{ "spark.sql.catalogImplementation", "in-memory"}); + sparkSession = invokeMethod(builder, "getOrCreate"); + logger.info("Created Spark session with Hive support disabled; Hive classes are not present"); + } + } else { + sparkSession = invokeMethod(builder, "getOrCreate"); + logger.info("Created Spark session"); + } + + return sparkSession; + } + public SparkContext createSparkContext() { + if (isSpark2()) { + return createSparkContext_2(); + } else { + return createSparkContext_1(); + } + } + + /** + * Create SparkContext for spark 2.x + * @return the SparkContext held by the shared SparkSession + */ + private SparkContext createSparkContext_2() { + return (SparkContext) invokeMethod(sparkSession, "sparkContext"); + } + + private SparkContext createSparkContext_1() { logger.info("------ Create new SparkContext {} -------", getProperty("master")); String execUri = System.getenv("SPARK_EXECUTOR_URI"); - String[] jars = SparkILoop.getAddedJars(); + String[] jars = null; + + if (isScala2_10()) { + jars = (String[]) invokeStaticMethod(SparkILoop.class, "getAddedJars"); + } else { + jars = (String[]) invokeStaticMethod(findClass("org.apache.spark.repl.Main"), "getAddedJars"); + } String classServerUri = null; try { // in case of spark 1.1x, spark 1.2x - Method classServer = interpreter.intp().getClass().getMethod("classServer"); - HttpServer httpServer = (HttpServer)
classServer.invoke(interpreter.intp()); - classServerUri = httpServer.uri(); + Method classServer = intp.getClass().getMethod("classServer"); + Object httpServer = classServer.invoke(intp); + classServerUri = (String) invokeMethod(httpServer, "uri"); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { // continue @@ -250,8 +394,8 @@ public SparkContext createSparkContext() { if (classServerUri == null) { try { // for spark 1.3x - Method classServer = interpreter.intp().getClass().getMethod("classServerUri"); - classServerUri = (String) classServer.invoke(interpreter.intp()); + Method classServer = intp.getClass().getMethod("classServerUri"); + classServerUri = (String) classServer.invoke(intp); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { // continue instead of: throw new InterpreterException(e); @@ -261,10 +405,15 @@ public SparkContext createSparkContext() { } } - SparkConf conf = - new SparkConf() - .setMaster(getProperty("master")) - .setAppName(getProperty("spark.app.name")); + + if (isScala2_11()) { + classServer = createHttpServer(outputDir); + invokeMethod(classServer, "start"); + classServerUri = (String) invokeMethod(classServer, "uri"); + } + + conf.setMaster(getProperty("master")) + .setAppName(getProperty("spark.app.name")); if (classServerUri != null) { conf.set("spark.repl.class.uri", classServerUri); @@ -376,6 +525,7 @@ public boolean printREPLOutput() { @Override public void open() { + conf = new SparkConf(); URL[] urls = getClassloaderUrls(); // Very nice discussion about how scala compiler handle classpath @@ -392,17 +542,49 @@ public void open() { * getClass.getClassLoader >> } >> in.setContextClassLoader() */ Settings settings = new Settings(); - if (getProperty("args") != null) { - String[] argsArray = getProperty("args").split(" "); - LinkedList argList = new LinkedList(); - for (String arg : argsArray) { - argList.add(arg); + + // process args + String args = getProperty("args"); + if (args == null) { + args = ""; + } + + String[] argsArray = args.split(" "); + LinkedList argList = new LinkedList(); + for (String arg : argsArray) { + argList.add(arg); + } + + if (isScala2_10()) { + scala.collection.immutable.List list = + JavaConversions.asScalaBuffer(argList).toList(); + + Object sparkCommandLine = instantiateClass( + "org.apache.spark.repl.SparkCommandLine", + new Class[]{ scala.collection.immutable.List.class }, + new Object[]{ list }); + + settings = (Settings) invokeMethod(sparkCommandLine, "settings"); + } else { + String sparkReplClassDir = getProperty("spark.repl.classdir"); + if (sparkReplClassDir == null) { + sparkReplClassDir = System.getProperty("spark.repl.classdir"); + } + if (sparkReplClassDir == null) { + sparkReplClassDir = System.getProperty("java.io.tmpdir"); } - SparkCommandLine command = - new SparkCommandLine(scala.collection.JavaConversions.asScalaBuffer( - argList).toList()); - settings = command.settings(); + outputDir = createTempDir(sparkReplClassDir); + + argList.add("-Yrepl-class-based"); + argList.add("-Yrepl-outdir"); + argList.add(outputDir.getAbsolutePath()); + + + scala.collection.immutable.List list = + JavaConversions.asScalaBuffer(argList).toList(); + + settings.processArguments(list, true); } // set classpath for scala compiler @@ -470,7 +652,19 @@ public void open() { b.v_$eq(true); 
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$"); + /* Required for scoped mode. + * In scoped mode, multiple scala compilers (repls) generate classes in the same directory. + * Class names are not randomly generated and look like '$line12.$read$$iw$$iw', + * so a generated class can conflict with (overwrite) a class generated by another repl. + * + * To prevent such name conflicts, change the prefix of the generated class names + * for each scala compiler (repl) instance. + * + * In Spark 2.x, the REPL-generated wrapper class name must be compatible with the pattern + * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$ + */ + System.setProperty("scala.repl.name.line", "$line" + this.hashCode()); // To prevent 'File name too long' error on some file system. MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName(); @@ -481,37 +675,45 @@ public void open() { synchronized (sharedInterpreterLock) { /* create scala repl */ if (printREPLOutput()) { - this.interpreter = new SparkILoop(null, new PrintWriter(out)); + this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out)); } else { - this.interpreter = new SparkILoop(null, new PrintWriter(Console.out(), false)); + this.interpreter = new SparkILoop((java.io.BufferedReader) null, + new PrintWriter(Console.out(), false)); } interpreter.settings_$eq(settings); interpreter.createInterpreter(); - intp = interpreter.intp(); - intp.setContextClassLoader(); - intp.initializeSynchronous(); - - if (classOutputDir == null) { - classOutputDir = settings.outputDirs().getSingleOutput().get(); - } else { - // change SparkIMain class output dir - settings.outputDirs().setSingleOutput(classOutputDir); - ClassLoader cl = intp.classLoader(); + intp = invokeMethod(interpreter, "intp"); + invokeMethod(intp, "setContextClassLoader"); + invokeMethod(intp, "initializeSynchronous"); - try { - Field rootField = cl.getClass().getSuperclass().getDeclaredField("root"); - rootField.setAccessible(true); - rootField.set(cl, classOutputDir); - } catch (NoSuchFieldException | IllegalAccessException e) { - logger.error(e.getMessage(), e); + if (isScala2_10()) { + if (classOutputDir == null) { + classOutputDir = settings.outputDirs().getSingleOutput().get(); + } else { + // change SparkIMain class output dir + settings.outputDirs().setSingleOutput(classOutputDir); + ClassLoader cl = (ClassLoader) invokeMethod(intp, "classLoader"); + try { + Field rootField = cl.getClass().getSuperclass().getDeclaredField("root"); + rootField.setAccessible(true); + rootField.set(cl, classOutputDir); + } catch (NoSuchFieldException | IllegalAccessException e) { + logger.error(e.getMessage(), e); + } } - } - completor = new SparkJLineCompletion(intp); + completor = instantiateClass( + "org.apache.spark.repl.SparkJLineCompletion", + new Class[]{findClass("org.apache.spark.repl.SparkIMain")}, + new Object[]{intp}); + } + if (isSpark2()) { + sparkSession = getSparkSession(); + } sc = getSparkContext(); if (sc.getPoolForName("fair").isEmpty()) { Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR(); @@ -530,31 +732,50 @@ public void open() { z = new ZeppelinContext(sc, sqlc, null, dep, Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); + interpret("@transient val _binder = new java.util.HashMap[String,
Object]()"); + Map binder; + if (isScala2_10()) { + binder = (Map) getValue("_binder"); + } else { + binder = (Map) getLastObject(); + } binder.put("sc", sc); binder.put("sqlc", sqlc); binder.put("z", z); - binder.put("intp", intp); - intp.interpret("@transient val intp = _binder.get(\"intp\").asInstanceOf[org.apache.spark" + - ".repl.SparkIMain]"); - intp.interpret("@transient val z = " + + if (isSpark2()) { + binder.put("spark", sparkSession); + } + + interpret("@transient val z = " + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); - intp.interpret("@transient val sc = " + interpret("@transient val sc = " + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); - intp.interpret("@transient val sqlc = " + interpret("@transient val sqlc = " + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("@transient val sqlContext = " + interpret("@transient val sqlContext = " + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("import org.apache.spark.SparkContext._"); + + if (isSpark2()) { + interpret("@transient val spark = " + + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]"); + } + + interpret("import org.apache.spark.SparkContext._"); if (importImplicit()) { - if (sparkVersion.oldSqlContextImplicits()) { - intp.interpret("import sqlContext._"); + if (isSpark2()) { + interpret("import spark.implicits._"); + interpret("import spark.sql"); + interpret("import org.apache.spark.sql.functions._"); } else { - intp.interpret("import sqlContext.implicits._"); - intp.interpret("import sqlContext.sql"); - intp.interpret("import org.apache.spark.sql.functions._"); + if (sparkVersion.oldSqlContextImplicits()) { + interpret("import sqlContext._"); + } else { + interpret("import sqlContext.implicits._"); + interpret("import sqlContext.sql"); + interpret("import org.apache.spark.sql.functions._"); + } } } } @@ -570,18 +791,20 @@ public void open() { Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")"); */ - try { - if (sparkVersion.oldLoadFilesMethodName()) { - Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class); - loadFiles.invoke(this.interpreter, settings); - } else { - Method loadFiles = this.interpreter.getClass().getMethod( - "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); - loadFiles.invoke(this.interpreter, settings); + if (isScala2_10()) { + try { + if (sparkVersion.oldLoadFilesMethodName()) { + Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class); + loadFiles.invoke(this.interpreter, settings); + } else { + Method loadFiles = this.interpreter.getClass().getMethod( + "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); + loadFiles.invoke(this.interpreter, settings); + } + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new InterpreterException(e); } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); } // add jar from DepInterpreter @@ -625,6 +848,14 @@ public void open() { numReferenceOfSparkContext.incrementAndGet(); } + private Results.Result interpret(String line) { + return (Results.Result) invokeMethod( + intp, + "interpret", + new Class[] {String.class}, + new Object[] {line}); + } + private List currentClassPath() { List paths 
= classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); @@ -664,17 +895,22 @@ public List completion(String buf, int cursor) { completionText = ""; cursor = completionText.length(); } - ScalaCompleter c = completor.completer(); - Candidates ret = c.complete(completionText, cursor); + if (isScala2_10()) { + ScalaCompleter c = (ScalaCompleter) invokeMethod(completor, "completer"); + Candidates ret = c.complete(completionText, cursor); + + List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); + List completions = new LinkedList(); - List candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates()); - List completions = new LinkedList(); + for (String candidate : candidates) { + completions.add(new InterpreterCompletion(candidate, candidate)); + } - for (String candidate : candidates) { - completions.add(new InterpreterCompletion(candidate, candidate)); + return completions; + } else { + return new LinkedList(); } - return completions; } private String getCompletionTargetString(String text, int cursor) { @@ -718,9 +954,14 @@ private String getCompletionTargetString(String text, int cursor) { return resultCompletionText; } + /* + * This method does not work in Scala 2.11: + * with the -Yrepl-class-based option, intp.valueOfTerm always returns scala.None. + */ public Object getValue(String name) { - Object ret = intp.valueOfTerm(name); - if (ret instanceof None) { + Object ret = invokeMethod(intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name}); + + if (ret instanceof None || ret instanceof scala.None$) { return null; } else if (ret instanceof Some) { return ((Some) ret).get(); @@ -729,6 +970,16 @@ public Object getValue(String name) { } } + public Object getLastObject() { + IMain.Request r = (IMain.Request) invokeMethod(intp, "lastRequest"); + if (r == null || r.lineRep() == null) { + return null; + } + Object obj = r.lineRep().call("$result", + JavaConversions.asScalaBuffer(new LinkedList())); + return obj; + } + String getJobGroup(InterpreterContext context){ return "zeppelin-" + context.getParagraphId(); } @@ -760,6 +1011,7 @@ public InterpreterResult interpret(String[] lines, InterpreterContext context) { } } + public InterpreterResult interpretInput(String[] lines, InterpreterContext context) { SparkEnv.set(env); @@ -808,7 +1060,7 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte scala.tools.nsc.interpreter.Results.Result res = null; try { - res = intp.interpret(incomplete + s); + res = interpret(incomplete + s); } catch (Exception e) { sc.clearJobGroup(); out.setInterpreterOutput(null); @@ -831,8 +1083,8 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte // make sure code does not finish with comment if (r == Code.INCOMPLETE) { - scala.tools.nsc.interpreter.Results.Result res; - res = intp.interpret(incomplete + "\nprint(\"\")"); + scala.tools.nsc.interpreter.Results.Result res = interpret(incomplete + "\nprint(\"\")"); r = getResultCode(res); } @@ -849,12 +1101,23 @@ public InterpreterResult interpretInput(String[] lines, InterpreterContext conte } private void putLatestVarInResourcePool(InterpreterContext context) { - String varName = intp.mostRecentVar(); + String varName = (String) invokeMethod(intp, "mostRecentVar"); if (varName == null || varName.isEmpty()) { return; } - Object lastObj = getValue(varName); + Object lastObj = null; + try { + if (isScala2_10()) { + lastObj =
getValue(varName); + } else { + lastObj = getLastObject(); + } + } catch (NullPointerException e) { + // In some cases, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE + logger.error(e.getMessage(), e); + } + if (lastObj != null) { ResourcePool resourcePool = context.getResourcePool(); resourcePool.put(context.getNoteId(), context.getParagraphId(), @@ -998,9 +1261,13 @@ public void close() { if (numReferenceOfSparkContext.decrementAndGet() == 0) { sc.stop(); sc = null; + if (classServer != null) { + invokeMethod(classServer, "stop"); + classServer = null; + } } - intp.close(); + invokeMethod(intp, "close"); } @Override @@ -1025,4 +1292,136 @@ public ZeppelinContext getZeppelinContext() { public SparkVersion getSparkVersion() { return sparkVersion; } + + private Object invokeMethod(Object o, String name) { + return invokeMethod(o, name, new Class[]{}, new Object[]{}); + } + + private Object invokeMethod(Object o, String name, Class [] argTypes, Object [] params) { + try { + return o.getClass().getMethod(name, argTypes).invoke(o, params); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + + private Object invokeStaticMethod(Class c, String name) { + return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); + } + + private Object invokeStaticMethod(Class c, String name, Class [] argTypes, Object [] params) { + try { + return c.getMethod(name, argTypes).invoke(null, params); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + + private Class getClass(String name) { + try { + return this.getClass().forName(name); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + private Object instantiateClass(String name, Class [] argTypes, Object [] params) { + try { + Constructor constructor = getClass().getClassLoader() + .loadClass(name).getConstructor(argTypes); + return constructor.newInstance(params); + } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + // function works after intp is initialized + boolean isScala2_10() { + try { + this.getClass().forName("org.apache.spark.repl.SparkIMain"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + boolean isScala2_11() { + return !isScala2_10(); + } + + boolean isSpark2() { + try { + this.getClass().forName("org.apache.spark.sql.SparkSession"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + private Class findClass(String name) { + try { + return this.getClass().forName(name); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + private File createTempDir(String dir) { + // try Utils.createTempDir() + try { + return (File) Utils.class.getMethod( + "createTempDir", + new Class[]{String.class, String.class}) + .invoke(null, new Object[]{dir, "spark"}); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + // fallback to old method + try { + return (File) Utils.class.getMethod( + "createTempDir", + new Class[]{String.class}) + .invoke(null, new Object[]{dir}); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e1) { + logger.error(e1.getMessage(), e1); +
return null; + } + } + } + + private Object createHttpServer(File outputDir) { + SparkConf conf = new SparkConf(); + try { + // try to create HttpServer + Constructor constructor = getClass().getClassLoader() + .loadClass("org.apache.spark.HttpServer") + .getConstructor(new Class[]{ + SparkConf.class, File.class, SecurityManager.class, int.class, String.class}); + + return constructor.newInstance(new Object[] { + conf, outputDir, new SecurityManager(conf), 0, "HTTP Server"}); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + // fallback to old constructor + Constructor constructor = null; + try { + constructor = getClass().getClassLoader() + .loadClass("org.apache.spark.HttpServer") + .getConstructor(new Class[]{ + File.class, SecurityManager.class, int.class, String.class}); + return constructor.newInstance(new Object[] { + outputDir, new SecurityManager(conf), 0, "HTTP Server"}); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e1) { + logger.error(e1.getMessage(), e1); + return null; + } + } + } } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index a3636a29c1b..fc8923c4172 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -27,9 +27,7 @@ import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index 2fa716b449e..17f2de7be24 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -32,10 +32,12 @@ public class SparkVersion { public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0"); public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0"); public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0"); - public static final SparkVersion SPARK_1_7_0 = SparkVersion.fromVersionString("1.7.0"); + + public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0"); + public static final SparkVersion SPARK_2_1_0 = SparkVersion.fromVersionString("2.1.0"); public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0; - public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_7_0; + public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_2_1_0; private int version; private String versionString; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index bd8f0a1ca5c..bfd2b468c58 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ 
b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -31,7 +31,6 @@ import org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.catalyst.expressions.Attribute; -import org.apache.spark.sql.hive.HiveContext; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -70,7 +69,6 @@ public ZeppelinContext(SparkContext sc, SQLContext sql, public SparkContext sc; public SQLContext sqlContext; - public HiveContext hiveContext; private GUI gui; @ZeppelinApi diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java index e4881d373be..c4047977861 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java @@ -29,7 +29,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkContext; -import org.apache.spark.repl.SparkIMain; import org.apache.zeppelin.dep.AbstractDependencyResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +59,7 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); private Global global; - private SparkIMain intp; + private ClassLoader runtimeClassLoader; private SparkContext sc; private final String[] exclusions = new String[] {"org.scala-lang:scala-library", @@ -71,11 +70,14 @@ public class SparkDependencyResolver extends AbstractDependencyResolver { "org.apache.zeppelin:zeppelin-spark", "org.apache.zeppelin:zeppelin-server"}; - public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath, - String additionalRemoteRepository) { + public SparkDependencyResolver(Global global, + ClassLoader runtimeClassLoader, + SparkContext sc, + String localRepoPath, + String additionalRemoteRepository) { super(localRepoPath); - this.intp = intp; - this.global = intp.global(); + this.global = global; + this.runtimeClassLoader = runtimeClassLoader; this.sc = sc; addRepoFromProperty(additionalRemoteRepository); } @@ -127,24 +129,22 @@ private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException, private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException { - ClassLoader cl = intp.classLoader().getParent(); Method addURL; - addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); + addURL = runtimeClassLoader.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); addURL.setAccessible(true); for (URL url : urls) { - addURL.invoke(cl, url); + addURL.invoke(runtimeClassLoader, url); } } private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException { - ClassLoader cl = intp.classLoader().getParent(); Method addURL; - addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); + addURL = runtimeClassLoader.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); addURL.setAccessible(true); for (URL url : urls) { - addURL.invoke(cl, url); + addURL.invoke(runtimeClassLoader, url); } } @@ -209,7 +209,7 @@ public List load(String artifact, 
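
`updateRuntimeClassPath_1_x` above relies on opening up the protected `URLClassLoader.addURL`; in isolation the trick looks like this (a generic sketch of the 1.x path, not Zeppelin API; the 2.x path calls Spark's `addNewUrl` the same way):

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

class ClasspathSketch {
  // URLClassLoader.addURL is protected, so the resolver makes it accessible
  // reflectively and pushes freshly downloaded jars into the live classloader.
  static void inject(URLClassLoader cl, URL jar) throws Exception {
    Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    addURL.setAccessible(true);
    addURL.invoke(cl, jar);
  }
}
```
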
Collection excludes, private void loadFromFs(String artifact, boolean addSparkContext) throws Exception { File jarFile = new File(artifact); - intp.global().new Run(); + global.new Run(); if (sc.version().startsWith("1.1")) { updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()}); @@ -257,7 +257,7 @@ private List loadFromMvn(String artifact, Collection excludes, + artifactResult.getArtifact().getVersion()); } - intp.global().new Run(); + global.new Run(); if (sc.version().startsWith("1.1")) { updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0])); } else { diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 0ea547487e0..0380afa7fea 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -29,7 +29,7 @@ from pyspark.serializers import MarshalSerializer, PickleSerializer # for back compatibility -from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row +from pyspark.sql import SQLContext, HiveContext, Row class Logger(object): def __init__(self): @@ -107,6 +107,7 @@ def __tupleToScalaTuple2(self, tuple): class SparkVersion(object): SPARK_1_4_0 = 140 SPARK_1_3_0 = 130 + SPARK_2_0_0 = 200 def __init__(self, versionNumber): self.version = versionNumber @@ -117,6 +118,9 @@ def isAutoConvertEnabled(self): def isImportAllPackageUnderSparkSql(self): return self.version >= self.SPARK_1_3_0 + def isSpark2(self): + return self.version >= self.SPARK_2_0_0 + class PySparkCompletion: def __init__(self, interpreterObject): self.interpreterObject = interpreterObject @@ -175,6 +179,12 @@ def getCompletion(self, text_value): client = GatewayClient(port=int(sys.argv[1])) sparkVersion = SparkVersion(int(sys.argv[2])) +if sparkVersion.isSpark2(): + from pyspark.sql import SparkSession +else: + from pyspark.sql import SchemaRDD + + if sparkVersion.isAutoConvertEnabled(): gateway = JavaGateway(client, auto_convert = True) else: @@ -209,6 +219,9 @@ def getCompletion(self, text_value): sqlc = SQLContext(sc, intp.getSQLContext()) sqlContext = sqlc +if sparkVersion.isSpark2(): + spark = SparkSession(sc, intp.getSparkSession()) + completion = PySparkCompletion(intp) z = PyZeppelinContext(intp.getZeppelinContext()) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 90f311ed5c3..de07d43e6fb 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -24,12 +24,11 @@ import java.util.LinkedList; import java.util.Properties; -import org.apache.spark.HttpServer; -import org.apache.spark.SecurityManager; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.resource.WellKnownResourceName; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; @@ -139,6 +138,7 @@ public void testBasicIntp() { assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); assertTrue(incomplete.message().length() > 0); // expecting some error // message + /* * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b")); * repl.interpret("val ver = sc.version"); @@ -174,6 +174,17 @@ public void testListener() { 
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 90f311ed5c3..de07d43e6fb 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -24,12 +24,11 @@
 import java.util.LinkedList;
 import java.util.Properties;
 
-import org.apache.spark.HttpServer;
-import org.apache.spark.SecurityManager;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.WellKnownResourceName;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
@@ -139,6 +138,7 @@ public void testBasicIntp() {
     assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
     assertTrue(incomplete.message().length() > 0); // expecting some error
                                                    // message
+
     /*
      * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
      * repl.interpret("val ver = sc.version");
@@ -174,6 +174,17 @@ public void testListener() {
     assertNotNull(SparkInterpreter.setupListeners(sc));
   }
 
+  @Test
+  public void testCreateDataFrame() {
+    repl.interpret("case class Person(name:String, age:Int)\n", context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+    repl.interpret("people.toDF.count", context);
+    assertEquals(new Long(4), context.getResourcePool().get(
+        context.getNoteId(),
+        context.getParagraphId(),
+        WellKnownResourceName.ZeppelinReplResult.toString()).get());
+  }
+
   @Test
   public void testSparkSql(){
     repl.interpret("case class Person(name:String, age:Int)\n", context);
@@ -182,15 +193,15 @@ public void testSparkSql(){
 
     if (getSparkVersionNumber() <= 11) {
       // spark 1.2 or later does not allow create multiple SparkContext in the same jvm by default.
-    // create new interpreter
-    Properties p = new Properties();
-    SparkInterpreter repl2 = new SparkInterpreter(p);
-    repl2.open();
-
-    repl.interpret("case class Man(name:String, age:Int)", context);
-    repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
-    assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
-    repl2.getSparkContext().stop();
+      // create new interpreter
+      Properties p = new Properties();
+      SparkInterpreter repl2 = new SparkInterpreter(p);
+      repl2.open();
+
+      repl.interpret("case class Man(name:String, age:Int)", context);
+      repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
+      assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
+      repl2.getSparkContext().stop();
     }
   }
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 73d5e8abbc8..95622cca6b3 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -159,11 +159,14 @@ public void test_null_value_in_row() {
     repl.interpret(
         "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
         context);
-    repl.interpret("val people = z.sqlContext.applySchema(raw, schema)",
-        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("val people = z.sqlContext.createDataFrame(raw, schema)",
+          context);
       repl.interpret("people.toDF.registerTempTable(\"people\")", context);
     } else {
+      repl.interpret("val people = z.sqlContext.applySchema(raw, schema)",
+          context);
       repl.interpret("people.registerTempTable(\"people\")", context);
     }
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
index a0271f4471d..b226a001d24 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java
@@ -19,7 +19,6 @@
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
 import org.junit.Test;
 
 public class SparkDependencyResolverTest {
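The restructured branch in `SparkSqlInterpreterTest` above leans on a DataFrame-availability gate: `SQLContext.createDataFrame(...)` exists from Spark 1.3 onward, while older releases only offer the legacy `applySchema(...)`. A hedged sketch of such a gate, assuming the two-digit version encoding (13 for Spark 1.3.x) used by comparisons like `getSparkVersionNumber() <= 11` elsewhere in these tests; this is not the exact helper from the Zeppelin test suite:

```
// Hedged sketch of an isDataFrameSupported() gate.
final class DataFrameSupportSketch {
  // sparkVersionNumber uses the tests' two-digit encoding, e.g. 16 == Spark 1.6.x.
  static boolean isDataFrameSupported(int sparkVersionNumber) {
    // DataFrame and SQLContext.createDataFrame(...) arrived in Spark 1.3.
    return sparkVersionNumber >= 13;
  }

  public static void main(String[] args) {
    System.out.println(isDataFrameSupported(11)); // false -> fall back to applySchema
    System.out.println(isDataFrameSupported(16)); // true  -> use createDataFrame
  }
}
```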
diff --git a/zeppelin-display/pom.xml b/zeppelin-display/pom.xml
index 4bf29e65e3a..1f316fcd5f7 100644
--- a/zeppelin-display/pom.xml
+++ b/zeppelin-display/pom.xml
@@ -33,11 +33,6 @@
   <name>Zeppelin: Display system apis</name>
   <url>http://zeppelin.apache.org</url>
 
-  <properties>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
-  </properties>
-
   <dependencies>
@@ -90,12 +85,29 @@
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.10</artifactId>
-      <version>2.1.1</version>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>${scalatest.version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <property>
+          <name>scala-2.11</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-xml_${scala.binary.version}</artifactId>
+          <version>1.0.2</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
diff --git a/zeppelin-distribution/pom.xml b/zeppelin-distribution/pom.xml
index 65fc0cbbaa6..f65044e6faf 100644
--- a/zeppelin-distribution/pom.xml
+++ b/zeppelin-distribution/pom.xml
@@ -45,6 +45,34 @@
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-library</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-compiler</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-reflect</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scalap</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
     <artifactId>zeppelin-server</artifactId>
@@ -84,6 +112,23 @@
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <property>
+          <name>scala-2.11</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-xml_${scala.binary.version}</artifactId>
+          <version>1.0.2</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <profile>
       <id>publish-distr</id>
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index fd5c31cd7f5..97b4c66268b 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -43,19 +43,25 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
-      <version>2.10.4</version>
+      <version>${scala.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
-      <version>2.10.4</version>
+      <version>${scala.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scalap</artifactId>
-      <version>2.10.4</version>
+      <version>${scala.version}</version>
     </dependency>
@@ -221,6 +227,19 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
     </dependency>
@@ -258,8 +277,8 @@
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.10</artifactId>
-      <version>2.1.1</version>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>${scalatest.version}</version>
       <scope>test</scope>
     </dependency>
@@ -393,6 +412,23 @@
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <property>
+          <name>scala-2.11</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-xml_${scala.binary.version}</artifactId>
+          <version>1.0.2</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <profile>
       <id>using-source-tree</id>
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index f2d4c99ca49..bd3448413a1 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -102,6 +102,30 @@ protected static void startUp() throws Exception {
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist");
     LOG.info("Staring test Zeppelin up...");
+
+
+    // exclude org.apache.zeppelin.rinterpreter.* for scala 2.11 test
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    String interpreters = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS);
+    String interpretersCompatibleWithScala211Test = null;
+
+    for (String intp : interpreters.split(",")) {
+      if (intp.startsWith("org.apache.zeppelin.rinterpreter")) {
+        continue;
+      }
+
+      if (interpretersCompatibleWithScala211Test == null) {
+        interpretersCompatibleWithScala211Test = intp;
+      } else {
+        interpretersCompatibleWithScala211Test += "," + intp;
+      }
+    }
+
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(),
+        interpretersCompatibleWithScala211Test);
+
+
     executor = Executors.newSingleThreadExecutor();
     executor.submit(server);
     long s = System.currentTimeMillis();
@@ -205,7 +229,7 @@ private static String getSparkHomeRecursively(File dir) {
   }
 
   private static boolean isActiveSparkHome(File dir) {
-    if (dir.getName().matches("spark-[0-9\\.]+-bin-hadoop[0-9\\.]+")) {
+    if (dir.getName().matches("spark-[0-9\\.]+[A-Za-z-]*-bin-hadoop[0-9\\.]+")) {
       File pidDir = new File(dir, "run");
       if (pidDir.isDirectory() && pidDir.listFiles().length > 0) {
         return true;
@@ -241,6 +265,8 @@ protected static void shutDown() throws Exception {
     }
 
     LOG.info("Test Zeppelin terminated.");
+
+    System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName());
   }
 }
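The relaxed pattern in `isActiveSparkHome` above is what lets the test harness recognize a pre-release distribution directory such as `spark-2.0.0-preview-bin-hadoop2.6`, which the old pattern rejected. A standalone check of the two patterns (the directory names here are illustrative, not from the source):

```
// Quick check of the old vs. new SPARK_HOME directory patterns from
// isActiveSparkHome(); String.matches() requires a full-string match.
public class SparkHomePatternCheck {
  public static void main(String[] args) {
    String oldPattern = "spark-[0-9\\.]+-bin-hadoop[0-9\\.]+";
    String newPattern = "spark-[0-9\\.]+[A-Za-z-]*-bin-hadoop[0-9\\.]+";

    String preview = "spark-2.0.0-preview-bin-hadoop2.6";
    System.out.println(preview.matches(oldPattern)); // false: "-preview" breaks the old pattern
    System.out.println(preview.matches(newPattern)); // true:  [A-Za-z-]* absorbs the suffix
    System.out.println("spark-1.6.1-bin-hadoop2.3".matches(newPattern)); // true: releases still match
  }
}
```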
File(dir, "run"); if (pidDir.isDirectory() && pidDir.listFiles().length > 0) { return true; @@ -241,6 +265,8 @@ protected static void shutDown() throws Exception { } LOG.info("Test Zeppelin terminated."); + + System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName()); } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 3c77b45e71f..ee70f46b0a8 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -142,16 +142,22 @@ public void pySparkAutoConvertOptionTest() throws IOException { Note note = ZeppelinServer.notebook.createNote(null); note.setName("note"); - int sparkVersion = getSparkVersionNumber(note); + int sparkVersionNumber = getSparkVersionNumber(note); - if (isPyspark() && sparkVersion >= 14) { // auto_convert enabled from spark 1.4 + if (isPyspark() && sparkVersionNumber >= 14) { // auto_convert enabled from spark 1.4 // run markdown paragraph, again Paragraph p = note.addParagraph(); Map config = p.getConfig(); config.put("enabled", true); p.setConfig(config); + + String sqlContextName = "sqlContext"; + if (sparkVersionNumber >= 20) { + sqlContextName = "spark"; + } + p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" - + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); + + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); // p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open(); note.run(p.getId()); waitForFinish(p); @@ -197,8 +203,8 @@ public void zRunTest() throws IOException { public void pySparkDepLoaderTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(null); - - if (isPyspark() && getSparkVersionNumber(note) >= 14) { + int sparkVersionNumber = getSparkVersionNumber(note); + if (isPyspark() && sparkVersionNumber >= 14) { // restart spark interpreter List settings = ZeppelinServer.notebook.getBindedInterpreterSettings(note.id()); @@ -227,9 +233,14 @@ public void pySparkDepLoaderTest() throws IOException { // load data using libraries from dep loader Paragraph p1 = note.addParagraph(); p1.setConfig(config); + + String sqlContextName = "sqlContext"; + if (sparkVersionNumber >= 20) { + sqlContextName = "spark"; + } p1.setText("%pyspark\n" + "from pyspark.sql import SQLContext\n" + - "print(sqlContext.read.format('com.databricks.spark.csv')" + + "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" + ".load('"+ tmpFile.getAbsolutePath() +"').count())"); note.run(p1.getId());