From 5908935f7571c63fe2ac49ade7f7830c0220e7b0 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Wed, 25 May 2016 10:52:46 -0700 Subject: [PATCH 01/15] Scalding interpreter that works in hdfs mode --- pom.xml | 1 + scalding/pom.xml | 42 ++++- .../scalding/ScaldingInterpreter.java | 140 ++++------------ .../twitter/scalding/ZeppelinReplState.scala | 100 +++++++++++ .../scalding/ZeppelinScaldingLoop.scala | 44 +++++ .../scalding/ZeppelinScaldingShell.scala | 155 ++++++++++++++++++ .../zeppelin/scalding/ScaldingILoop.scala | 111 ------------- .../scalding/ScaldingInterpreterTest.java | 3 +- 8 files changed, 369 insertions(+), 227 deletions(-) create mode 100644 scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala create mode 100644 scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala create mode 100644 scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala delete mode 100644 scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala diff --git a/pom.xml b/pom.xml index 8799ff687a8..736026c6ee8 100755 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ cassandra elasticsearch alluxio + scalding zeppelin-web zeppelin-server zeppelin-distribution diff --git a/scalding/pom.xml b/scalding/pom.xml index 2b04f666623..71b0eb88f20 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -34,9 +34,9 @@ http://zeppelin.apache.org - 2.10.4 - 2.3.0 - 0.15.1-RC13 + 2.11.8 + 2.6.0 + 0.16.0 @@ -69,13 +69,43 @@ com.twitter - scalding-core_2.10 + scalding-core_2.11 ${scalding.version} com.twitter - scalding-repl_2.10 + scalding-args_2.11 + ${scalding.version} + + + + com.twitter + scalding-date_2.11 + ${scalding.version} + + + + com.twitter + scalding-commons_2.11 + ${scalding.version} + + + + com.twitter + scalding-avro_2.11 + ${scalding.version} + + + + com.twitter + scalding-parquet_2.11 + ${scalding.version} + + + + com.twitter + scalding-repl_2.11 ${scalding.version} @@ -100,7 +130,7 @@ org.apache.hadoop - hadoop-common + hadoop-client ${hadoop.version} diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index e808e702c74..56fd6246441 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -17,21 +17,16 @@ package org.apache.zeppelin.scalding; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.io.*; +import java.util.*; import java.net.URL; import java.net.URLClassLoader; +import com.twitter.scalding.ScaldingILoop; +import org.apache.commons.io.output.WriterOutputStream; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.scheduler.Scheduler; @@ -44,6 +39,7 @@ import scala.Some; import scala.None; import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.IMain; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -54,16 +50,22 @@ public class ScaldingInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); + static final String ARGS_STRING = "args.string"; + public static final List NO_COMPLETION = Collections.unmodifiableList(new ArrayList()); static { - Interpreter.register("scalding", ScaldingInterpreter.class.getName()); + Interpreter.register( + "scalding", + "scalding", + ScaldingInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(ARGS_STRING, "--hdfs --repl", "Arguments for scalding REPL").build()); } private ScaldingILoop interpreter; private ByteArrayOutputStream out; - private Map binder; public ScaldingInterpreter(Properties property) { super(property); @@ -72,104 +74,19 @@ public ScaldingInterpreter(Properties property) { @Override public void open() { - URL[] urls = getClassloaderUrls(); - - // Very nice discussion about how scala compiler handle classpath - // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0 - - /* - * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new - * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through - * nsc.Settings.classpath. - * - * >> val settings = new Settings() >> settings.usejavacp.value = true >> - * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >> - * val in = new Interpreter(settings) { >> override protected def parentClassLoader = - * getClass.getClassLoader >> } >> in.setContextClassLoader() - */ - Settings settings = new Settings(); - - // set classpath for scala compiler - PathSetting pathSettings = settings.classpath(); - String classpath = ""; - List paths = currentClassPath(); - for (File f : paths) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += f.getAbsolutePath(); - } - - if (urls != null) { - for (URL u : urls) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += u.getFile(); - } - } - - pathSettings.v_$eq(classpath); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - - - // set classloader for scala compiler - settings.explicitParentLoader_$eq(new Some(Thread.currentThread() - .getContextClassLoader())); - BooleanSetting b = (BooleanSetting) settings.usejavacp(); - b.v_$eq(true); - settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - /* Scalding interpreter */ - PrintStream printStream = new PrintStream(out); - interpreter = new ScaldingILoop(null, new PrintWriter(out)); - interpreter.settings_$eq(settings); - interpreter.createInterpreter(); - - interpreter.intp(). - interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); - binder.put("out", printStream); - } - - private Object getValue(String name) { - Object ret = interpreter.intp().valueOfTerm(name); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); + logger.info("property: {}", property); + String argsString = property.getProperty(ARGS_STRING); + String[] args; + if (argsString == null) { + args = new String[0]; } else { - return ret; - } - } - - private List currentClassPath() { - List paths = classPath(Thread.currentThread().getContextClassLoader()); - String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); - if (cps != null) { - for (String cp : cps) { - paths.add(new File(cp)); - } - } - return paths; - } - - private List classPath(ClassLoader cl) { - List paths = new LinkedList(); - if (cl == null) { - return paths; + args = argsString.split(" "); } + logger.info("{}", Arrays.toString(args)); - if (cl instanceof URLClassLoader) { - URLClassLoader ucl = (URLClassLoader) cl; - URL[] urls = ucl.getURLs(); - if (urls != null) { - for (URL url : urls) { - paths.add(new File(url.getFile())); - } - } - } - return paths; + PrintWriter printWriter = new PrintWriter(out, true); + interpreter = com.twitter.scalding.ZeppelinScaldingShell.getRepl(args, printWriter); + interpreter.createInterpreter(); } @Override @@ -205,8 +122,13 @@ public InterpreterResult interpretInput(String[] lines) { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut((java.io.PrintStream) binder.get("out")); out.reset(); + + // Moving two lines below from open() to this function. + // If they are in open output is incomplete. + PrintStream printStream = new PrintStream(out, true); + Console.setOut(printStream); + Code r = null; String incomplete = ""; boolean inComment = false; @@ -261,7 +183,6 @@ public InterpreterResult interpretInput(String[] lines) { incomplete = ""; } } - if (r == Code.INCOMPLETE) { return new InterpreterResult(r, "Incomplete expression"); } else { @@ -306,4 +227,5 @@ public Scheduler getScheduler() { public List completion(String buf, int cursor) { return NO_COMPLETION; } + } diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala new file mode 100644 index 00000000000..2d10b0c8933 --- /dev/null +++ b/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.scalding + +/** + * Stores REPL state + */ + +import cascading.flow.FlowDef +import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } +import scala.concurrent.Future +import scala.util.{Failure, Success} + +object ZeppelinReplState extends BaseReplState { + override def shell = ZeppelinScaldingShell + + /** Create config for execution. Tacks on a new jar for each execution. + * We have to use executionConfig1 and override run, asyncExecute, execute since we need + * to use ZeppelinScaldingShell.createReplCodeJar1. See comment for that method. + */ + def executionConfig1: Config = { + // Create a jar to hold compiled code for this REPL session in addition to + // "tempjars" which can be passed in from the command line, allowing code + // in the repl to be distributed for the Hadoop job to run. + val replCodeJar: Option[java.io.File] = shell.createReplCodeJar1() + val tmpJarsConfig: Map[String, String] = + replCodeJar match { + case Some(jar) => + Map("tmpjars" -> { + // Use tmpjars already in the configuration. + config.get("tmpjars").map(_ + ",").getOrElse("") + // And a jar of code compiled by the REPL. + .concat("file://" + jar.getAbsolutePath) + }) + case None => + // No need to add the tmpjars to the configuration + Map() + } + config ++ tmpJarsConfig + } + + /** + * Runs this pipe as a Scalding job. + * + * Automatically cleans up the flowDef to include only sources upstream from tails. + */ + override def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = + ExecutionContext.newContext(executionConfig1)(fd, md).waitFor match { + case Success(stats) => Some(stats) + case Failure(e) => + println("Flow execution failed!") + e.printStackTrace() + None + } + + /* + * Starts the Execution, but does not wait for the result + */ + override def asyncExecute[T](execution: Execution[T])(implicit ec: ConcurrentExecutionContext): Future[T] = + execution.run(executionConfig1, mode) + + /* + * This runs the Execution[T] and waits for the result + */ + override def execute[T](execution: Execution[T]): T = + execution.waitFor(executionConfig1, mode).get + + +} + +/** + * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly + * used everywhere. + */ +object ZeppelinReplImplicitContext { + /** Implicit execution context for using the Execution monad */ + implicit val executionContext = ConcurrentExecutionContext.global + /** Implicit repl state used for ShellPipes */ + implicit def stateImpl = ZeppelinReplState + /** Implicit flowDef for this Scalding shell session. */ + implicit def flowDefImpl = ZeppelinReplState.flowDef + /** Defaults to running in local mode if no mode is specified. */ + implicit def modeImpl = ZeppelinReplState.mode + implicit def configImpl = ZeppelinReplState.config +} diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala new file mode 100644 index 00000000000..a8c5608cb22 --- /dev/null +++ b/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.scalding + +import java.io.BufferedReader +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) + extends ScaldingILoop(in, out) { + + override protected def imports = List( + "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }", + // ReplImplicits minus fields API parts (esp FieldConversions) + """com.twitter.scalding.ReplImplicits.{ + iterableToSource, + keyedListLikeToShellTypedPipe, + typedPipeToShellTypedPipe, + valuePipeToShellValuePipe + }""", + "com.twitter.scalding.ReplImplicits", + "com.twitter.scalding.ZeppelinReplImplicitContext._", + "com.twitter.scalding.ZeppelinReplState", + "com.twitter.scalding.ZeppelinReplState._" + ) + +} diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala new file mode 100644 index 00000000000..cb6cf360aab --- /dev/null +++ b/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.scalding + + +import java.io.{FileOutputStream, File} +import java.util.jar.{JarEntry, JarOutputStream} +import com.google.common.io.Files +import scala.tools.nsc.io._ +import scala.tools.nsc.{GenericRunnerSettings, GenericRunnerCommand} +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +object ZeppelinScaldingShell extends BaseScaldingShell { + + override def replState = ZeppelinReplState + + /** + * An instance of the Scala REPL the user will interact with. + */ + var zeppelinScaldingREPL: Option[ILoop] = None + + def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { + + val argsExpanded = ExpandLibJarsGlobs(args) + val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) + + // Process command line arguments into a settings object, and use that to start the REPL. + // We ignore params we don't care about - hence error function is empty + val command = new GenericRunnerCommand(cmdArgs, _ => ()) + + // inherit defaults for embedded interpretter (needed for running with SBT) + // (TypedPipe chosen arbitrarily, just needs to be something representative) + command.settings.embeddedDefaults[TypedPipe[String]] + + // if running from the assembly, need to explicitly tell it to use java classpath + if (args.contains("--repl")) command.settings.usejavacp.value = true + + command.settings.classpath.append(System.getProperty("java.class.path")) + + // Force the repl to be synchronous, so all cmds are executed in the same thread + command.settings.Yreplsync.value = true + + val repl = new ZeppelinScaldingILoop(None, out) + zeppelinScaldingREPL = Some(repl) + replState.mode = mode + replState.customConfig = replState.customConfig ++ (mode match { + case _: HadoopMode => cfg + case _ => Config.empty + }) + + // if in Hdfs mode, store the mode to enable switching between Local and Hdfs + mode match { + case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) + case _ => () + } + + repl.settings = command.settings + return repl; + + } + + /** + * Creates a jar file in a temporary directory containing the code thus far compiled by the REPL. + * Will use createReplCodeJar in base class once we are able to set scaldingREPL which is + * currently private. + * @return some file for the jar created, or `None` if the REPL is not running. + */ + def createReplCodeJar1(): Option[File] = { + zeppelinScaldingREPL.map { repl => + val virtualDirectory = repl.virtualDirectory + val tempJar = new File(Files.createTempDir(), + "scalding-repl-session-" + System.currentTimeMillis() + ".jar") + createJar(virtualDirectory.asInstanceOf[VirtualDirectory], tempJar) + } + } + + /** + * Creates a jar file from the classes contained in a virtual directory. + * + * @param virtualDirectory containing classes that should be added to the jar. + * @param jarFile that will be written. + * @return the jarFile specified and written. + */ + private def createJar(virtualDirectory: VirtualDirectory, jarFile: File): File = { + val jarStream = new JarOutputStream(new FileOutputStream(jarFile)) + try { + addVirtualDirectoryToJar(virtualDirectory, "", jarStream) + } finally { + jarStream.close() + } + + jarFile + } + + /** + * Add the contents of the specified virtual directory to a jar. This method will recursively + * descend into subdirectories to add their contents. + * + * @param dir is a virtual directory whose contents should be added. + * @param entryPath for classes found in the virtual directory. + * @param jarStream for writing the jar file. + */ + private def addVirtualDirectoryToJar( + dir: VirtualDirectory, + entryPath: String, + jarStream: JarOutputStream) { + dir.foreach { file => + if (file.isDirectory) { + // Recursively descend into subdirectories, adjusting the package name as we do. + val dirPath = entryPath + file.name + "/" + val entry: JarEntry = new JarEntry(dirPath) + jarStream.putNextEntry(entry) + jarStream.closeEntry() + addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory], dirPath, jarStream) + } else if (file.hasExtension("class")) { + // Add class files as an entry in the jar file and write the class to the jar. + val entry: JarEntry = new JarEntry(entryPath + file.name) + jarStream.putNextEntry(entry) + jarStream.write(file.toByteArray) + jarStream.closeEntry() + } + } + } + + /* + * Only for testing + */ + override def main(args: Array[String]) { + val out = new JPrintWriter(Console.out, true) + val repl = getRepl(args, out) + val retval = repl.process(repl.settings) + + if (!retval) { + sys.exit(1) + } + } +} \ No newline at end of file diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala deleted file mode 100644 index bd23c4937f5..00000000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding; - -import java.io.{BufferedReader, File, FileReader} - -import scala.tools.nsc.GenericRunnerSettings -import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter} - - -/** - * A class providing Scalding specific commands for inclusion in the Scalding REPL. - * This is currently forked from Scalding, but should eventually make it into Scalding itself: - * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala - */ - class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter) - extends ILoop(in0, out) { - // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) - // def this() = this(None, new JPrintWriter(Console.out, true)) - - settings = new GenericRunnerSettings({ s => echo(s) }) - - override def printWelcome() { - val fc = Console.YELLOW - val wc = Console.RED - def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc) - echo(fc + - " ( \n" + - " )\\ ) ( ( \n" + - "(()/( ) )\\ )\\ ) ( ( ( \n" + - " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" + - "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc + - wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") + - wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") + - "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" + - " |___/ ") - } - - /** - * Commands specific to the Scalding REPL. To define a new command use one of the following - * factory methods: - * - `LoopCommand.nullary` for commands that take no arguments - * - `LoopCommand.cmd` for commands that take one string argument - * - `LoopCommand.varargs` for commands that take multiple string arguments - */ - private val scaldingCommands: List[LoopCommand] = List() - - /** - * Change the shell prompt to read scalding> - * - * @return a prompt string to use for this REPL. - */ - override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET - - private[this] def addImports(ids: String*): IR.Result = - if (ids.isEmpty) IR.Success - else intp.interpret("import " + ids.mkString(", ")) - - /** - * Search for files with the given name in all directories from current directory - * up to root. - */ - private def findAllUpPath(filename: String): List[File] = - Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent) - .takeWhile(_ != "/") - .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename))) - .toList - - /** - * Gets the list of commands that this REPL supports. - * - * @return a list of the command supported by this REPL. - */ - override def commands: List[LoopCommand] = super.commands ++ scaldingCommands - - protected def imports: List[String] = List( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._", - "com.twitter.scalding.ReplState._") - - override def createInterpreter() { - super.createInterpreter() - intp.beQuietDuring { - addImports(imports: _*) - - settings match { - case s: GenericRunnerSettings => - findAllUpPath(".scalding_repl").reverse.foreach { - f => s.loadfiles.appendToValue(f.toString) - } - case _ => () - } - } - } -} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 08c67dacd3b..7ffbd975b3e 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -57,6 +57,7 @@ public void setUp() throws Exception { if (repl == null) { Properties p = new Properties(); + p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl"); repl = new ScaldingInterpreter(p); repl.open(); @@ -119,7 +120,7 @@ public void testBasicScalding() { "val salesPipe = TypedPipe.from(salesList)\n" + "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" + " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" + - "results.dump", + "results.dump", context).code()); } From 0e43d002d5c1c02355b462ccc946378e2acd945e Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Wed, 25 May 2016 12:25:17 -0700 Subject: [PATCH 02/15] Add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency --- scalding/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scalding/pom.xml b/scalding/pom.xml index 71b0eb88f20..f953008f3d5 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -45,6 +45,11 @@ Concurrent Maven Repo http://conjars.org/repo + + twitter + Twitter Maven Repo + http://maven.twttr.com + From 9060073c8dd81003a824e53de4bcb576b9c1a40d Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Fri, 3 Jun 2016 10:58:26 -0700 Subject: [PATCH 03/15] Move ZeppelinScaldingShell to org.apache.zeppelin.scalding, use java 1.8, scala 2.11 --- pom.xml | 4 +- scalding/pom.xml | 9 +- .../scalding/ScaldingInterpreter.java | 90 +++++++--- .../twitter/scalding/ZeppelinReplState.scala | 100 ----------- .../scalding/ZeppelinScaldingShell.scala | 155 ------------------ .../zeppelin/scalding/ZeppelinReplState.scala | 48 ++++++ .../scalding/ZeppelinScaldingLoop.scala | 10 +- .../scalding/ZeppelinScaldingShell.scala | 72 ++++++++ zeppelin-server/pom.xml | 10 +- 9 files changed, 205 insertions(+), 293 deletions(-) delete mode 100644 scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala delete mode 100644 scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala create mode 100644 scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala rename scalding/src/main/scala/{com/twitter => org/apache/zeppelin}/scalding/ZeppelinScaldingLoop.scala (85%) create mode 100644 scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala diff --git a/pom.xml b/pom.xml index 736026c6ee8..006fc881815 100755 --- a/pom.xml +++ b/pom.xml @@ -229,8 +229,8 @@ maven-compiler-plugin 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/scalding/pom.xml b/scalding/pom.xml index f953008f3d5..12ed5ed9ad2 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -36,7 +36,7 @@ 2.11.8 2.6.0 - 0.16.0 + 0.16.1-SNAPSHOT @@ -45,11 +45,6 @@ Concurrent Maven Repo http://conjars.org/repo - - twitter - Twitter Maven Repo - http://maven.twttr.com - @@ -132,12 +127,12 @@ ${scala.version} - org.apache.hadoop hadoop-client ${hadoop.version} + diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index 56fd6246441..39b07764780 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -17,13 +17,8 @@ package org.apache.zeppelin.scalding; -import java.io.*; -import java.util.*; -import java.net.URL; -import java.net.URLClassLoader; - import com.twitter.scalding.ScaldingILoop; -import org.apache.commons.io.output.WriterOutputStream; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; @@ -31,17 +26,20 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Console; -import scala.Some; -import scala.None; -import scala.tools.nsc.Settings; -import scala.tools.nsc.interpreter.IMain; -import scala.tools.nsc.settings.MutableSettings.BooleanSetting; -import scala.tools.nsc.settings.MutableSettings.PathSetting; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; /** * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. @@ -51,6 +49,9 @@ public class ScaldingInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); static final String ARGS_STRING = "args.string"; + static final String ARGS_STRING_DEFAULT = "--local --repl"; + static final String MAX_OPEN_INSTANCES = "max.open.instances"; + static final String MAX_OPEN_INSTANCES_DEFAULT = "50"; public static final List NO_COMPLETION = Collections.unmodifiableList(new ArrayList()); @@ -61,9 +62,12 @@ public class ScaldingInterpreter extends Interpreter { "scalding", ScaldingInterpreter.class.getName(), new InterpreterPropertyBuilder() - .add(ARGS_STRING, "--hdfs --repl", "Arguments for scalding REPL").build()); + .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL") + .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT, "Maximum number of open interpreter instances") + .build()); } + static int numOpenInstances = 0; private ScaldingILoop interpreter; private ByteArrayOutputStream out; @@ -74,8 +78,23 @@ public ScaldingInterpreter(Properties property) { @Override public void open() { + numOpenInstances = numOpenInstances + 1; + String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES, + MAX_OPEN_INSTANCES_DEFAULT); + int maxOpenInstances = 50; + try { + maxOpenInstances = Integer.valueOf(maxOpenInstancesStr); + } catch (Exception e) { + logger.error("Error reading max.open.instances", e); + } + logger.info("max.open.instances = {}", maxOpenInstances); + if (numOpenInstances > maxOpenInstances) { + logger.error("Reached maximum number of open instances"); + return; + } + logger.info("Opening instance {}", numOpenInstances); logger.info("property: {}", property); - String argsString = property.getProperty(ARGS_STRING); + String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); String[] args; if (argsString == null) { args = new String[0]; @@ -85,7 +104,7 @@ public void open() { logger.info("{}", Arrays.toString(args)); PrintWriter printWriter = new PrintWriter(out, true); - interpreter = com.twitter.scalding.ZeppelinScaldingShell.getRepl(args, printWriter); + interpreter = ZeppelinScaldingShell.getRepl(args, printWriter); interpreter.createInterpreter(); } @@ -97,12 +116,43 @@ public void close() { @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { - logger.info("Running Scalding command '" + cmd + "'"); - + String user = contextInterpreter.getAuthenticationInfo().getUser(); + logger.info("Running Scalding command: user: {} cmd: '{}'", user, cmd); + + if (interpreter == null) { + logger.error( + "interpreter == null, open may not have been called because max.open.instances reached"); + return new InterpreterResult(Code.ERROR, + "interpreter == null\n" + + "open may not have been called because max.open.instances reached" + ); + } if (cmd == null || cmd.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); } - return interpret(cmd.split("\n"), contextInterpreter); + InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR); + if (property.getProperty(ARGS_STRING).contains("hdfs")) { + UserGroupInformation ugi = null; + try { + ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + } catch (IOException e) { + logger.error("Error creating UserGroupInformation", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + try { + interpreterResult = ugi.doAs(new PrivilegedExceptionAction() { + public InterpreterResult run() throws Exception { + return interpret(cmd.split("\n"), contextInterpreter); + } + }); + } catch (Exception e) { + logger.error("Error running command with ugi.doAs", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + } else { + interpreterResult = interpret(cmd.split("\n"), contextInterpreter); + } + return interpreterResult; } public InterpreterResult interpret(String[] lines, InterpreterContext context) { diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala deleted file mode 100644 index 2d10b0c8933..00000000000 --- a/scalding/src/main/scala/com/twitter/scalding/ZeppelinReplState.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.scalding - -/** - * Stores REPL state - */ - -import cascading.flow.FlowDef -import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } -import scala.concurrent.Future -import scala.util.{Failure, Success} - -object ZeppelinReplState extends BaseReplState { - override def shell = ZeppelinScaldingShell - - /** Create config for execution. Tacks on a new jar for each execution. - * We have to use executionConfig1 and override run, asyncExecute, execute since we need - * to use ZeppelinScaldingShell.createReplCodeJar1. See comment for that method. - */ - def executionConfig1: Config = { - // Create a jar to hold compiled code for this REPL session in addition to - // "tempjars" which can be passed in from the command line, allowing code - // in the repl to be distributed for the Hadoop job to run. - val replCodeJar: Option[java.io.File] = shell.createReplCodeJar1() - val tmpJarsConfig: Map[String, String] = - replCodeJar match { - case Some(jar) => - Map("tmpjars" -> { - // Use tmpjars already in the configuration. - config.get("tmpjars").map(_ + ",").getOrElse("") - // And a jar of code compiled by the REPL. - .concat("file://" + jar.getAbsolutePath) - }) - case None => - // No need to add the tmpjars to the configuration - Map() - } - config ++ tmpJarsConfig - } - - /** - * Runs this pipe as a Scalding job. - * - * Automatically cleans up the flowDef to include only sources upstream from tails. - */ - override def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = - ExecutionContext.newContext(executionConfig1)(fd, md).waitFor match { - case Success(stats) => Some(stats) - case Failure(e) => - println("Flow execution failed!") - e.printStackTrace() - None - } - - /* - * Starts the Execution, but does not wait for the result - */ - override def asyncExecute[T](execution: Execution[T])(implicit ec: ConcurrentExecutionContext): Future[T] = - execution.run(executionConfig1, mode) - - /* - * This runs the Execution[T] and waits for the result - */ - override def execute[T](execution: Execution[T]): T = - execution.waitFor(executionConfig1, mode).get - - -} - -/** - * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly - * used everywhere. - */ -object ZeppelinReplImplicitContext { - /** Implicit execution context for using the Execution monad */ - implicit val executionContext = ConcurrentExecutionContext.global - /** Implicit repl state used for ShellPipes */ - implicit def stateImpl = ZeppelinReplState - /** Implicit flowDef for this Scalding shell session. */ - implicit def flowDefImpl = ZeppelinReplState.flowDef - /** Defaults to running in local mode if no mode is specified. */ - implicit def modeImpl = ZeppelinReplState.mode - implicit def configImpl = ZeppelinReplState.config -} diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala deleted file mode 100644 index cb6cf360aab..00000000000 --- a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingShell.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.scalding - - -import java.io.{FileOutputStream, File} -import java.util.jar.{JarEntry, JarOutputStream} -import com.google.common.io.Files -import scala.tools.nsc.io._ -import scala.tools.nsc.{GenericRunnerSettings, GenericRunnerCommand} -import scala.tools.nsc.interpreter._ - -/** - * TBD - */ -object ZeppelinScaldingShell extends BaseScaldingShell { - - override def replState = ZeppelinReplState - - /** - * An instance of the Scala REPL the user will interact with. - */ - var zeppelinScaldingREPL: Option[ILoop] = None - - def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { - - val argsExpanded = ExpandLibJarsGlobs(args) - val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) - - // Process command line arguments into a settings object, and use that to start the REPL. - // We ignore params we don't care about - hence error function is empty - val command = new GenericRunnerCommand(cmdArgs, _ => ()) - - // inherit defaults for embedded interpretter (needed for running with SBT) - // (TypedPipe chosen arbitrarily, just needs to be something representative) - command.settings.embeddedDefaults[TypedPipe[String]] - - // if running from the assembly, need to explicitly tell it to use java classpath - if (args.contains("--repl")) command.settings.usejavacp.value = true - - command.settings.classpath.append(System.getProperty("java.class.path")) - - // Force the repl to be synchronous, so all cmds are executed in the same thread - command.settings.Yreplsync.value = true - - val repl = new ZeppelinScaldingILoop(None, out) - zeppelinScaldingREPL = Some(repl) - replState.mode = mode - replState.customConfig = replState.customConfig ++ (mode match { - case _: HadoopMode => cfg - case _ => Config.empty - }) - - // if in Hdfs mode, store the mode to enable switching between Local and Hdfs - mode match { - case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) - case _ => () - } - - repl.settings = command.settings - return repl; - - } - - /** - * Creates a jar file in a temporary directory containing the code thus far compiled by the REPL. - * Will use createReplCodeJar in base class once we are able to set scaldingREPL which is - * currently private. - * @return some file for the jar created, or `None` if the REPL is not running. - */ - def createReplCodeJar1(): Option[File] = { - zeppelinScaldingREPL.map { repl => - val virtualDirectory = repl.virtualDirectory - val tempJar = new File(Files.createTempDir(), - "scalding-repl-session-" + System.currentTimeMillis() + ".jar") - createJar(virtualDirectory.asInstanceOf[VirtualDirectory], tempJar) - } - } - - /** - * Creates a jar file from the classes contained in a virtual directory. - * - * @param virtualDirectory containing classes that should be added to the jar. - * @param jarFile that will be written. - * @return the jarFile specified and written. - */ - private def createJar(virtualDirectory: VirtualDirectory, jarFile: File): File = { - val jarStream = new JarOutputStream(new FileOutputStream(jarFile)) - try { - addVirtualDirectoryToJar(virtualDirectory, "", jarStream) - } finally { - jarStream.close() - } - - jarFile - } - - /** - * Add the contents of the specified virtual directory to a jar. This method will recursively - * descend into subdirectories to add their contents. - * - * @param dir is a virtual directory whose contents should be added. - * @param entryPath for classes found in the virtual directory. - * @param jarStream for writing the jar file. - */ - private def addVirtualDirectoryToJar( - dir: VirtualDirectory, - entryPath: String, - jarStream: JarOutputStream) { - dir.foreach { file => - if (file.isDirectory) { - // Recursively descend into subdirectories, adjusting the package name as we do. - val dirPath = entryPath + file.name + "/" - val entry: JarEntry = new JarEntry(dirPath) - jarStream.putNextEntry(entry) - jarStream.closeEntry() - addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory], dirPath, jarStream) - } else if (file.hasExtension("class")) { - // Add class files as an entry in the jar file and write the class to the jar. - val entry: JarEntry = new JarEntry(entryPath + file.name) - jarStream.putNextEntry(entry) - jarStream.write(file.toByteArray) - jarStream.closeEntry() - } - } - } - - /* - * Only for testing - */ - override def main(args: Array[String]) { - val out = new JPrintWriter(Console.out, true) - val repl = getRepl(args, out) - val retval = repl.process(repl.settings) - - if (!retval) { - sys.exit(1) - } - } -} \ No newline at end of file diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala new file mode 100644 index 00000000000..b847eba0012 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +/** + * Stores REPL state + */ + +import cascading.flow.FlowDef +import com.twitter.scalding.BaseReplState +import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } +import scala.concurrent.Future +import scala.util.{Failure, Success} + +object ZeppelinReplState extends BaseReplState { + override def shell = ZeppelinScaldingShell +} + +/** + * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly + * used everywhere. + */ +object ZeppelinReplImplicitContext { + /** Implicit execution context for using the Execution monad */ + implicit val executionContext = ConcurrentExecutionContext.global + /** Implicit repl state used for ShellPipes */ + implicit def stateImpl = ZeppelinReplState + /** Implicit flowDef for this Scalding shell session. */ + implicit def flowDefImpl = ZeppelinReplState.flowDef + /** Defaults to running in local mode if no mode is specified. */ + implicit def modeImpl = ZeppelinReplState.mode + implicit def configImpl = ZeppelinReplState.config +} diff --git a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala similarity index 85% rename from scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala rename to scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala index a8c5608cb22..9be01998695 100644 --- a/scalding/src/main/scala/com/twitter/scalding/ZeppelinScaldingLoop.scala +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala @@ -15,9 +15,11 @@ * limitations under the License. */ -package com.twitter.scalding +package org.apache.zeppelin.scalding import java.io.BufferedReader +import com.twitter.scalding.ScaldingILoop + import scala.tools.nsc.interpreter._ /** @@ -36,9 +38,9 @@ class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) valuePipeToShellValuePipe }""", "com.twitter.scalding.ReplImplicits", - "com.twitter.scalding.ZeppelinReplImplicitContext._", - "com.twitter.scalding.ZeppelinReplState", - "com.twitter.scalding.ZeppelinReplState._" + "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._", + "org.apache.zeppelin.scalding.ZeppelinReplState", + "org.apache.zeppelin.scalding.ZeppelinReplState._" ) } diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala new file mode 100644 index 00000000000..29e5f835cb4 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TypedPipe +import scala.tools.nsc.{GenericRunnerCommand} +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +object ZeppelinScaldingShell extends BaseScaldingShell { + + override def replState = ZeppelinReplState + + def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { + + val argsExpanded = ExpandLibJarsGlobs(args) + val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) + + // Process command line arguments into a settings object, and use that to start the REPL. + // We ignore params we don't care about - hence error function is empty + val command = new GenericRunnerCommand(cmdArgs, _ => ()) + + // inherit defaults for embedded interpretter (needed for running with SBT) + // (TypedPipe chosen arbitrarily, just needs to be something representative) + command.settings.embeddedDefaults[TypedPipe[String]] + + // if running from the assembly, need to explicitly tell it to use java classpath + if (args.contains("--repl")) command.settings.usejavacp.value = true + + command.settings.classpath.append(System.getProperty("java.class.path")) + + // Force the repl to be synchronous, so all cmds are executed in the same thread + command.settings.Yreplsync.value = true + + val repl = new ZeppelinScaldingILoop(None, out) + scaldingREPL = Some(repl) + replState.mode = mode + replState.customConfig = replState.customConfig ++ (mode match { + case _: HadoopMode => cfg + case _ => Config.empty + }) + + // if in Hdfs mode, store the mode to enable switching between Local and Hdfs + mode match { + case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) + case _ => () + } + + repl.settings = command.settings + return repl; + + } + +} diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index e33df3e258f..11061021d45 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -43,19 +43,19 @@ org.scala-lang scala-library - 2.10.4 + 2.11.8 org.scala-lang scala-compiler - 2.10.4 + 2.11.8 org.scala-lang scalap - 2.10.4 + 2.11.8 @@ -271,8 +271,8 @@ org.scalatest - scalatest_2.10 - 2.1.1 + scalatest_2.11 + 2.2.4 test From a84ea3e3f59bd60dae44e7a7845afcffe75d41de Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Fri, 3 Jun 2016 11:21:27 -0700 Subject: [PATCH 04/15] Use oraclejdk8 in travis.yml --- .travis.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index f7ff08b03f1..ab2607e8387 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,31 +33,31 @@ addons: matrix: include: # Test all modules - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test spark module for 1.5.2 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,r -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.4.1 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,r -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.3.1 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.3.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.3 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.2.2 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.2.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.2 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.1.1 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: SPARK_VER="1.1.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.1 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test selenium with spark module for 1.6.1 - - jdk: "oraclejdk7" + - jdk: "oraclejdk8" env: TEST_SELENIUM="true" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false" before_install: From 301f1267b35af7410cfa5496dd5f186a391e0b14 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Fri, 3 Jun 2016 13:35:59 -0700 Subject: [PATCH 05/15] Change scalding.version to 0.16.1-RC1 --- scalding/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding/pom.xml b/scalding/pom.xml index 12ed5ed9ad2..71e3979a7d1 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -36,7 +36,7 @@ 2.11.8 2.6.0 - 0.16.1-SNAPSHOT + 0.16.1-RC1 From d2052caf8615ff8f3a28dd94e1c8ab5bcbcbd6c4 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Fri, 3 Jun 2016 14:29:12 -0700 Subject: [PATCH 06/15] Re-add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency --- scalding/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scalding/pom.xml b/scalding/pom.xml index 71e3979a7d1..a3b3b58fe63 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -45,6 +45,11 @@ Concurrent Maven Repo http://conjars.org/repo + + twitter + Twitter Maven Repo + http://maven.twttr.com + From 44e5702bff55c669a80c7644124ca18473de7c2e Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Fri, 3 Jun 2016 14:57:38 -0700 Subject: [PATCH 07/15] Fix checkstyle error --- .../java/org/apache/zeppelin/scalding/ScaldingInterpreter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index 39b07764780..16f34d7c945 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -63,7 +63,8 @@ public class ScaldingInterpreter extends Interpreter { ScaldingInterpreter.class.getName(), new InterpreterPropertyBuilder() .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL") - .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT, "Maximum number of open interpreter instances") + .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT, + "Maximum number of open interpreter instances") .build()); } From 5db8daf4025966cf11cb26f360fd1386a0cec422 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Sat, 4 Jun 2016 06:51:33 -0700 Subject: [PATCH 08/15] Make variables final to avoid java 1.7 compiler error, go back to java 1.7 --- .travis.yml | 14 +++++++------- pom.xml | 4 ++-- .../zeppelin/scalding/ScaldingInterpreter.java | 11 ++++++++--- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index ab2607e8387..f7ff08b03f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,31 +33,31 @@ addons: matrix: include: # Test all modules - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test spark module for 1.5.2 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.5.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.5 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,r -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.4.1 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.4.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.4 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark,r -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.3.1 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.3.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.3 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.2.2 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.2.2" HADOOP_VER="2.3" PROFILE="-Pspark-1.2 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test spark module for 1.1.1 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: SPARK_VER="1.1.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.1 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.rest.*Test,org.apache.zeppelin.spark* -DfailIfNoTests=false" # Test selenium with spark module for 1.6.1 - - jdk: "oraclejdk8" + - jdk: "oraclejdk7" env: TEST_SELENIUM="true" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Phadoop-2.3 -Ppyspark" BUILD_FLAG="package -DskipTests" TEST_FLAG="verify" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false" before_install: diff --git a/pom.xml b/pom.xml index 006fc881815..736026c6ee8 100755 --- a/pom.xml +++ b/pom.xml @@ -229,8 +229,8 @@ maven-compiler-plugin 3.1 - 1.8 - 1.8 + 1.7 + 1.7 diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index 16f34d7c945..d216d8ada98 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -141,11 +141,16 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr return new InterpreterResult(Code.ERROR, e.getMessage()); } try { - interpreterResult = ugi.doAs(new PrivilegedExceptionAction() { + // Make variables final to avoid "local variable is accessed from within inner class; + // needs to be declared final" exception in JDK7 + final String cmd1 = cmd; + final InterpreterContext contextInterpreter1 = contextInterpreter; + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { public InterpreterResult run() throws Exception { - return interpret(cmd.split("\n"), contextInterpreter); + return interpret(cmd1.split("\n"), contextInterpreter1); } - }); + }; + interpreterResult = ugi.doAs(action); } catch (Exception e) { logger.error("Error running command with ugi.doAs", e); return new InterpreterResult(Code.ERROR, e.getMessage()); From a29867d9f5c63017b7d62d8f1ca8b7a3adca45d7 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Sat, 4 Jun 2016 07:17:06 -0700 Subject: [PATCH 09/15] Update scalding interpreter doc --- docs/interpreter/scalding.md | 86 +++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md index 44303125bdc..ec5608bf3b3 100644 --- a/docs/interpreter/scalding.md +++ b/docs/interpreter/scalding.md @@ -28,10 +28,49 @@ In a notebook, to enable the **Scalding** interpreter, click on the **Gear** ico ### Configuring the Interpreter -Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything. + +Scalding interpreter runs in two modes: + +* local +* hdfs + +In the local mode, you can access files on the local server and scalding transformation are done locally. + +In hdfs mode you can access files in HDFS and scalding transformation are run as hadoop map-reduce jobs. + +Zeppelin comes with a pre-configured Scalding interpreter in local mode. + +To run the scalding interpreter in the hdfs mode you have to do the following: + +**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES** + +In conf/zeppelin_env.sh, you have to set +ZEPPELIN_CLASSPATH_OVERRIDES to the contents of 'hadoop classpath' +and directories with custom jar files you need for your scalding commands. + +**Set arguments to the scalding repl** + +The default arguments are: "--local --repl" + +For hdfs mode you need to add: "--hdfs --repl" + +If you want to add custom jars, you need to add: +"-libjars directory/*:directory/*" + +For reducer estimation, you need to add something like: +"-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator" + +**Set max.open.instances** + +If you want to control the maximum number of open interpreters, you have to select "scoped" interpreter for note +option and set max.open.instances argument. ### Testing the Interpreter -In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book. + +#### Local mode + +In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, +we will count words (of course!), and plot a graph of the top 10 words in the book. ``` %scalding @@ -71,7 +110,44 @@ print("%table " + table) If you click on the icon for the pie chart, you should be able to see a chart like this: ![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png) -### Current Status & Future Work -The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates. -The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!). +#### HDFS mode + +**Test mode** + +``` +%scalding +mode +``` +This command should print: + +``` +res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) +``` + + +**Test HDFS read** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +testfile.dump +``` + +This command should print the contents of the hdfs file /user/x/testfile. + +**Test map-reduce job** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +val a = testfile.groupAll.size.values +a.toList + +``` + +This command should create a map reduce job. + +### Future Work +* Better user feedback (hadoop url, progress updates) +* Ability to cancel jobs +* Ability to dynamically load jars without restarting the interpreter +* Multiuser scalability (run scalding interpreters on different servers) From 92bfcca4ed8f6eba1e43b25d7d898993f4bfb1c9 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Sat, 4 Jun 2016 07:34:35 -0700 Subject: [PATCH 10/15] Fix checkstyle error --- .../apache/zeppelin/scalding/ScaldingInterpreter.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index d216d8ada98..4542297e296 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -145,11 +145,12 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr // needs to be declared final" exception in JDK7 final String cmd1 = cmd; final InterpreterContext contextInterpreter1 = contextInterpreter; - PrivilegedExceptionAction action = new PrivilegedExceptionAction() { - public InterpreterResult run() throws Exception { - return interpret(cmd1.split("\n"), contextInterpreter1); - } - }; + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { + public InterpreterResult run() throws Exception { + return interpret(cmd1.split("\n"), contextInterpreter1); + } + }; interpreterResult = ugi.doAs(action); } catch (Exception e) { logger.error("Error running command with ugi.doAs", e); From d74c52c6823f5fe4d9679be55136a446f7ba8833 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Sat, 4 Jun 2016 12:30:30 -0700 Subject: [PATCH 11/15] Remove -Pscalding from .travis.yml PROFILE variable --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index f7ff08b03f1..caf469d58bf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,7 +34,7 @@ matrix: include: # Test all modules - jdk: "oraclejdk7" - env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" From 730d5b69e4fb6f5049c69008d5d7c22f09231f55 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Sat, 4 Jun 2016 14:21:38 -0700 Subject: [PATCH 12/15] Remove scalding profile from pom.xml --- docs/interpreter/scalding.md | 7 ------- pom.xml | 7 ------- 2 files changed, 14 deletions(-) diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md index ec5608bf3b3..147078b3ac0 100644 --- a/docs/interpreter/scalding.md +++ b/docs/interpreter/scalding.md @@ -9,13 +9,6 @@ group: manual ## Scalding Interpreter for Apache Zeppelin [Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs. -### Building the Scalding Interpreter -You have to first build the Scalding interpreter by enable the **scalding** profile as follows: - -``` -mvn clean package -Pscalding -DskipTests -``` - ### Enabling the Scalding Interpreter In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon,select **Scalding**, and hit **Save**. diff --git a/pom.xml b/pom.xml index 736026c6ee8..bf7d8d06a65 100755 --- a/pom.xml +++ b/pom.xml @@ -683,13 +683,6 @@ - - scalding - - scalding - - - build-distr From f59bb553d2c5a05d611ea443974f2d91fb9f71c2 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Mon, 6 Jun 2016 15:07:18 -0700 Subject: [PATCH 13/15] Revert scala version change in zeppelin-server/pom.xml --- zeppelin-server/pom.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 11061021d45..e33df3e258f 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -43,19 +43,19 @@ org.scala-lang scala-library - 2.11.8 + 2.10.4 org.scala-lang scala-compiler - 2.11.8 + 2.10.4 org.scala-lang scalap - 2.11.8 + 2.10.4 @@ -271,8 +271,8 @@ org.scalatest - scalatest_2.11 - 2.2.4 + scalatest_2.10 + 2.1.1 test From a001660b39b5d635cfbd96e2906a1db16302f2c0 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Tue, 7 Jun 2016 07:53:44 -0700 Subject: [PATCH 14/15] Restore scalding profile --- .travis.yml | 2 +- pom.xml | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index caf469d58bf..f7ff08b03f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,7 +34,7 @@ matrix: include: # Test all modules - jdk: "oraclejdk7" - env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" + env: SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/pom.xml b/pom.xml index bf7d8d06a65..8799ff687a8 100755 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,6 @@ cassandra elasticsearch alluxio - scalding zeppelin-web zeppelin-server zeppelin-distribution @@ -683,6 +682,13 @@ + + scalding + + scalding + + + build-distr From e91efd13a7fa7279f452baf786ccc9d21f61a689 Mon Sep 17 00:00:00 2001 From: Prasad Wagle Date: Tue, 7 Jun 2016 07:58:36 -0700 Subject: [PATCH 15/15] Restore document section on how to build scalding interpreter by enabling scalding profile --- docs/interpreter/scalding.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md index 147078b3ac0..ec5608bf3b3 100644 --- a/docs/interpreter/scalding.md +++ b/docs/interpreter/scalding.md @@ -9,6 +9,13 @@ group: manual ## Scalding Interpreter for Apache Zeppelin [Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs. +### Building the Scalding Interpreter +You have to first build the Scalding interpreter by enable the **scalding** profile as follows: + +``` +mvn clean package -Pscalding -DskipTests +``` + ### Enabling the Scalding Interpreter In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon,select **Scalding**, and hit **Save**.