diff --git a/.gitignore b/.gitignore index ca407595..93962a16 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ zan-repo/ drivers/ warehouse/ notebook/ +local-repo/ **/sessions/ **/data/ diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index d4ed6a6f..4070fed8 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -48,7 +48,7 @@ zeppelin.interpreters - com.nflabs.zeppelin.spark.SparkInterpreter,com.nflabs.zeppelin.spark.SparkSqlInterpreter,com.nflabs.zeppelin.markdown.Markdown,com.nflabs.zeppelin.shell.ShellInterpreter + com.nflabs.zeppelin.spark.SparkInterpreter,com.nflabs.zeppelin.spark.SparkSqlInterpreter,com.nflabs.zeppelin.spark.DepInterpreter,com.nflabs.zeppelin.markdown.Markdown,com.nflabs.zeppelin.shell.ShellInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/pom.xml b/pom.xml index 8104c129..68df9fea 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,7 @@ 1.4.3 1.2.3 8.1.14.v20131031 + 3.2.10 0.3.6 3.0.0 1.7.6 @@ -794,6 +795,32 @@ 2.2.6 + + + + org.json4s + json4s-core_2.10 + ${json4s.version} + + + + org.json4s + json4s-native_2.10 + ${json4s.version} + + + + org.json4s + json4s-jackson_2.10 + ${json4s.version} + + + + org.json4s + json4s-ext_2.10 + ${json4s.version} + + diff --git a/spark/pom.xml b/spark/pom.xml index 273836ba..a8a37c8d 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -226,6 +226,14 @@ + + org.apache.maven.wagon + wagon-http + 1.0 + + + + junit junit diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/DepInterpreter.java new file mode 100644 index 00000000..d7414413 --- /dev/null +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/DepInterpreter.java @@ -0,0 +1,277 @@ +package com.nflabs.zeppelin.spark; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.spark.repl.SparkILoop; +import org.apache.spark.repl.SparkIMain; +import org.apache.spark.repl.SparkJLineCompletion; +import org.sonatype.aether.resolution.ArtifactResolutionException; +import org.sonatype.aether.resolution.DependencyResolutionException; + +import scala.Console; +import scala.None; +import scala.Some; +import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.Completion.Candidates; +import scala.tools.nsc.interpreter.Completion.ScalaCompleter; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +import com.nflabs.zeppelin.interpreter.Interpreter; +import com.nflabs.zeppelin.interpreter.InterpreterContext; +import com.nflabs.zeppelin.interpreter.InterpreterGroup; +import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder; +import com.nflabs.zeppelin.interpreter.InterpreterResult; +import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; +import com.nflabs.zeppelin.interpreter.WrappedInterpreter; +import com.nflabs.zeppelin.scheduler.Scheduler; +import com.nflabs.zeppelin.spark.dep.DependencyContext; + + +/** + * DepInterpreter downloads dependencies and pass them when SparkInterpreter initialized. 
+ * It extends SparkInterpreter but does not create sparkcontext + * + */ +public class DepInterpreter extends Interpreter { + + static { + Interpreter.register( + "dep", + "spark", + DepInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .build()); + + } + + private SparkIMain intp; + private ByteArrayOutputStream out; + private DependencyContext depc; + private SparkJLineCompletion completor; + private SparkILoop interpreter; + + public DepInterpreter(Properties property) { + super(property); + } + + public DependencyContext getDependencyContext() { + return depc; + } + + + @Override + public void close() { + if (intp != null) { + intp.close(); + } + } + + @Override + public void open() { + out = new ByteArrayOutputStream(); + createIMain(); + } + + + private void createIMain() { + Settings settings = new Settings(); + URL[] urls = getClassloaderUrls(); + + // set classpath for scala compiler + PathSetting pathSettings = settings.classpath(); + String classpath = ""; + List paths = currentClassPath(); + for (File f : paths) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + + if (urls != null) { + for (URL u : urls) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += u.getFile(); + } + } + + pathSettings.v_$eq(classpath); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + + // set classloader for scala compiler + settings.explicitParentLoader_$eq(new Some(Thread.currentThread() + .getContextClassLoader())); + + BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + interpreter = new SparkILoop(null, new PrintWriter(out)); + interpreter.settings_$eq(settings); + + interpreter.createInterpreter(); + + + intp = interpreter.intp(); + intp.setContextClassLoader(); + intp.initializeSynchronous(); + + depc = new DependencyContext(); + completor = new SparkJLineCompletion(intp); + + intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + Map binder = (Map) getValue("_binder"); + binder.put("depc", depc); + + intp.interpret("@transient val z = " + + "_binder.get(\"depc\").asInstanceOf[com.nflabs.zeppelin.spark.dep.DependencyContext]"); + + } + + @Override + public Object getValue(String name) { + Object ret = intp.valueOfTerm(name); + if (ret instanceof None) { + return null; + } else if (ret instanceof Some) { + return ((Some) ret).get(); + } else { + return ret; + } + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + PrintStream printStream = new PrintStream(out); + Console.setOut(printStream); + out.reset(); + + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter == null) { + return new InterpreterResult(Code.ERROR, + "Must be used with SparkInterpreter"); + } + if (sparkInterpreter.isSparkContextInitialized()) { + return new InterpreterResult(Code.ERROR, + "Must be used before SparkInterpreter (%spark) initialized"); + } + + scala.tools.nsc.interpreter.Results.Result ret = intp.interpret(st); + Code code = getResultCode(ret); + + try { + depc.fetch(); + } catch (MalformedURLException | DependencyResolutionException + | ArtifactResolutionException e) { + return new InterpreterResult(Code.ERROR, e.toString()); + } + + if (code == Code.INCOMPLETE) { + return new InterpreterResult(code, "Incomplete expression"); + } else if (code == 
Code.ERROR) { + return new InterpreterResult(code, out.toString()); + } else { + return new InterpreterResult(code, out.toString()); + } + } + + private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { + if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return Code.ERROR; + } + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public void bindValue(String name, Object o) { + } + + @Override + public FormType getFormType() { + return null; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List completion(String buf, int cursor) { + ScalaCompleter c = completor.completer(); + Candidates ret = c.complete(buf, cursor); + return scala.collection.JavaConversions.asJavaList(ret.candidates()); + } + + private List currentClassPath() { + List paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + if (cps != null) { + for (String cp : cps) { + paths.add(new File(cp)); + } + } + return paths; + } + + private List classPath(ClassLoader cl) { + List paths = new LinkedList(); + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + return paths; + } + + private SparkInterpreter getSparkInterpreter() { + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup == null) { + return null; + } + for (Interpreter intp : intpGroup){ + if (intp.getClassName().equals(SparkInterpreter.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + return (SparkInterpreter) p; + } + } + return null; + } + + @Override + public Scheduler getScheduler() { + return getSparkInterpreter().getScheduler(); + } + +} diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java index e4bae314..3287794d 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java @@ -49,12 +49,15 @@ import com.nflabs.zeppelin.interpreter.Interpreter; import com.nflabs.zeppelin.interpreter.InterpreterContext; +import com.nflabs.zeppelin.interpreter.InterpreterGroup; import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder; import com.nflabs.zeppelin.interpreter.InterpreterResult; import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; +import com.nflabs.zeppelin.interpreter.WrappedInterpreter; import com.nflabs.zeppelin.notebook.form.Setting; import com.nflabs.zeppelin.scheduler.Scheduler; import com.nflabs.zeppelin.scheduler.SchedulerFactory; +import com.nflabs.zeppelin.spark.dep.DependencyContext; import com.nflabs.zeppelin.spark.dep.DependencyResolver; /** @@ -122,6 +125,10 @@ public synchronized SparkContext getSparkContext() { return sc; } + public boolean isSparkContextInitialized() { + return sc != null; + } + private static JobProgressListener setupListeners(SparkContext context) { JobProgressListener pl = new JobProgressListener(context.getConf()); context.listenerBus().addListener(pl); @@ 
-149,6 +156,21 @@ public DependencyResolver getDependencyResolver() { return dep; } + private DepInterpreter getDepInterpreter() { + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup == null) return null; + for (Interpreter intp : intpGroup) { + if (intp.getClassName().equals(DepInterpreter.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + return (DepInterpreter) p; + } + } + return null; + } + public SparkContext createSparkContext() { System.err.println("------ Create new SparkContext " + getProperty("master") + " -------"); @@ -259,6 +281,23 @@ public void open() { } } + // add dependency from DepInterpreter + DepInterpreter depInterpreter = getDepInterpreter(); + if (depInterpreter != null) { + DependencyContext depc = depInterpreter.getDependencyContext(); + if (depc != null) { + List files = depc.getFiles(); + if (files != null) { + for (File f : files) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + } + } + } + pathSettings.v_$eq(classpath); settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); @@ -319,6 +358,25 @@ public void open() { + "_binder.get(\"hiveContext\").asInstanceOf[org.apache.spark.sql.hive.HiveContext]"); intp.interpret("import org.apache.spark.SparkContext._"); intp.interpret("import sqlc._"); + + // add jar + if (depInterpreter != null) { + DependencyContext depc = depInterpreter.getDependencyContext(); + if (depc != null) { + List files = depc.getFilesDist(); + if (files != null) { + for (File f : files) { + if (f.getName().toLowerCase().endsWith(".jar")) { + sc.addJar(f.getAbsolutePath()); + logger.info("sc.addJar(" + f.getAbsolutePath() + ")"); + } else { + sc.addFile(f.getAbsolutePath()); + logger.info("sc.addFile(" + f.getAbsolutePath() + ")"); + } + } + } + } + } } private List currentClassPath() { diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java index 627ccccc..df7a0d96 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java @@ -1,6 +1,11 @@ package com.nflabs.zeppelin.spark; +import static scala.collection.JavaConversions.asJavaCollection; +import static scala.collection.JavaConversions.asJavaIterable; +import static scala.collection.JavaConversions.collectionAsScalaIterable; + import java.io.PrintStream; +import java.util.Collection; import java.util.Iterator; import org.apache.spark.SparkContext; @@ -9,6 +14,7 @@ import org.apache.spark.sql.hive.HiveContext; import scala.Tuple2; +import scala.collection.Iterable; import com.nflabs.zeppelin.interpreter.Interpreter; import com.nflabs.zeppelin.interpreter.InterpreterContext; @@ -52,35 +58,116 @@ public SchemaRDD sql(String sql) { /** * Load dependency for interpreter and runtime (driver). + * And distribute them to spark cluster (sc.add()) * - * @param artifact "group:artifact:version" + * @param artifact "group:artifact:version" or file path like "/somepath/your.jar" + * @return * @throws Exception */ - public void load(String artifact) throws Exception { - dep.load(artifact, false, false); + public Iterable load(String artifact) throws Exception { + return collectionAsScalaIterable(dep.load(artifact, true)); } /** - * Load dependency for interpreter and runtime (driver). 
+ * Load dependency and it's transitive dependencies for interpreter and runtime (driver). + * And distribute them to spark cluster (sc.add()) + * + * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar" + * @param excludes exclusion list of transitive dependency. list of "groupId:artifactId" string. + * @return + * @throws Exception + */ + public Iterable load(String artifact, scala.collection.Iterable excludes) + throws Exception { + return collectionAsScalaIterable( + dep.load(artifact, + asJavaCollection(excludes), + true)); + } + + /** + * Load dependency and it's transitive dependencies for interpreter and runtime (driver). + * And distribute them to spark cluster (sc.add()) * - * @param artifact "group:artifact:version" + * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar" + * @param excludes exclusion list of transitive dependency. list of "groupId:artifactId" string. + * @return * @throws Exception */ - public void load(String artifact, boolean recursive) throws Exception { - dep.load(artifact, recursive, false); + public Iterable load(String artifact, Collection excludes) throws Exception { + return collectionAsScalaIterable(dep.load(artifact, excludes, true)); } /** * Load dependency for interpreter and runtime, and then add to sparkContext. + * But not adding them to spark cluster + * + * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar" + * @return + * @throws Exception + */ + public Iterable loadLocal(String artifact) throws Exception { + return collectionAsScalaIterable(dep.load(artifact, false)); + } + + + /** + * Load dependency and it's transitive dependencies and then add to sparkContext. + * But not adding them to spark cluster + * + * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar" + * @param excludes exclusion list of transitive dependency. list of "groupId:artifactId" string. + * @return + * @throws Exception + */ + public Iterable loadLocal(String artifact, + scala.collection.Iterable excludes) throws Exception { + return collectionAsScalaIterable(dep.load(artifact, + asJavaCollection(excludes), false)); + } + + /** + * Load dependency and it's transitive dependencies and then add to sparkContext. + * But not adding them to spark cluster * + * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar" + * @param excludes exclusion list of transitive dependency. list of "groupId:artifactId" string. + * @return * @throws Exception */ - public void loadAndDist(String artifact) throws Exception { - dep.load(artifact, false, true); + public Iterable loadLocal(String artifact, Collection excludes) + throws Exception { + return collectionAsScalaIterable(dep.load(artifact, excludes, false)); + } + + + /** + * Add maven repository + * + * @param id id of repository ex) oss, local, snapshot + * @param url url of repository. supported protocol : file, http, https + */ + public void addRepo(String id, String url) { + addRepo(id, url, false); + } + + /** + * Add maven repository + * + * @param id id of repository + * @param url url of repository. 
supported protocol : file, http, https + * @param snapshot true if it is snapshot repository + */ + public void addRepo(String id, String url, boolean snapshot) { + dep.addRepo(id, url, snapshot); } - public void loadAndDist(String artifact, boolean recursive) throws Exception { - dep.load(artifact, true, true); + /** + * Remove maven repository by id + * @param id id of repository + */ + public void removeRepo(String id){ + dep.delRepo(id); } /** @@ -106,8 +193,7 @@ public Object select(String name, Object defaultValue, scala.collection.Iterable> options) { int n = options.size(); ParamOption[] paramOptions = new ParamOption[n]; - Iterator> it = - scala.collection.JavaConversions.asJavaIterable(options).iterator(); + Iterator> it = asJavaIterable(options).iterator(); int i = 0; while (it.hasNext()) { diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java new file mode 100644 index 00000000..f8f64943 --- /dev/null +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java @@ -0,0 +1,73 @@ +package com.nflabs.zeppelin.spark.dep; + +import java.util.LinkedList; +import java.util.List; + +/** + * + */ +public class Dependency { + private String groupArtifactVersion; + private boolean local = false; + private List exclusions; + + + public Dependency(String groupArtifactVersion) { + this.groupArtifactVersion = groupArtifactVersion; + exclusions = new LinkedList(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Dependency)) { + return false; + } else { + return ((Dependency) o).groupArtifactVersion.equals(groupArtifactVersion); + } + } + + /** + * Don't add artifact into SparkContext (sc.addJar()) + * @return + */ + public Dependency local() { + local = true; + return this; + } + + public Dependency excludeAll() { + exclude("*"); + return this; + } + + /** + * + * @param exclusions comma or newline separated list of "groupId:ArtifactId" + * @return + */ + public Dependency exclude(String exclusions) { + for (String item : exclusions.split(",|\n")) { + this.exclusions.add(item); + } + + return this; + } + + + public String getGroupArtifactVersion() { + return groupArtifactVersion; + } + + public boolean isDist() { + return !local; + } + + public List getExclusions() { + return exclusions; + } + + public boolean isLocalFsArtifact() { + int numSplits = groupArtifactVersion.split(":").length; + return !(numSplits >= 3 && numSplits <= 6); + } +} diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java new file mode 100644 index 00000000..e6dedf04 --- /dev/null +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java @@ -0,0 +1,130 @@ +package com.nflabs.zeppelin.spark.dep; + +import java.io.File; +import java.net.MalformedURLException; +import java.util.LinkedList; +import java.util.List; + +import org.sonatype.aether.RepositorySystem; +import org.sonatype.aether.RepositorySystemSession; +import org.sonatype.aether.artifact.Artifact; +import org.sonatype.aether.collection.CollectRequest; +import org.sonatype.aether.graph.DependencyFilter; +import org.sonatype.aether.repository.RemoteRepository; +import org.sonatype.aether.resolution.ArtifactResolutionException; +import org.sonatype.aether.resolution.ArtifactResult; +import org.sonatype.aether.resolution.DependencyRequest; +import org.sonatype.aether.resolution.DependencyResolutionException; 
+import org.sonatype.aether.util.artifact.DefaultArtifact; +import org.sonatype.aether.util.artifact.JavaScopes; +import org.sonatype.aether.util.filter.DependencyFilterUtils; +import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; + + +/** + * + */ +public class DependencyContext { + List dependencies = new LinkedList(); + List repositories = new LinkedList(); + + List files = new LinkedList(); + List filesDist = new LinkedList(); + private RepositorySystem system = Booter.newRepositorySystem(); + private RepositorySystemSession session = Booter.newRepositorySystemSession(system); + private RemoteRepository mavenCentral = new RemoteRepository("central", + "default", "http://repo1.maven.org/maven2/"); + private RemoteRepository mavenLocal = new RemoteRepository("local", + "default", "file://" + System.getProperty("user.home") + "/.m2/repository"); + + public Dependency load(String lib) { + Dependency dep = new Dependency(lib); + + if (dependencies.contains(dep)) { + dependencies.remove(dep); + } + dependencies.add(dep); + return dep; + } + + public Repository addRepo(String name) { + Repository rep = new Repository(name); + repositories.add(rep); + return rep; + } + + public void reset() { + dependencies = new LinkedList(); + repositories = new LinkedList(); + + files = new LinkedList(); + filesDist = new LinkedList(); + } + + + /** + * fetch all artifacts + * @return + * @throws MalformedURLException + * @throws ArtifactResolutionException + * @throws DependencyResolutionException + */ + public List fetch() throws MalformedURLException, + DependencyResolutionException, ArtifactResolutionException { + + for (Dependency dep : dependencies) { + if (!dep.isLocalFsArtifact()) { + List artifacts = fetchArtifactWithDep(dep); + for (ArtifactResult artifact : artifacts) { + if (dep.isDist()) { + filesDist.add(artifact.getArtifact().getFile()); + } + files.add(artifact.getArtifact().getFile()); + } + } else { + if (dep.isDist()) { + filesDist.add(new File(dep.getGroupArtifactVersion())); + } + files.add(new File(dep.getGroupArtifactVersion())); + } + } + + return files; + } + + private List fetchArtifactWithDep(Dependency dep) + throws DependencyResolutionException, ArtifactResolutionException { + Artifact artifact = new DefaultArtifact( + DependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); + + DependencyFilter classpathFlter = DependencyFilterUtils + .classpathFilter(JavaScopes.COMPILE); + PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter( + DependencyResolver.inferScalaVersion(dep.getExclusions())); + + CollectRequest collectRequest = new CollectRequest(); + collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, + JavaScopes.COMPILE)); + + collectRequest.addRepository(mavenCentral); + collectRequest.addRepository(mavenLocal); + for (Repository repo : repositories) { + RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl()); + rr.setPolicy(repo.isSnapshot(), null); + collectRequest.addRepository(rr); + } + + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); + + return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); + } + + public List getFiles() { + return files; + } + + public List getFilesDist() { + return filesDist; + } +} diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java 
b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java index 2a6955ec..aacb4d66 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java @@ -4,6 +4,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; +import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -20,26 +22,25 @@ import org.sonatype.aether.graph.Dependency; import org.sonatype.aether.graph.DependencyFilter; import org.sonatype.aether.repository.RemoteRepository; -import org.sonatype.aether.resolution.ArtifactRequest; import org.sonatype.aether.resolution.ArtifactResult; import org.sonatype.aether.resolution.DependencyRequest; import org.sonatype.aether.util.artifact.DefaultArtifact; import org.sonatype.aether.util.artifact.JavaScopes; import org.sonatype.aether.util.filter.DependencyFilterUtils; +import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; import scala.Some; import scala.collection.IndexedSeq; import scala.reflect.io.AbstractFile; import scala.tools.nsc.Global; import scala.tools.nsc.backend.JavaPlatform; -import scala.tools.nsc.interpreter.AbstractFileClassLoader; import scala.tools.nsc.util.ClassPath; import scala.tools.nsc.util.MergedClassPath; /** * Deps resolver. * Add new dependencies from mvn repo (at runetime) to Zeppelin. - * + * * @author anthonycorbacho * */ @@ -49,7 +50,7 @@ public class DependencyResolver { private SparkIMain intp; private SparkContext sc; private RepositorySystem system = Booter.newRepositorySystem(); - private RemoteRepository repo = Booter.newCentralRepository(); + private List repos = new LinkedList(); private RepositorySystemSession session = Booter.newRepositorySystemSession(system); private DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( JavaScopes.COMPILE, @@ -59,6 +60,8 @@ public class DependencyResolver { private final String[] exclusions = new String[] {"org.scala-lang:scala-library", "org.scala-lang:scala-compiler", + "org.scala-lang:scala-reflect", + "org.scala-lang:scalap", "com.nflabs.zeppelin:zeppelin-zengine", "com.nflabs.zeppelin:zeppelin-spark", "com.nflabs.zeppelin:zeppelin-server"}; @@ -67,6 +70,32 @@ public DependencyResolver(SparkIMain intp, SparkContext sc) { this.intp = intp; this.global = intp.global(); this.sc = sc; + repos.add(Booter.newCentralRepository()); // add maven central + repos.add(new RemoteRepository("local", "default", "file://" + + System.getProperty("user.home") + "/.m2/repository")); + } + + public void addRepo(String id, String url, boolean snapshot) { + synchronized (repos) { + delRepo(id); + RemoteRepository rr = new RemoteRepository(id, "default", url); + rr.setPolicy(snapshot, null); + repos.add(rr); + } + } + + public RemoteRepository delRepo(String id) { + synchronized (repos) { + Iterator it = repos.iterator(); + if (it.hasNext()) { + RemoteRepository repo = it.next(); + if (repo.getId().equals(id)) { + it.remove(); + return repo; + } + } + } + return null; } private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException, @@ -141,54 +170,58 @@ private MergedClassPath mergeUrlsIntoClassPath(JavaPlatform platfo platform.classPath().context()); } - public void load(String groupId, String artifactId, String version, boolean recursive, + public List load(String artifact, boolean addSparkContext) throws Exception { - if 
(StringUtils.isBlank(groupId) || StringUtils.isBlank(artifactId) - || StringUtils.isBlank(version)) { - // Should throw here - return; - } - load(groupId + ":" + artifactId + ":" + version, recursive, addSparkContext); + return load(artifact, new LinkedList(), addSparkContext); } - public void load(String artifact, boolean recursive, boolean addSparkContext) throws Exception { + public List load(String artifact, Collection excludes, + boolean addSparkContext) throws Exception { if (StringUtils.isBlank(artifact)) { // Should throw here - return; + throw new RuntimeException("Invalid artifact to load"); } - if (artifact.split(":").length == 3) { - loadFromMvn(artifact, recursive, addSparkContext); + // :[:[:]]: + int numSplits = artifact.split(":").length; + if (numSplits >= 3 && numSplits <= 6) { + return loadFromMvn(artifact, excludes, addSparkContext); } else { loadFromFs(artifact, addSparkContext); + LinkedList libs = new LinkedList(); + libs.add(artifact); + return libs; } } private void loadFromFs(String artifact, boolean addSparkContext) throws Exception { File jarFile = new File(artifact); - updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()}); + intp.global().new Run(); + updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()}); + updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()}); if (addSparkContext) { sc.addJar(jarFile.getAbsolutePath()); } } - private void loadFromMvn(String artifact, boolean recursive, boolean addSparkContext) - throws Exception { + private List loadFromMvn(String artifact, Collection excludes, + boolean addSparkContext) throws Exception { + List loadedLibs = new LinkedList(); + Collection allExclusions = new LinkedList(); + allExclusions.addAll(excludes); + allExclusions.addAll(Arrays.asList(exclusions)); + List listOfArtifact; - if (recursive) { - listOfArtifact = getArtifactsWithDep(artifact); - } else { - listOfArtifact = getArtifact(artifact); - } + listOfArtifact = getArtifactsWithDep(artifact, allExclusions); Iterator it = listOfArtifact.iterator(); while (it.hasNext()) { Artifact a = it.next().getArtifact(); String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion(); - for (String exclude : exclusions) { + for (String exclude : allExclusions) { if (gav.startsWith(exclude)) { it.remove(); break; @@ -204,36 +237,96 @@ private void loadFromMvn(String artifact, boolean recursive, boolean addSparkCon + artifactResult.getArtifact().getVersion()); newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL()); files.add(artifactResult.getArtifact().getFile()); + loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":" + + artifactResult.getArtifact().getArtifactId() + ":" + + artifactResult.getArtifact().getVersion()); } - updateCompilerClassPath(newClassPathList.toArray(new URL[0])); + intp.global().new Run(); updateRuntimeClassPath(newClassPathList.toArray(new URL[0])); + updateCompilerClassPath(newClassPathList.toArray(new URL[0])); if (addSparkContext) { for (File f : files) { sc.addJar(f.getAbsolutePath()); } } - } - - public List getArtifact(String dependency) throws Exception { - Artifact artifact = new DefaultArtifact(dependency); - ArtifactRequest artifactRequest = new ArtifactRequest(); - artifactRequest.setArtifact(artifact); - artifactRequest.addRepository(repo); - ArtifactResult artifactResult = system.resolveArtifact(session, artifactRequest); - LinkedList results = new LinkedList(); - results.add(artifactResult); - return results; + return loadedLibs; } - public List 
getArtifactsWithDep(String dependency) throws Exception { - Artifact artifact = new DefaultArtifact(dependency); + /** + * + * @param dependency + * @param excludes list of pattern can either be of the form groupId:artifactId + * @return + * @throws Exception + */ + public List getArtifactsWithDep(String dependency, + Collection excludes) throws Exception { + Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency)); + DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( JavaScopes.COMPILE ); + PatternExclusionsDependencyFilter exclusionFilter = + new PatternExclusionsDependencyFilter(inferScalaVersion(excludes)); + CollectRequest collectRequest = new CollectRequest(); collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE)); - collectRequest.addRepository(repo); - DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, classpathFlter); + + synchronized (repos) { + for (RemoteRepository repo : repos) { + collectRequest.addRepository(repo); + } + } + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); } + + public static Collection inferScalaVersion(Collection artifact) { + List list = new LinkedList(); + for (String a : artifact) { + list.add(inferScalaVersion(a)); + } + return list; + } + + public static String inferScalaVersion(String artifact) { + int pos = artifact.indexOf(":"); + if (pos < 0 || pos + 2 >= artifact.length()) { + // failed to infer + return artifact; + } + + if (':' == artifact.charAt(pos + 1)) { + String restOfthem = ""; + String versionSep = ":"; + + String groupId = artifact.substring(0, pos); + int nextPos = artifact.indexOf(":", pos + 2); + if (nextPos < 0) { + if (artifact.charAt(artifact.length() - 1) == '*') { + nextPos = artifact.length() - 1; + versionSep = ""; + restOfthem = "*"; + } else { + versionSep = ""; + nextPos = artifact.length(); + } + } + + String artifactId = artifact.substring(pos + 2, nextPos); + if (nextPos < artifact.length()) { + if (!restOfthem.equals("*")) { + restOfthem = artifact.substring(nextPos + 1); + } + } + + String [] version = scala.util.Properties.versionNumberString().split("[.]"); + String scalaVersion = version[0] + "." 
+ version[1]; + + return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem; + } else { + return artifact; + } + } } diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java new file mode 100644 index 00000000..8ca5fe7f --- /dev/null +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java @@ -0,0 +1,37 @@ +package com.nflabs.zeppelin.spark.dep; + +/** + * + * + */ +public class Repository { + private boolean snapshot = false; + private String name; + private String url; + + public Repository(String name){ + this.name = name; + } + + public Repository url(String url) { + this.url = url; + return this; + } + + public Repository snapshot() { + snapshot = true; + return this; + } + + public boolean isSnapshot() { + return snapshot; + } + + public String getName() { + return name; + } + + public String getUrl() { + return url; + } +} diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java index e24d2c14..cf48a331 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java @@ -2,6 +2,7 @@ import org.apache.maven.repository.internal.DefaultServiceLocator; import org.apache.maven.wagon.Wagon; +import org.apache.maven.wagon.providers.http.HttpWagon; import org.apache.maven.wagon.providers.http.LightweightHttpWagon; import org.sonatype.aether.RepositorySystem; import org.sonatype.aether.connector.file.FileRepositoryConnectorFactory; @@ -11,7 +12,7 @@ /** * Get maven repository instance. - * + * * @author anthonycorbacho * */ @@ -26,14 +27,20 @@ public static RepositorySystem newRepositorySystem() { } /** - * ManualWagonProvider + * ManualWagonProvider */ public static class ManualWagonProvider implements WagonProvider { + @Override public Wagon lookup(String roleHint) throws Exception { if ("http".equals(roleHint)) { return new LightweightHttpWagon(); } + + if ("https".equals(roleHint)) { + return new HttpWagon(); + } + return null; } diff --git a/spark/src/test/java/com/nflabs/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/com/nflabs/zeppelin/spark/DepInterpreterTest.java new file mode 100644 index 00000000..aecb8c3b --- /dev/null +++ b/spark/src/test/java/com/nflabs/zeppelin/spark/DepInterpreterTest.java @@ -0,0 +1,72 @@ +package com.nflabs.zeppelin.spark; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.nflabs.zeppelin.interpreter.InterpreterContext; +import com.nflabs.zeppelin.interpreter.InterpreterGroup; +import com.nflabs.zeppelin.interpreter.InterpreterResult; +import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; +import com.nflabs.zeppelin.notebook.Paragraph; + +public class DepInterpreterTest { + private DepInterpreter dep; + private InterpreterContext context; + private File tmpDir; + private SparkInterpreter repl; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); + + tmpDir.mkdirs(); + + Properties p = new Properties(); + + dep = new DepInterpreter(p); + dep.open(); + + 
InterpreterGroup intpGroup = new InterpreterGroup(); + intpGroup.add(new SparkInterpreter(p)); + intpGroup.add(dep); + dep.setInterpreterGroup(intpGroup); + + context = new InterpreterContext(new Paragraph(null, null)); + } + + @After + public void tearDown() throws Exception { + dep.close(); + delete(tmpDir); + } + + private void delete(File file) { + if (file.isFile()) file.delete(); + else if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null && files.length > 0) { + for (File f : files) { + delete(f); + } + } + file.delete(); + } + } + + @Test + public void testDefault() { + dep.getDependencyContext().reset(); + InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context); + assertEquals(Code.SUCCESS, ret.code()); + + assertEquals(1, dep.getDependencyContext().getFiles().size()); + assertEquals(1, dep.getDependencyContext().getFilesDist().size()); + } +} diff --git a/spark/src/test/java/com/nflabs/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/com/nflabs/zeppelin/spark/SparkInterpreterTest.java index b89c56a6..a7e40a59 100644 --- a/spark/src/test/java/com/nflabs/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/com/nflabs/zeppelin/spark/SparkInterpreterTest.java @@ -3,24 +3,33 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.File; import java.util.Properties; import org.junit.After; import org.junit.Before; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; import com.nflabs.zeppelin.interpreter.InterpreterContext; import com.nflabs.zeppelin.interpreter.InterpreterResult; import com.nflabs.zeppelin.interpreter.InterpreterResult.Code; import com.nflabs.zeppelin.notebook.Paragraph; - +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class SparkInterpreterTest { - public static SparkInterpreter repl; + public static SparkInterpreter repl; private InterpreterContext context; + private File tmpDir; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); + + tmpDir.mkdirs(); - @Before - public void setUp() throws Exception { if (repl == null) { Properties p = new Properties(); @@ -31,9 +40,23 @@ public void setUp() throws Exception { context = new InterpreterContext(new Paragraph(null, null)); } - @After - public void tearDown() throws Exception { - } + @After + public void tearDown() throws Exception { + delete(tmpDir); + } + + private void delete(File file) { + if (file.isFile()) file.delete(); + else if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null && files.length > 0) { + for (File f : files) { + delete(f); + } + } + file.delete(); + } + } @Test public void testBasicIntp() { @@ -59,8 +82,8 @@ public void testEndWithComment() { @Test public void testSparkSql(){ - repl.interpret("case class Person(name:String, age:Int)", context); - repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))", context); + repl.interpret("case class Person(name:String, age:Int)\n", context); + repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", 
context).code()); // create new interpreter @@ -80,6 +103,15 @@ public void testReferencingUndefinedVal(){ " if (0 <= value) \"error\"" + "}", context); assertEquals(Code.ERROR, result.code()); - System.out.println("msg="+result.message()); } + + @Test + public void testZContextDependencyLoading() { + // try to import library does not exist on classpath. it'll fail + assertEquals(InterpreterResult.Code.ERROR, repl.interpret("import org.apache.commons.csv.CSVFormat", context).code()); + + // load library from maven repository and try to import again + repl.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context); + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("import org.apache.commons.csv.CSVFormat", context).code()); + } } diff --git a/spark/src/test/java/com/nflabs/zeppelin/spark/dep/DependencyResolverTest.java b/spark/src/test/java/com/nflabs/zeppelin/spark/dep/DependencyResolverTest.java new file mode 100644 index 00000000..804c31eb --- /dev/null +++ b/spark/src/test/java/com/nflabs/zeppelin/spark/dep/DependencyResolverTest.java @@ -0,0 +1,34 @@ +package com.nflabs.zeppelin.spark.dep; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class DependencyResolverTest { + + @Test + public void testInferScalaVersion() { + String [] version = scala.util.Properties.versionNumberString().split("[.]"); + String scalaVersion = version[0] + "." + version[1]; + + assertEquals("groupId:artifactId:version", + DependencyResolver.inferScalaVersion("groupId:artifactId:version")); + assertEquals("groupId:artifactId_" + scalaVersion + ":version", + DependencyResolver.inferScalaVersion("groupId::artifactId:version")); + assertEquals("groupId:artifactId:version::test", + DependencyResolver.inferScalaVersion("groupId:artifactId:version::test")); + assertEquals("*", + DependencyResolver.inferScalaVersion("*")); + assertEquals("groupId:*", + DependencyResolver.inferScalaVersion("groupId:*")); + assertEquals("groupId:artifactId*", + DependencyResolver.inferScalaVersion("groupId:artifactId*")); + assertEquals("groupId:artifactId_" + scalaVersion, + DependencyResolver.inferScalaVersion("groupId::artifactId")); + assertEquals("groupId:artifactId_" + scalaVersion + "*", + DependencyResolver.inferScalaVersion("groupId::artifactId*")); + assertEquals("groupId:artifactId_" + scalaVersion + ":*", + DependencyResolver.inferScalaVersion("groupId::artifactId:*")); + } + +} diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index e489511b..b0b3e1f5 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -82,14 +82,6 @@ com.fasterxml.jackson.module jackson-module-scala_2.10 - - org.json4s - json4s-jackson_2.10 - - - org.json4s - json4s-core_2.10 - com.thoughtworks.paranamer paranamer diff --git a/zeppelin-server/src/test/java/com/nflabs/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/com/nflabs/zeppelin/rest/ZeppelinRestApiTest.java index 2ff0ab03..cb9a8504 100644 --- a/zeppelin-server/src/test/java/com/nflabs/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/com/nflabs/zeppelin/rest/ZeppelinRestApiTest.java @@ -62,7 +62,7 @@ public void getAvailableInterpreters() throws IOException { assertThat(get, isAllowed()); Map resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken>(){}.getType()); Map body = (Map) resp.get("body"); - assertEquals(4, body.size()); + assertEquals(5, body.size()); get.releaseConnection(); } diff --git 
a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/conf/ZeppelinConfiguration.java index 8cade790..cb7f1d65 100644 --- a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/conf/ZeppelinConfiguration.java @@ -351,6 +351,7 @@ public static enum ConfVars { ZEPPELIN_API_WAR("zeppelin.api.war", "../zeppelin-docs/src/main/swagger"), ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "com.nflabs.zeppelin.spark.SparkInterpreter," + "com.nflabs.zeppelin.spark.SparkSqlInterpreter," + + "com.nflabs.zeppelin.spark.DepInterpreter," + "com.nflabs.zeppelin.markdown.Markdown," + "com.nflabs.zeppelin.shell.ShellInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
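A minimal usage sketch of the new dependency-loading flow, written as notebook paragraphs in Scala (the language the interpreter executes). Assumptions: the paragraph is bound through a %dep magic inferred from the Interpreter.register("dep", "spark", ...) call; the repository URL, the local jar path, and the com.example coordinate are placeholders, not values from this patch. Per DepInterpreter.interpret(), the %dep paragraph has to run before the first %spark paragraph initializes the SparkContext.

%dep
z.reset()

// placeholder repository; chain .snapshot() to mark a snapshot repository
z.addRepo("custom").url("https://repo.example.com/maven2")

// artifact plus its transitive dependencies, with one illustrative exclusion
z.load("org.apache.commons:commons-csv:1.1").exclude("commons-logging:commons-logging")

// "::" lets DependencyResolver.inferScalaVersion append the Scala binary version
// (com.example::scala-lib is a hypothetical coordinate)
z.load("com.example::scala-lib:0.1.0")

// keep a local jar on the driver only, so it is not passed to sc.addJar
z.load("/path/to/your.jar").local()

Once DependencyContext.fetch() has resolved these, SparkInterpreter.open() adds the resolved files to the compiler classpath and calls sc.addJar / sc.addFile for everything not marked .local().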
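For paragraphs that run after the SparkContext exists, the reworked ZeppelinContext is the counterpart, mirroring what SparkInterpreterTest.testZContextDependencyLoading exercises; again the repository URL and the com.example coordinate are placeholders. load(...) resolves an artifact and distributes it with sc.addJar, loadLocal(...) stops at the driver, and addRepo/removeRepo adjust the resolver's repository list at runtime.

%spark
// placeholder repository; the boolean marks a snapshot repository (false here)
z.addRepo("internal", "https://repo.example.com/maven2", false)

// driver + cluster (sc.addJar), excluding one transitive dependency
z.load("org.apache.commons:commons-csv:1.1", Seq("commons-logging:commons-logging"))

// driver only; "::" again expands to the current Scala binary version
z.loadLocal("com.example::scala-lib:0.1.0")

// classes from the fetched jars are now importable in this REPL
import org.apache.commons.csv.CSVFormat

z.removeRepo("internal")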