diff --git a/README.md b/README.md index 85fc0b6a1c2..9a99b7881e5 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,11 @@ mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests mvn clean package -Dignite.version=1.1.0-incubating -DskipTests ``` +#### Scalding Interpreter + +``` +mvn clean package -Pscalding -DskipTests +``` ### Configure If you wish to configure Zeppelin option (like port number), configure the following files: diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index b6aca75d626..74fa2e76053 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -105,7 +105,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 2c6228275d7..5b2da51abc1 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -45,6 +45,7 @@
  • Lens
  • Markdown
  • Postgresql, hawq
  • + Scalding
  • Shell
  • Spark
  • Tajo
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png new file mode 100644 index 00000000000..113131031ee Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png new file mode 100644 index 00000000000..c52f4e3d87e Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png new file mode 100644 index 00000000000..bb010257ecb Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png differ diff --git a/docs/docs.md b/docs/docs.md index a2347a8ad36..b70ee58e116 100644 --- a/docs/docs.md +++ b/docs/docs.md @@ -41,6 +41,7 @@ limitations under the License. * [lens](./interpreter/lens.html) * [md](./interpreter/markdown.html) * [postgresql, hawq](./interpreter/postgresql.html) +* [scalding](./interpreter/scalding.html) * [sh](./pleasecontribute.html) * [spark](./interpreter/spark.html) * [tajo](./pleasecontribute.html) diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md new file mode 100644 index 00000000000..40ec8b1211e --- /dev/null +++ b/docs/interpreter/scalding.md @@ -0,0 +1,78 @@ +--- +layout: page +title: "Scalding Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + + +## Scalding Interpreter for Apache Zeppelin +[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs. + +### Building the Scalding Interpreter +You first have to build the Scalding interpreter by enabling the **scalding** profile as follows: + +``` +mvn clean package -Pscalding -DskipTests +``` + +### Enabling the Scalding Interpreter + +In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon, select **Scalding**, and hit **Save**. + +
    + ![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png) + + ![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png) +
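Once the interpreter is bound, a quick way to confirm it is working is to run a minimal paragraph like the one below. This is just a sketch: `TypedPipe` and `.dump` are available because the interpreter pre-loads the Scalding REPL imports (`com.twitter.scalding._` and the REPL implicits), and the values used here are arbitrary.

```
%scalding

// Build a small in-memory TypedPipe and print its contents (runs in local mode).
val fruit = TypedPipe.from(List(("apple", 3), ("banana", 5), ("cherry", 2)))
fruit.dump
```

The word-count walkthrough in the "Testing the Interpreter" section below follows the same pattern on a larger input.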
+ +### Configuring the Interpreter +Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything. + +### Testing the Interpreter + +As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!) and plot a graph of the top 10 words in the book. + +``` +%scalding + +import scala.io.Source + +// Get the Alice in Wonderland book from gutenberg.org: +val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines +val aliceLineNum = alice.zipWithIndex.toList +val alicePipe = TypedPipe.from(aliceLineNum) + +// Now get a list of words for the book: +val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList } + +// Now let's add a count for each word: +val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) } + +// Let's sum them for each word: +val wordCount = aliceWithCount.group.sum + +print ("Here are the top 10 words\n") +val top10 = wordCount + .groupAll + .sortBy { case (word, count) => -count } + .take(10) +top10.dump + +``` +``` +%scalding + +val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n") +print("%table " + table) + +``` + +If you click on the icon for the pie chart, you should be able to see a chart like this: +![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png) + +### Current Status & Future Work +The current implementation of the Scalding interpreter does not support canceling jobs or fine-grained progress updates. + +The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported and is planned as future work (contributions welcome!). \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5e492fab3ae..88d38aa85f6 100755 --- a/pom.xml +++ b/pom.xml @@ -627,6 +627,13 @@ + + scalding + + scalding + + + build-distr diff --git a/scalding/pom.xml b/scalding/pom.xml new file mode 100644 index 00000000000..abc1e2b88ba --- /dev/null +++ b/scalding/pom.xml @@ -0,0 +1,202 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + ..
+ + + org.apache.zeppelin + zeppelin-scalding + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: Scalding interpreter + http://zeppelin.incubator.apache.org + + + 2.10.4 + 2.3.0 + 0.15.1-RC13 + + + + + conjars + Concurrent Maven Repo + http://conjars.org/repo + + + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.3 + + + + junit + junit + test + + + + com.twitter + scalding-core_2.10 + ${scalding.version} + + + + com.twitter + scalding-repl_2.10 + ${scalding.version} + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/scalding + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/scalding + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + org.scala-tools + maven-scala-plugin + + + compile + + compile + + compile + + + test-compile + + testCompile + + test-compile + + + process-resources + + compile + + + + + + + + diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java new file mode 100644 index 00000000000..d43417e0031 --- /dev/null +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.scalding; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.net.URL; +import java.net.URLClassLoader; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Console; +import scala.Some; +import scala.None; +import scala.tools.nsc.Settings; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +/** + * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. + * + */ +public class ScaldingInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); + + public static final List NO_COMPLETION = + Collections.unmodifiableList(new ArrayList()); + + static { + Interpreter.register("scalding", ScaldingInterpreter.class.getName()); + } + + private ScaldingILoop interpreter; + private ByteArrayOutputStream out; + private Map binder; + + public ScaldingInterpreter(Properties property) { + super(property); + out = new ByteArrayOutputStream(); + } + + @Override + public void open() { + URL[] urls = getClassloaderUrls(); + + // Very nice discussion about how scala compiler handle classpath + // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0 + + /* + * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new + * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through + * nsc.Settings.classpath. + * + * >> val settings = new Settings() >> settings.usejavacp.value = true >> + * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >> + * val in = new Interpreter(settings) { >> override protected def parentClassLoader = + * getClass.getClassLoader >> } >> in.setContextClassLoader() + */ + Settings settings = new Settings(); + + // set classpath for scala compiler + PathSetting pathSettings = settings.classpath(); + String classpath = ""; + List paths = currentClassPath(); + for (File f : paths) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + + if (urls != null) { + for (URL u : urls) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += u.getFile(); + } + } + + pathSettings.v_$eq(classpath); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + + + // set classloader for scala compiler + settings.explicitParentLoader_$eq(new Some(Thread.currentThread() + .getContextClassLoader())); + BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + /* Scalding interpreter */ + PrintStream printStream = new PrintStream(out); + interpreter = new ScaldingILoop(null, new PrintWriter(out)); + interpreter.settings_$eq(settings); + interpreter.createInterpreter(); + + interpreter.intp(). 
+ interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = (Map) getValue("_binder"); + binder.put("out", printStream); + } + + private Object getValue(String name) { + Object ret = interpreter.intp().valueOfTerm(name); + if (ret instanceof None) { + return null; + } else if (ret instanceof Some) { + return ((Some) ret).get(); + } else { + return ret; + } + } + + private List currentClassPath() { + List paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + if (cps != null) { + for (String cp : cps) { + paths.add(new File(cp)); + } + } + return paths; + } + + private List classPath(ClassLoader cl) { + List paths = new LinkedList(); + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + return paths; + } + + @Override + public void close() { + interpreter.intp().close(); + } + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Running Scalding command '" + cmd + "'"); + + if (cmd == null || cmd.trim().length() == 0) { + return new InterpreterResult(Code.SUCCESS); + } + return interpret(cmd.split("\n"), contextInterpreter); + } + + public InterpreterResult interpret(String[] lines, InterpreterContext context) { + synchronized (this) { + InterpreterResult r = interpretInput(lines); + return r; + } + } + + public InterpreterResult interpretInput(String[] lines) { + + // add print("") to make sure not finishing with comment + // see https://github.com/NFLabs/zeppelin/issues/151 + String[] linesToRun = new String[lines.length + 1]; + for (int i = 0; i < lines.length; i++) { + linesToRun[i] = lines[i]; + } + linesToRun[lines.length] = "print(\"\")"; + + Console.setOut((java.io.PrintStream) binder.get("out")); + out.reset(); + Code r = null; + String incomplete = ""; + + for (int l = 0; l < linesToRun.length; l++) { + String s = linesToRun[l]; + // check if next line starts with "." (but not ".." 
or "./") it is treated as an invocation + if (l + 1 < linesToRun.length) { + String nextLine = linesToRun[l + 1].trim(); + if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) { + incomplete += s + "\n"; + continue; + } + } + + scala.tools.nsc.interpreter.Results.Result res = null; + try { + res = interpreter.intp().interpret(incomplete + s); + } catch (Exception e) { + logger.error("Interpreter exception: ", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + + r = getResultCode(res); + + if (r == Code.ERROR) { + Console.flush(); + return new InterpreterResult(r, out.toString()); + } else if (r == Code.INCOMPLETE) { + incomplete += s + "\n"; + } else { + incomplete = ""; + } + } + + if (r == Code.INCOMPLETE) { + return new InterpreterResult(r, "Incomplete expression"); + } else { + Console.flush(); + return new InterpreterResult(r, out.toString()); + } + } + + private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { + if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return Code.ERROR; + } + } + + @Override + public void cancel(InterpreterContext context) { + // not implemented + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + // fine-grained progress not implemented - return 0 + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + ScaldingInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return NO_COMPLETION; + } +} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala new file mode 100644 index 00000000000..bd23c4937f5 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding; + +import java.io.{BufferedReader, File, FileReader} + +import scala.tools.nsc.GenericRunnerSettings +import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter} + + +/** + * A class providing Scalding specific commands for inclusion in the Scalding REPL. 
+ * This is currently forked from Scalding, but should eventually make it into Scalding itself: + * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala + */ + class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter) + extends ILoop(in0, out) { + // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) + // def this() = this(None, new JPrintWriter(Console.out, true)) + + settings = new GenericRunnerSettings({ s => echo(s) }) + + override def printWelcome() { + val fc = Console.YELLOW + val wc = Console.RED + def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc) + echo(fc + + " ( \n" + + " )\\ ) ( ( \n" + + "(()/( ) )\\ )\\ ) ( ( ( \n" + + " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" + + "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc + + wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") + + wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") + + "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" + + " |___/ ") + } + + /** + * Commands specific to the Scalding REPL. To define a new command use one of the following + * factory methods: + * - `LoopCommand.nullary` for commands that take no arguments + * - `LoopCommand.cmd` for commands that take one string argument + * - `LoopCommand.varargs` for commands that take multiple string arguments + */ + private val scaldingCommands: List[LoopCommand] = List() + + /** + * Change the shell prompt to read scalding> + * + * @return a prompt string to use for this REPL. + */ + override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET + + private[this] def addImports(ids: String*): IR.Result = + if (ids.isEmpty) IR.Success + else intp.interpret("import " + ids.mkString(", ")) + + /** + * Search for files with the given name in all directories from current directory + * up to root. + */ + private def findAllUpPath(filename: String): List[File] = + Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent) + .takeWhile(_ != "/") + .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename))) + .toList + + /** + * Gets the list of commands that this REPL supports. + * + * @return a list of the command supported by this REPL. + */ + override def commands: List[LoopCommand] = super.commands ++ scaldingCommands + + protected def imports: List[String] = List( + "com.twitter.scalding._", + "com.twitter.scalding.ReplImplicits._", + "com.twitter.scalding.ReplImplicitContext._", + "com.twitter.scalding.ReplState._") + + override def createInterpreter() { + super.createInterpreter() + intp.beQuietDuring { + addImports(imports: _*) + + settings match { + case s: GenericRunnerSettings => + findAllUpPath(".scalding_repl").reverse.foreach { + f => s.loadfiles.appendToValue(f.toString) + } + case _ => () + } + } + } +} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java new file mode 100644 index 00000000000..7a753fa7cd5 --- /dev/null +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding; + +import static org.junit.Assert.*; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.After; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +/** + * Tests for the Scalding interpreter for Zeppelin. + * + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ScaldingInterpreterTest { + public static ScaldingInterpreter repl; + private InterpreterContext context; + private File tmpDir; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); + + tmpDir.mkdirs(); + + if (repl == null) { + Properties p = new Properties(); + + repl = new ScaldingInterpreter(p); + repl.open(); + } + + InterpreterGroup intpGroup = new InterpreterGroup(); + context = new InterpreterContext("note", "id", "title", "text", + new HashMap(), new GUI(), new AngularObjectRegistry( + intpGroup.getId(), null), + new LinkedList()); + } + + @After + public void tearDown() throws Exception { + delete(tmpDir); + repl.close(); + } + + private void delete(File file) { + if (file.isFile()) file.delete(); + else if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null && files.length > 0) { + for (File f : files) { + delete(f); + } + } + file.delete(); + } + } + + @Test + public void testBasicIntp() { + assertEquals(InterpreterResult.Code.SUCCESS, + repl.interpret("val a = 1\nval b = 2", context).code()); + + // when interpret incomplete expression + InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context); + assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); + assertTrue(incomplete.message().length() > 0); // expecting some error + // message + } + + @Test + public void testBasicScalding() { + assertEquals(InterpreterResult.Code.SUCCESS, + repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" + + "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), Sale(\"VA\", \"B\", 15))\n" + + "val salesPipe = TypedPipe.from(salesList)\n" + + "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" + + " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" + + "results.dump", + context).code()); + } + + @Test + public void 
testNextLineInvocation() { + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code()); + } + + @Test + public void testEndWithComment() { + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code()); + } + + @Test + public void testReferencingUndefinedVal() { + InterpreterResult result = repl.interpret("def category(min: Int) = {" + + " if (0 <= value) \"error\"" + "}", context); + assertEquals(Code.ERROR, result.code()); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index a4d23e93ef5..885b1cc525d 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -414,7 +414,8 @@ public static enum ConfVars { + "org.apache.zeppelin.geode.GeodeOqlInterpreter," + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter," + "org.apache.zeppelin.kylin.KylinInterpreter," - + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"), + + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter," + + "org.apache.zeppelin.scalding.ScaldingInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),