Merged
Changes from all commits
24 commits
0892d6a
Add local-repo in gitignore
Leemoonsoo Jan 28, 2015
a52577f
Fix problem of loading jar in runtime/compiler classpath.
Leemoonsoo Jan 28, 2015
2686400
Handle exclusion correctly
Leemoonsoo Jan 28, 2015
748e251
Ability to add maven repository
Leemoonsoo Jan 28, 2015
bc062c6
static import JavaConversions
Leemoonsoo Jan 28, 2015
277034b
Add javadoc comment
Leemoonsoo Jan 28, 2015
737ed5a
Add basic test for dependency library loading
Leemoonsoo Jan 28, 2015
788bb20
Tab to space
Leemoonsoo Jan 28, 2015
0025628
Fix test order
Leemoonsoo Jan 28, 2015
4ef1b96
Return loaded dependency list
Leemoonsoo Jan 29, 2015
dc4995e
Add scala-reflect and scalap to default exclusion
Leemoonsoo Jan 29, 2015
3f55fa0
DepInterpreter implementation
Leemoonsoo Feb 5, 2015
cfa7089
Print error message when DepInterpreter used after SparkInterpreter i…
Leemoonsoo Feb 5, 2015
2338ae6
Fix test
Leemoonsoo Feb 5, 2015
2d1d39b
Add local m2 repo by default
Leemoonsoo Feb 5, 2015
c04505d
Fix test
Leemoonsoo Feb 5, 2015
ea2a76f
extension and classifier
Leemoonsoo Feb 7, 2015
e9ff658
Add com.nflabs.zeppelin.spark.DepInterpreter to zeppelin-site.xml.tem…
Leemoonsoo Feb 7, 2015
55a40c6
Fix wrong api method call
Leemoonsoo Jan 28, 2015
a56f6d2
* Infer scala version after artifactId by doing '::'
Leemoonsoo Feb 8, 2015
6d9b5b4
Take care of json4s version
Leemoonsoo Feb 10, 2015
b2b7577
Make dist() default
Leemoonsoo Feb 10, 2015
ab20344
Merge pull request #319 from NFLabs/new/depinterpreter
Leemoonsoo Feb 12, 2015
ad24972
Merge branch 'master' into improve/libload
Leemoonsoo Feb 12, 2015
1 change: 1 addition & 0 deletions .gitignore
@@ -31,6 +31,7 @@ zan-repo/
drivers/
warehouse/
notebook/
local-repo/

**/sessions/
**/data/
2 changes: 1 addition & 1 deletion conf/zeppelin-site.xml.template
@@ -48,7 +48,7 @@

<property>
<name>zeppelin.interpreters</name>
<value>com.nflabs.zeppelin.spark.SparkInterpreter,com.nflabs.zeppelin.spark.SparkSqlInterpreter,com.nflabs.zeppelin.markdown.Markdown,com.nflabs.zeppelin.shell.ShellInterpreter</value>
<value>com.nflabs.zeppelin.spark.SparkInterpreter,com.nflabs.zeppelin.spark.SparkSqlInterpreter,com.nflabs.zeppelin.spark.DepInterpreter,com.nflabs.zeppelin.markdown.Markdown,com.nflabs.zeppelin.shell.ShellInterpreter</value>
<description>Comma separated interpreter configurations. The first interpreter becomes the default.</description>
</property>

27 changes: 27 additions & 0 deletions pom.xml
@@ -95,6 +95,7 @@
<parquet.version>1.4.3</parquet.version>
<jblas.version>1.2.3</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
<json4s.version>3.2.10</json4s.version>
<chill.version>0.3.6</chill.version>
<codahale.metrics.version>3.0.0</codahale.metrics.version>
<avro.version>1.7.6</avro.version>
@@ -794,6 +795,32 @@
<version>2.2.6</version>
</dependency>


<!-- json4s -->
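<!-- managed here so every module resolves the same json4s version -->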
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-core_2.10</artifactId>
<version>${json4s.version}</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>${json4s.version}</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.10</artifactId>
<version>${json4s.version}</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ext_2.10</artifactId>
<version>${json4s.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

8 changes: 8 additions & 0 deletions spark/pom.xml
@@ -226,6 +226,14 @@
</exclusions>
</dependency>

<dependency>
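<!-- HTTP transport used by the artifact resolver to download dependencies from remote Maven repositories -->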
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http</artifactId>
<version>1.0</version>
<exclusions>
</exclusions>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
277 changes: 277 additions & 0 deletions spark/src/main/java/com/nflabs/zeppelin/spark/DepInterpreter.java
@@ -0,0 +1,277 @@
package com.nflabs.zeppelin.spark;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.repl.SparkILoop;
import org.apache.spark.repl.SparkIMain;
import org.apache.spark.repl.SparkJLineCompletion;
import org.sonatype.aether.resolution.ArtifactResolutionException;
import org.sonatype.aether.resolution.DependencyResolutionException;

import scala.Console;
import scala.None;
import scala.Some;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.Completion.Candidates;
import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;

import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterContext;
import com.nflabs.zeppelin.interpreter.InterpreterGroup;
import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
import com.nflabs.zeppelin.scheduler.Scheduler;
import com.nflabs.zeppelin.spark.dep.DependencyContext;


/**
* DepInterpreter downloads dependencies and passes them to SparkInterpreter
* when SparkInterpreter initializes. It runs in the same interpreter group as
* SparkInterpreter but does not create a SparkContext itself.
*/
public class DepInterpreter extends Interpreter {

static {
Interpreter.register(
"dep",
"spark",
DepInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.build());

}

private SparkIMain intp;
private ByteArrayOutputStream out;
private DependencyContext depc;
private SparkJLineCompletion completor;
private SparkILoop interpreter;

public DepInterpreter(Properties property) {
super(property);
}

public DependencyContext getDependencyContext() {
return depc;
}


@Override
public void close() {
if (intp != null) {
intp.close();
}
}

@Override
public void open() {
out = new ByteArrayOutputStream();
createIMain();
}


private void createIMain() {
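// Build a standalone Scala REPL whose compiler classpath mirrors the
// current JVM classpath and the context classloader's URLs.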
Settings settings = new Settings();
URL[] urls = getClassloaderUrls();

// set classpath for scala compiler
PathSetting pathSettings = settings.classpath();
String classpath = "";
List<File> paths = currentClassPath();
for (File f : paths) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += f.getAbsolutePath();
}

if (urls != null) {
for (URL u : urls) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += u.getFile();
}
}

pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);

// set classloader for scala compiler
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
.getContextClassLoader()));

BooleanSetting b = (BooleanSetting) settings.usejavacp();
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);

interpreter = new SparkILoop(null, new PrintWriter(out));
interpreter.settings_$eq(settings);

interpreter.createInterpreter();


intp = interpreter.intp();
intp.setContextClassLoader();
intp.initializeSynchronous();

depc = new DependencyContext();
completor = new SparkJLineCompletion(intp);

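// Expose the DependencyContext to user code: bind it into a transient
// map inside the REPL, then surface it there as 'z'.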
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
binder.put("depc", depc);

intp.interpret("@transient val z = "
+ "_binder.get(\"depc\").asInstanceOf[com.nflabs.zeppelin.spark.dep.DependencyContext]");

}

@Override
public Object getValue(String name) {
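// valueOfTerm yields a scala.Option; unwrap it (None -> null, Some(v) -> v).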
Object ret = intp.valueOfTerm(name);
if (ret instanceof None) {
return null;
} else if (ret instanceof Some) {
return ((Some) ret).get();
} else {
return ret;
}
}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
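// Redirect the Scala console into 'out' so REPL output can be captured
// and returned in the InterpreterResult.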
PrintStream printStream = new PrintStream(out);
Console.setOut(printStream);
out.reset();

SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (sparkInterpreter == null) {
return new InterpreterResult(Code.ERROR,
"Must be used with SparkInterpreter");
}
if (sparkInterpreter.isSparkContextInitialized()) {
return new InterpreterResult(Code.ERROR,
"Must be used before SparkInterpreter (%spark) initialized");
}

scala.tools.nsc.interpreter.Results.Result ret = intp.interpret(st);
Code code = getResultCode(ret);

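// Resolve and download every artifact queued in the DependencyContext
// while the paragraph was interpreted.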
try {
depc.fetch();
} catch (MalformedURLException | DependencyResolutionException
| ArtifactResolutionException e) {
return new InterpreterResult(Code.ERROR, e.toString());
}

if (code == Code.INCOMPLETE) {
return new InterpreterResult(code, "Incomplete expression");
} else {
return new InterpreterResult(code, out.toString());
}
}

private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
return Code.SUCCESS;
} else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
return Code.INCOMPLETE;
} else {
return Code.ERROR;
}
}

@Override
public void cancel(InterpreterContext context) {
}

@Override
public void bindValue(String name, Object o) {
}

@Override
public FormType getFormType() {
return null;
}

@Override
public int getProgress(InterpreterContext context) {
return 0;
}

@Override
public List<String> completion(String buf, int cursor) {
ScalaCompleter c = completor.completer();
Candidates ret = c.complete(buf, cursor);
return scala.collection.JavaConversions.asJavaList(ret.candidates());
}

private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
if (cps != null) {
for (String cp : cps) {
paths.add(new File(cp));
}
}
return paths;
}

private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
if (cl == null) {
return paths;
}

if (cl instanceof URLClassLoader) {
URLClassLoader ucl = (URLClassLoader) cl;
URL[] urls = ucl.getURLs();
if (urls != null) {
for (URL url : urls) {
paths.add(new File(url.getFile()));
}
}
}
return paths;
}

private SparkInterpreter getSparkInterpreter() {
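// Find the SparkInterpreter registered in the same interpreter group,
// unwrapping any WrappedInterpreter layers around it.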
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) {
return null;
}
for (Interpreter intp : intpGroup) {
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (SparkInterpreter) p;
}
}
return null;
}

@Override
public Scheduler getScheduler() {
return getSparkInterpreter().getScheduler();
}

}
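
For reference, a %dep paragraph is plain Scala evaluated against the z binding created above, and interpret() requires it to run before SparkInterpreter (%spark) has initialized its SparkContext. The sketch below is illustrative only: the z binding and the ordering constraint come from the code in this diff, while the method names and the '::' Scala-version shorthand are inferred from the commit messages ("Ability to add maven repository", "Infer scala version after artifactId by doing '::'") and are not confirmed here.

%dep
// hypothetical: register an additional remote Maven repository
z.addRepo("central-mirror", "http://repo1.maven.org/maven2/")

// hypothetical: queue an artifact; depc.fetch() downloads it when the paragraph runs
z.load("org.apache.commons:commons-lang3:3.3.2")

// hypothetical: '::' tells the loader to infer the Scala version suffix for the artifactId
z.load("org.json4s::json4s-native:3.2.10")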