diff --git a/python/pom.xml b/python/pom.xml index 35524370f56..681986c7518 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -98,6 +98,43 @@ + + org.codehaus.mojo + wagon-maven-plugin + 1.0 + + + package + download-single + + https://pypi.python.org/packages/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3 + py4j-${py4j.version}.zip + ${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip + + + + + + + maven-antrun-plugin + 1.7 + + + package + + + + + + + run + + + + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java index c41a3cc3a8c..582debd4cde 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java @@ -21,10 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; +import java.nio.file.Paths; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -37,6 +35,7 @@ public class PythonDockerInterpreter extends Interpreter { Pattern activatePattern = Pattern.compile("activate\\s*(.*)"); Pattern deactivatePattern = Pattern.compile("deactivate"); Pattern helpPattern = Pattern.compile("help"); + private File zeppelinHome; public PythonDockerInterpreter(Properties property) { super(property); @@ -44,7 +43,11 @@ public PythonDockerInterpreter(Properties property) { @Override public void open() { - + if (System.getenv("ZEPPELIN_HOME") != null) { + zeppelinHome = new File(System.getenv("ZEPPELIN_HOME")); + } else { + zeppelinHome = Paths.get("..").toAbsolutePath().toFile(); + } } @Override @@ -54,6 +57,7 @@ public void close() { @Override public InterpreterResult interpret(String st, InterpreterContext context) { + File pythonScript = new File(getPythonInterpreter().getScriptPath()); InterpreterOutput out = context.out; Matcher activateMatcher = activatePattern.matcher(st); @@ -66,7 +70,27 @@ public InterpreterResult interpret(String st, InterpreterContext context) { } else if (activateMatcher.matches()) { String image = activateMatcher.group(1); pull(out, image); - setPythonCommand("docker run -i --rm " + image + " python -iu"); + + // mount pythonscript dir + String mountPythonScript = "-v " + + pythonScript.getParentFile().getAbsolutePath() + + ":/_zeppelin_tmp "; + + // mount zeppelin dir + String mountPy4j = "-v " + + zeppelinHome.getAbsolutePath() + + ":/_zeppelin "; + + // set PYTHONPATH + String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" + + ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS; + + setPythonCommand("docker run -i --rm " + + mountPythonScript + + mountPy4j + + "-e PYTHONPATH=\"" + pythonPath + "\" " + + image + + " python /_zeppelin_tmp/" + pythonScript.getName()); restartPythonProcess(); out.clear(); return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated"); @@ -79,6 +103,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { } } + public void setPythonCommand(String cmd) { PythonInterpreter python = getPythonInterpreter(); python.setPythonCommand(cmd); diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index d77b59a1640..f8255681c23 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -18,20 +18,38 @@ package org.apache.zeppelin.python; import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.net.ServerSocket; +import java.io.OutputStreamWriter; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.net.*; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Properties; -import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.ExecuteException; +import org.apache.commons.exec.ExecuteResultHandler; +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.PumpStreamHandler; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.commons.io.IOUtils; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -39,144 +57,360 @@ import org.slf4j.LoggerFactory; import py4j.GatewayServer; +import py4j.commands.Command; /** * Python interpreter for Zeppelin. */ -public class PythonInterpreter extends Interpreter { +public class PythonInterpreter extends Interpreter implements ExecuteResultHandler { private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class); - - public static final String BOOTSTRAP_PY = "/bootstrap.py"; - public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py"; - public static final String ZEPPELIN_PYTHON = "zeppelin.python"; + public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py"; + public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src"; + public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python"; public static final String DEFAULT_ZEPPELIN_PYTHON = "python"; public static final String MAX_RESULT = "zeppelin.python.maxResult"; - private Integer port; - private GatewayServer gatewayServer; - private Boolean py4JisInstalled = false; private InterpreterContext context; private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$"); private String pythonPath; private int maxResult; + private String py4jLibPath; + private String pythonLibPath; + + private String pythonCommand; + + private GatewayServer gatewayServer; + private DefaultExecutor executor; + private int port; + private InterpreterOutputStream outputStream; + private BufferedWriter ins; + private PipedInputStream in; + private ByteArrayOutputStream input; + private String scriptPath; + boolean pythonscriptRunning = false; + private static final int MAX_TIMEOUT_SEC = 10; - PythonProcess process = null; - private String pythonCommand = null; + private long pythonPid = 0; + + Integer statementSetNotifier = new Integer(0); public PythonInterpreter(Properties property) { super(property); + try { + File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp")); + scriptPath = scriptFile.getAbsolutePath(); + } catch (IOException e) { + throw new InterpreterException(e); + } } - @Override - public void open() { - // Add matplotlib display hook - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { - registerHook(HookType.POST_EXEC_DEV, "\nz._displayhook()"); + private String workingDir() { + URL myURL = getClass().getProtectionDomain().getCodeSource().getLocation(); + java.net.URI myURI = null; + try { + myURI = myURL.toURI(); + } catch (URISyntaxException e1) + {} + String path = java.nio.file.Paths.get(myURI).toFile().toString(); + return path; + } + + private void createPythonScript() { + File out = new File(scriptPath); + + if (out.exists() && out.isDirectory()) { + throw new InterpreterException("Can't create python script " + out.getAbsolutePath()); } - // Add zeppelin-bundled libs to PYTHONPATH - setPythonPath("../interpreter/lib/python:$PYTHONPATH"); - LOG.info("Starting Python interpreter ---->"); - LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON)); + copyFile(out, ZEPPELIN_PYTHON); + logger.info("File {} created", scriptPath); + } - maxResult = Integer.valueOf(getProperty(MAX_RESULT)); - process = getPythonProcess(); + public String getScriptPath() { + return scriptPath; + } + private void copyFile(File out, String sourceFile) { + ClassLoader classLoader = getClass().getClassLoader(); try { - process.open(); + FileOutputStream outStream = new FileOutputStream(out); + IOUtils.copy( + classLoader.getResourceAsStream(sourceFile), + outStream); + outStream.close(); } catch (IOException e) { - LOG.error("Can't start the python process", e); + throw new InterpreterException(e); + } + } + + private void createGatewayServerAndStartScript() throws UnknownHostException { + createPythonScript(); + if (System.getenv("ZEPPELIN_HOME") != null) { + py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH; + pythonLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PYTHON_LIBS; + } else { + Path workingPath = Paths.get("..").toAbsolutePath(); + py4jLibPath = workingPath + File.separator + ZEPPELIN_PY4JPATH; + pythonLibPath = workingPath + File.separator + ZEPPELIN_PYTHON_LIBS; + } + + port = findRandomOpenPortOnAllLocalInterfaces(); + gatewayServer = new GatewayServer(this, + port, + GatewayServer.DEFAULT_PYTHON_PORT, + InetAddress.getByName("0.0.0.0"), + InetAddress.getByName("0.0.0.0"), + GatewayServer.DEFAULT_CONNECT_TIMEOUT, + GatewayServer.DEFAULT_READ_TIMEOUT, + (List) null); + + gatewayServer.start(); + + // Run python shell + String pythonCmd = getPythonCommand(); + CommandLine cmd = CommandLine.parse(pythonCmd); + + if (!pythonCmd.endsWith(".py")) { + // PythonDockerInterpreter set pythoncmd with script + cmd.addArgument(getScriptPath(), false); } + cmd.addArgument(Integer.toString(port), false); + cmd.addArgument(getLocalIp(), false); + executor = new DefaultExecutor(); + outputStream = new InterpreterOutputStream(logger); + PipedOutputStream ps = new PipedOutputStream(); + in = null; try { - LOG.info("python PID : " + process.getPid()); - } catch (Exception e) { - LOG.warn("Can't find python pid process", e); + in = new PipedInputStream(ps); + } catch (IOException e1) { + throw new InterpreterException(e1); } + ins = new BufferedWriter(new OutputStreamWriter(ps)); + input = new ByteArrayOutputStream(); + + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); + executor.setStreamHandler(streamHandler); + executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); try { - LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY); - bootStrapInterpreter(BOOTSTRAP_PY); + Map env = EnvironmentUtils.getProcEnvironment(); + if (!env.containsKey("PYTHONPATH")) { + env.put("PYTHONPATH", py4jLibPath + File.pathSeparator + pythonLibPath); + } else { + env.put("PYTHONPATH", env.get("PYTHONPATH") + File.pathSeparator + + py4jLibPath + File.pathSeparator + pythonLibPath); + } + + logger.info("cmd = {}", cmd.toString()); + executor.execute(cmd, env, this); + pythonscriptRunning = true; } catch (IOException e) { - LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e); + throw new InterpreterException(e); } - py4JisInstalled = isPy4jInstalled(); - if (py4JisInstalled) { - port = findRandomOpenPortOnAllLocalInterfaces(); - LOG.info("Py4j gateway port : " + port); - try { - gatewayServer = new GatewayServer(this, port); - gatewayServer.start(); - LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY); - bootStrapInterpreter(BOOTSTRAP_INPUT_PY); - } catch (IOException e) { - LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " + - "initialize Zeppelin inputs in python process", e); - } + try { + input.write("import sys, getopt\n".getBytes()); + ins.flush(); + } catch (IOException e) { + throw new InterpreterException(e); + } + } + + @Override + public void open() { + // Add matplotlib display hook + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { + registerHook(HookType.POST_EXEC_DEV, "z._displayhook()"); + } + // Add matplotlib display hook + try { + createGatewayServerAndStartScript(); + } catch (UnknownHostException e) { + throw new InterpreterException(e); } } @Override public void close() { - LOG.info("closing Python interpreter <----"); + pythonscriptRunning = false; + pythonScriptInitialized = false; + try { - if (process != null) { - process.close(); - process = null; + ins.flush(); + ins.close(); + input.flush(); + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + executor.getWatchdog().destroyProcess(); + new File(scriptPath).delete(); + gatewayServer.shutdown(); + + // wait until getStatements stop + synchronized (statementSetNotifier) { + try { + statementSetNotifier.wait(1500); + } catch (InterruptedException e) { } - if (gatewayServer != null) { - gatewayServer.shutdown(); + statementSetNotifier.notify(); + } + } + + PythonInterpretRequest pythonInterpretRequest = null; + /** + * Result class of python interpreter + */ + public class PythonInterpretRequest { + public String statements; + + public PythonInterpretRequest(String statements) { + this.statements = statements; + } + + public String statements() { + return statements; + } + } + + public PythonInterpretRequest getStatements() { + synchronized (statementSetNotifier) { + while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) { + try { + statementSetNotifier.wait(1000); + } catch (InterruptedException e) { + } } - } catch (IOException e) { - LOG.error("Can't close the interpreter", e); + PythonInterpretRequest req = pythonInterpretRequest; + pythonInterpretRequest = null; + return req; + } + } + + String statementOutput = null; + boolean statementError = false; + Integer statementFinishedNotifier = new Integer(0); + + public void setStatementsFinished(String out, boolean error) { + synchronized (statementFinishedNotifier) { + statementOutput = out; + statementError = error; + statementFinishedNotifier.notify(); + } + } + + boolean pythonScriptInitialized = false; + Integer pythonScriptInitializeNotifier = new Integer(0); + + public void onPythonScriptInitialized(long pid) { + pythonPid = pid; + synchronized (pythonScriptInitializeNotifier) { + pythonScriptInitialized = true; + pythonScriptInitializeNotifier.notifyAll(); } } + public void appendOutput(String message) throws IOException { + outputStream.getInterpreterOutput().write(message); + } + @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { if (cmd == null || cmd.isEmpty()) { return new InterpreterResult(Code.SUCCESS, ""); } + this.context = contextInterpreter; - String output = sendCommandToPython(cmd); - InterpreterResult result; - if (pythonErrorIn(output)) { - result = new InterpreterResult(Code.ERROR, output.replaceAll("\\.\\.\\.", "")); + if (!pythonscriptRunning) { + return new InterpreterResult(Code.ERROR, "python process not running" + + outputStream.toString()); + } + + outputStream.setInterpreterOutput(context.out); + + synchronized (pythonScriptInitializeNotifier) { + long startTime = System.currentTimeMillis(); + while (pythonScriptInitialized == false + && pythonscriptRunning + && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) { + try { + pythonScriptInitializeNotifier.wait(1000); + } catch (InterruptedException e) { + } + } + } + + List errorMessage; + try { + context.out.flush(); + errorMessage = context.out.toInterpreterResultMessage(); + } catch (IOException e) { + throw new InterpreterException(e); + } + + if (pythonscriptRunning == false) { + // python script failed to initialize and terminated + errorMessage.add(new InterpreterResultMessage( + InterpreterResult.Type.TEXT, "failed to start python")); + return new InterpreterResult(Code.ERROR, errorMessage); + } + if (pythonScriptInitialized == false) { + // timeout. didn't get initialized message + errorMessage.add(new InterpreterResultMessage( + InterpreterResult.Type.TEXT, "python is not responding")); + return new InterpreterResult(Code.ERROR, errorMessage); + } + + pythonInterpretRequest = new PythonInterpretRequest(cmd); + statementOutput = null; + + synchronized (statementSetNotifier) { + statementSetNotifier.notify(); + } + + synchronized (statementFinishedNotifier) { + while (statementOutput == null) { + try { + statementFinishedNotifier.wait(1000); + } catch (InterruptedException e) { + } + } + } + + if (statementError) { + return new InterpreterResult(Code.ERROR, statementOutput); } else { - result = new InterpreterResult(Code.SUCCESS, output); + + try { + context.out.flush(); + } catch (IOException e) { + throw new InterpreterException(e); + } + + return new InterpreterResult(Code.SUCCESS); } - return result; } - /** - * Checks if there is a syntax error or an exception - * - * @param output Python interpreter output - * @return true if syntax error or exception has happened - */ - private boolean pythonErrorIn(String output) { - boolean isError = false; - String[] outputMultiline = output.split("\n"); - Matcher errorMatcher; - for (String row : outputMultiline) { - errorMatcher = errorInLastLine.matcher(row); - if (errorMatcher.find() == true) { - isError = true; - break; - } + public void interrupt() throws IOException { + if (pythonPid > -1) { + logger.info("Sending SIGINT signal to PID : " + pythonPid); + Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); + } else { + logger.warn("Non UNIX/Linux system, close the interpreter"); + close(); } - return isError; } @Override public void cancel(InterpreterContext context) { try { - process.interrupt(); + interrupt(); } catch (IOException e) { - LOG.error("Can't interrupt the python interpreter", e); + e.printStackTrace(); } } @@ -201,28 +435,17 @@ public List completion(String buf, int cursor) { return null; } - public void setPythonPath(String pythonPath) { - this.pythonPath = pythonPath; - } - - public PythonProcess getPythonProcess() { - if (process == null) { - String binPath = getProperty(ZEPPELIN_PYTHON); - if (pythonCommand != null) { - binPath = pythonCommand; - } - return new PythonProcess(binPath, pythonPath); - } else { - return process; - } - } - public void setPythonCommand(String cmd) { + logger.info("Set Python Command : {}", cmd); pythonCommand = cmd; } public String getPythonCommand() { - return pythonCommand; + if (pythonCommand == null) { + return DEFAULT_ZEPPELIN_PYTHON; + } else { + return pythonCommand; + } } private Job getRunningJob(String paragraphId) { @@ -237,24 +460,6 @@ private Job getRunningJob(String paragraphId) { return foundJob; } - - /** - * Sends given text to Python interpreter, blocks and returns the output - * @param cmd Python expression text - * @return output - */ - String sendCommandToPython(String cmd) { - String output = ""; - LOG.debug("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd)); - try { - output = process.sendAndGetResult(cmd); - } catch (IOException e) { - LOG.error("Error when sending commands to python process", e); - } - LOG.debug("Got : \n" + output); - return output; - } - void bootStrapInterpreter(String file) throws IOException { BufferedReader bootstrapReader = new BufferedReader( new InputStreamReader( @@ -265,24 +470,22 @@ void bootStrapInterpreter(String file) throws IOException { while ((line = bootstrapReader.readLine()) != null) { bootstrapCode += line + "\n"; } - if (py4JisInstalled && port != null && port != -1) { - bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString()); - } - LOG.info("Bootstrap python interpreter with code from \n " + file); - sendCommandToPython(bootstrapCode); + + interpret(bootstrapCode, context); } public GUI getGui() { return context.getGui(); } - public Integer getPy4jPort() { - return port; - } - - public Boolean isPy4jInstalled() { - String output = sendCommandToPython("\n\nimport py4j\n"); - return !output.contains("ImportError"); + String getLocalIp() { + try { + return Inet4Address.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + logger.error("can't get local IP", e); + } + // fall back to loopback addreess + return "127.0.0.1"; } private int findRandomOpenPortOnAllLocalInterfaces() { @@ -299,4 +502,16 @@ private int findRandomOpenPortOnAllLocalInterfaces() { public int getMaxResult() { return maxResult; } + + @Override + public void onProcessComplete(int exitValue) { + pythonscriptRunning = false; + logger.info("python process terminated. exit code " + exitValue); + } + + @Override + public void onProcessFailed(ExecuteException e) { + pythonscriptRunning = false; + logger.error("python process failed", e); + } } diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java index 381066fe270..6bf1970e259 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java @@ -36,7 +36,7 @@ public class PythonInterpreterPandasSql extends Interpreter { private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class); - private String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py"; + private String SQL_BOOTSTRAP_FILE_PY = "/python/bootstrap_sql.py"; public PythonInterpreterPandasSql(Properties property) { super(property); @@ -64,25 +64,17 @@ PythonInterpreter getPythonInterpreter() { @Override public void open() { LOG.info("Open Python SQL interpreter instance: {}", this.toString()); + try { LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY); PythonInterpreter python = getPythonInterpreter(); + python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY); } catch (IOException e) { LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e); } } - /** - * Checks if Python dependencies pandas and pandasql are installed - * @return True if they are - */ - boolean isPandasAndPandasqlInstalled() { - PythonInterpreter python = getPythonInterpreter(); - String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n"); - return !output.contains("ImportError"); - } - @Override public void close() { LOG.info("Close Python SQL interpreter instance: {}", this.toString()); @@ -94,7 +86,8 @@ public void close() { public InterpreterResult interpret(String st, InterpreterContext context) { LOG.info("Running SQL query: '{}' over Pandas DataFrame", st); Interpreter python = getPythonInterpreter(); - return python.interpret("z.show(pysqldf('" + st + "'))", context); + + return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context); } @Override diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java b/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java deleted file mode 100644 index 578ffeb8f99..00000000000 --- a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java +++ /dev/null @@ -1,138 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.zeppelin.python; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.io.OutputStream; -import java.lang.reflect.Field; - -/** - * Object encapsulated interactive - * Python process (REPL) used by python interpreter - */ -public class PythonProcess { - private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class); - private static final String STATEMENT_END = "*!?flush reader!?*"; - InputStream stdout; - OutputStream stdin; - PrintWriter writer; - BufferedReader reader; - Process process; - - private String binPath; - private String pythonPath; - private long pid; - - public PythonProcess(String binPath, String pythonPath) { - this.binPath = binPath; - this.pythonPath = pythonPath; - } - - public void open() throws IOException { - ProcessBuilder builder; - boolean hasParams = binPath.split(" ").length > 1; - if (System.getProperty("os.name").toLowerCase().contains("windows")) { - if (hasParams) { - builder = new ProcessBuilder(binPath.split(" ")); - } else { - builder = new ProcessBuilder(binPath, "-iu"); - } - } else { - String cmd; - if (hasParams) { - cmd = binPath; - } else { - cmd = binPath + " -iu"; - } - builder = new ProcessBuilder("bash", "-c", cmd); - if (pythonPath != null) { - builder.environment().put("PYTHONPATH", pythonPath); - } - } - - builder.redirectErrorStream(true); - process = builder.start(); - stdout = process.getInputStream(); - stdin = process.getOutputStream(); - writer = new PrintWriter(stdin, true); - reader = new BufferedReader(new InputStreamReader(stdout)); - try { - pid = findPid(); - } catch (Exception e) { - logger.warn("Can't find python pid process", e); - pid = -1; - } - } - - public void close() throws IOException { - process.destroy(); - reader.close(); - writer.close(); - stdin.close(); - stdout.close(); - } - - public void interrupt() throws IOException { - if (pid > -1) { - logger.info("Sending SIGINT signal to PID : " + pid); - Runtime.getRuntime().exec("kill -SIGINT " + pid); - } else { - logger.warn("Non UNIX/Linux system, close the interpreter"); - close(); - } - } - - public String sendAndGetResult(String cmd) throws IOException { - writer.println(cmd); - writer.println(); - writer.println("\"" + STATEMENT_END + "\""); - StringBuilder output = new StringBuilder(); - String line = null; - - while ((line = reader.readLine()) != null && - !line.contains(STATEMENT_END)) { - logger.debug("Read line from python shell : " + line); - output.append(line + "\n"); - } - - return output.toString(); - } - - private long findPid() throws NoSuchFieldException, IllegalAccessException { - long pid = -1; - if (process.getClass().getName().equals("java.lang.UNIXProcess")) { - Field f = process.getClass().getDeclaredField("pid"); - f.setAccessible(true); - pid = f.getLong(process); - f.setAccessible(false); - } - return pid; - } - - public long getPid() { - return pid; - } - -} diff --git a/python/src/main/resources/__init__.py b/python/src/main/resources/__init__.py deleted file mode 100644 index ec2014340d7..00000000000 --- a/python/src/main/resources/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/python/src/main/resources/bootstrap.py b/python/src/main/resources/bootstrap.py deleted file mode 100644 index 0a20a343dff..00000000000 --- a/python/src/main/resources/bootstrap.py +++ /dev/null @@ -1,234 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# PYTHON 2 / 3 compatibility : -# bootstrap.py must be runnable with Python 2 or 3 - -import os -import sys -import signal -import base64 -import warnings -from io import BytesIO -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - -def intHandler(signum, frame): # Set the signal handler - print ("Paragraph interrupted") - raise KeyboardInterrupt() - -signal.signal(signal.SIGINT, intHandler) -# set prompt as empty string so that java side don't need to remove the prompt. -sys.ps1="" - -def help(): - print("""%html -

Python Interpreter help

- -

Python 2 & 3 compatibility

-

The interpreter is compatible with Python 2 & 3.
- To change Python version, - change in the interpreter configuration the python to the - desired version (example : python=/usr/bin/python3)

- -

Python modules

-

The interpreter can use all modules already installed - (with pip, easy_install, etc)

- -

Forms

- You must install py4j in order to use - the form feature (pip install py4j) -

Input form

-
print (z.input("f1","defaultValue"))
-

Selection form

-
print(z.select("f2", [("o1","1"), ("o2","2")],2))
-

Checkbox form

-
 print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))
') - -

Matplotlib graph

-
The interpreter can display matplotlib graph with - the function z.show()
-
You need to already have matplotlib module installed - to use this functionality !

-
import matplotlib.pyplot as plt
- plt.figure()
- (.. ..)
- z.show(plt)
- plt.close()
- 
-

z.show function can take optional parameters - to adapt graph dimensions (width and height) and format (png or svg)
-
example : -
z.show(plt,width='50px
- z.show(plt,height='150px', fmt='svg') 
- -

Pandas DataFrame

-
You need to have Pandas module installed - to use this functionality (pip install pandas) !

-
The interpreter can visualize Pandas DataFrame - with the function z.show() -
- import pandas as pd
- df = pd.read_csv("bank.csv", sep=";")
- z.show(df)
- 
- -

SQL over Pandas DataFrame

-
You need to have Pandas&Pandasql modules installed - to use this functionality (pip install pandas pandasql) !

- -
Python interpreter group includes %sql interpreter that can query - Pandas DataFrames using SQL and visualize results using Zeppelin Table Display System - -
- %python
- import pandas as pd
- df = pd.read_csv("bank.csv", sep=";")
- 
-
-
- %python.sql
- %sql
- SELECT * from df LIMIT 5
- 
-
- """) - - -class PyZeppelinContext(object): - """ If py4j is detected, these class will be override - with the implementation in bootstrap_input.py - """ - errorMsg = "You must install py4j Python module " \ - "(pip install py4j) to use Zeppelin dynamic forms features" - - def __init__(self): - self.max_result = 1000 - self._displayhook = lambda *args: None - self._setup_matplotlib() - - def input(self, name, defaultValue=""): - print(self.errorMsg) - - def select(self, name, options, defaultValue=""): - print(self.errorMsg) - - def checkbox(self, name, options, defaultChecked=[]): - print(self.errorMsg) - - def show(self, p, **kwargs): - if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": - self.show_matplotlib(p, **kwargs) - elif type(p).__name__ == "DataFrame": # does not play well with sub-classes - # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` - # and so a dependency on pandas - self.show_dataframe(p, **kwargs) - elif hasattr(p, '__call__'): - p() #error reporting - - def show_dataframe(self, df, show_index=False, **kwargs): - """Pretty prints DF using Table Display System - """ - limit = len(df) > self.max_result - header_buf = StringIO("") - if show_index: - idx_name = str(df.index.name) if df.index.name is not None else "" - header_buf.write(idx_name + "\t") - header_buf.write(str(df.columns[0])) - for col in df.columns[1:]: - header_buf.write("\t") - header_buf.write(str(col)) - header_buf.write("\n") - - body_buf = StringIO("") - rows = df.head(self.max_result).values if limit else df.values - index = df.index.values - for idx, row in zip(index, rows): - if show_index: - body_buf.write("%html {}".format(idx)) - body_buf.write("\t") - body_buf.write(str(row[0])) - for cell in row[1:]: - body_buf.write("\t") - body_buf.write(str(cell)) - body_buf.write("\n") - body_buf.seek(0); header_buf.seek(0) - #TODO(bzz): fix it, so it shows red notice, as in Spark - print("%table " + header_buf.read() + body_buf.read()) # + - # ("\nResults are limited by {}." \ - # .format(self.max_result) if limit else "") - #) - body_buf.close(); header_buf.close() - - def show_matplotlib(self, p, fmt="png", width="auto", height="auto", - **kwargs): - """Matplotlib show function - """ - if fmt == "png": - img = BytesIO() - p.savefig(img, format=fmt) - img_str = b"data:image/png;base64," - img_str += base64.b64encode(img.getvalue().strip()) - img_tag = "" - # Decoding is necessary for Python 3 compability - img_str = img_str.decode("ascii") - img_str = img_tag.format(img=img_str, width=width, height=height) - elif fmt == "svg": - img = StringIO() - p.savefig(img, format=fmt) - img_str = img.getvalue() - else: - raise ValueError("fmt must be 'png' or 'svg'") - - html = "%html
{img}
" - print(html.format(width=width, height=height, img=img_str)) - img.close() - - def configure_mpl(self, **kwargs): - import mpl_config - mpl_config.configure(**kwargs) - - def _setup_matplotlib(self): - # If we don't have matplotlib installed don't bother continuing - try: - import matplotlib - except ImportError: - return - # Make sure custom backends are available in the PYTHONPATH - rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) - mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') - if mpl_path not in sys.path: - sys.path.append(mpl_path) - - # Finally check if backend exists, and if so configure as appropriate - try: - matplotlib.use('module://backend_zinline') - import backend_zinline - - # Everything looks good so make config assuming that we are using - # an inline backend - self._displayhook = backend_zinline.displayhook - self.configure_mpl(width=600, height=400, dpi=72, - fontsize=10, interactive=True, format='png') - except ImportError: - # Fall back to Agg if no custom backend installed - matplotlib.use('Agg') - warnings.warn("Unable to load inline matplotlib backend, " - "falling back to Agg") - - -z = PyZeppelinContext() diff --git a/python/src/main/resources/bootstrap_input.py b/python/src/main/resources/bootstrap_input.py deleted file mode 100644 index 6a0c544484f..00000000000 --- a/python/src/main/resources/bootstrap_input.py +++ /dev/null @@ -1,58 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from py4j.java_gateway import JavaGateway -from py4j.java_gateway import java_import, JavaGateway, GatewayClient - - -client = GatewayClient(port=%PORT%) -gateway = JavaGateway(client) -java_import(gateway.jvm, "org.apache.zeppelin.display.Input") - - -class Py4jZeppelinContext(PyZeppelinContext): - """A context impl that uses Py4j to communicate to JVM - """ - def __init__(self, z): - PyZeppelinContext.__init__(self) - self.z = z - self.paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption - self.javaList = gateway.jvm.java.util.ArrayList - self.max_result = self.z.getMaxResult() - - def input(self, name, defaultValue=""): - return self.z.getGui().input(name, defaultValue) - - def select(self, name, options, defaultValue=""): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - return self.z.getGui().select(name, defaultValue, javaOptions) - - def checkbox(self, name, options, defaultChecked=[]): - javaOptions = gateway.new_array(self.paramOption, len(options)) - i = 0 - for tuple in options: - javaOptions[i] = self.paramOption(tuple[0], tuple[1]) - i += 1 - javaDefaultCheck = self.javaList() - for check in defaultChecked: - javaDefaultCheck.append(check) - return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions) - - -z = Py4jZeppelinContext(gateway.entry_point) diff --git a/python/src/main/resources/interpreter-setting.json b/python/src/main/resources/interpreter-setting.json index af0ba89c7db..bc4d4ec5280 100644 --- a/python/src/main/resources/interpreter-setting.json +++ b/python/src/main/resources/interpreter-setting.json @@ -39,7 +39,7 @@ "className": "org.apache.zeppelin.python.PythonCondaInterpreter", "properties": { }, - "editor":{ + "editor": { "language": "sh", "editOnDblClick": false } diff --git a/python/src/main/resources/bootstrap_sql.py b/python/src/main/resources/python/bootstrap_sql.py similarity index 85% rename from python/src/main/resources/bootstrap_sql.py rename to python/src/main/resources/python/bootstrap_sql.py index d8248c9d57c..6f1ae81f13b 100644 --- a/python/src/main/resources/bootstrap_sql.py +++ b/python/src/main/resources/python/bootstrap_sql.py @@ -21,8 +21,9 @@ from __future__ import print_function try: - from pandasql import sqldf - pysqldf = lambda q: sqldf(q, globals()) + from pandasql import sqldf + pysqldf = lambda q: sqldf(q, globals()) except ImportError: - pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" + - "Make sure 'pandas' and 'pandasql' libraries are installed") \ No newline at end of file + pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" + + "Make sure 'pandas' and 'pandasql' libraries are installed") + diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py new file mode 100644 index 00000000000..0a36cbafe94 --- /dev/null +++ b/python/src/main/resources/python/zeppelin_python.py @@ -0,0 +1,276 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os, sys, getopt, traceback, json, re + +from py4j.java_gateway import java_import, JavaGateway, GatewayClient +from py4j.protocol import Py4JJavaError, Py4JNetworkError +import warnings +import ast +import traceback +import warnings +import signal + +from io import BytesIO +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +# for back compatibility + +class Logger(object): + def __init__(self): + pass + + def write(self, message): + intp.appendOutput(message) + + def reset(self): + pass + + def flush(self): + pass + + +class PyZeppelinContext(object): + """ If py4j is detected, these class will be override + with the implementation in bootstrap_input.py + """ + errorMsg = "You must install py4j Python module " \ + "(pip install py4j) to use Zeppelin dynamic forms features" + + def __init__(self): + self.max_result = 1000 + self._displayhook = lambda *args: None + self._setup_matplotlib() + + def input(self, name, defaultValue=""): + print(self.errorMsg) + + def select(self, name, options, defaultValue=""): + print(self.errorMsg) + + def checkbox(self, name, options, defaultChecked=[]): + print(self.errorMsg) + + def show(self, p, **kwargs): + if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot": + self.show_matplotlib(p, **kwargs) + elif type(p).__name__ == "DataFrame": # does not play well with sub-classes + # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` + # and so a dependency on pandas + self.show_dataframe(p, **kwargs) + elif hasattr(p, '__call__'): + p() #error reporting + + def show_dataframe(self, df, show_index=False, **kwargs): + """Pretty prints DF using Table Display System + """ + limit = len(df) > self.max_result + header_buf = StringIO("") + if show_index: + idx_name = str(df.index.name) if df.index.name is not None else "" + header_buf.write(idx_name + "\t") + header_buf.write(str(df.columns[0])) + for col in df.columns[1:]: + header_buf.write("\t") + header_buf.write(str(col)) + header_buf.write("\n") + + body_buf = StringIO("") + rows = df.head(self.max_result).values if limit else df.values + index = df.index.values + for idx, row in zip(index, rows): + if show_index: + body_buf.write("%html {}".format(idx)) + body_buf.write("\t") + body_buf.write(str(row[0])) + for cell in row[1:]: + body_buf.write("\t") + body_buf.write(str(cell)) + body_buf.write("\n") + body_buf.seek(0); header_buf.seek(0) + #TODO(bzz): fix it, so it shows red notice, as in Spark + print("%table " + header_buf.read() + body_buf.read()) # + + # ("\nResults are limited by {}." \ + # .format(self.max_result) if limit else "") + #) + body_buf.close(); header_buf.close() + + def show_matplotlib(self, p, fmt="png", width="auto", height="auto", + **kwargs): + """Matplotlib show function + """ + if fmt == "png": + img = BytesIO() + p.savefig(img, format=fmt) + img_str = b"data:image/png;base64," + img_str += base64.b64encode(img.getvalue().strip()) + img_tag = "" + # Decoding is necessary for Python 3 compability + img_str = img_str.decode("ascii") + img_str = img_tag.format(img=img_str, width=width, height=height) + elif fmt == "svg": + img = StringIO() + p.savefig(img, format=fmt) + img_str = img.getvalue() + else: + raise ValueError("fmt must be 'png' or 'svg'") + + html = "%html
{img}
" + print(html.format(width=width, height=height, img=img_str)) + img.close() + + def configure_mpl(self, **kwargs): + import mpl_config + mpl_config.configure(**kwargs) + + def _setup_matplotlib(self): + # If we don't have matplotlib installed don't bother continuing + try: + import matplotlib + except ImportError: + return + # Make sure custom backends are available in the PYTHONPATH + rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) + mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') + if mpl_path not in sys.path: + sys.path.append(mpl_path) + + # Finally check if backend exists, and if so configure as appropriate + try: + matplotlib.use('module://backend_zinline') + import backend_zinline + + # Everything looks good so make config assuming that we are using + # an inline backend + self._displayhook = backend_zinline.displayhook + self.configure_mpl(width=600, height=400, dpi=72, + fontsize=10, interactive=True, format='png') + except ImportError: + # Fall back to Agg if no custom backend installed + matplotlib.use('Agg') + warnings.warn("Unable to load inline matplotlib backend, " + "falling back to Agg") + + +def handler_stop_signals(sig, frame): + sys.exit("Got signal : " + str(sig)) + + +signal.signal(signal.SIGINT, handler_stop_signals) + +host = "127.0.0.1" +if len(sys.argv) >= 3: + host = sys.argv[2] + +client = GatewayClient(address=host, port=int(sys.argv[1])) + +#gateway = JavaGateway(client, auto_convert = True) +gateway = JavaGateway(client) + +intp = gateway.entry_point +intp.onPythonScriptInitialized(os.getpid()) + +z = PyZeppelinContext() +z._setup_matplotlib() + +output = Logger() +sys.stdout = output +#sys.stderr = output + +while True : + req = intp.getStatements() + if req == None: + break + + try: + stmts = req.statements().split("\n") + final_code = [] + + # Get post-execute hooks + try: + global_hook = intp.getHook('post_exec_dev') + except: + global_hook = None + + try: + user_hook = z.getHook('post_exec') + except: + user_hook = None + + nhooks = 0 + for hook in (global_hook, user_hook): + if hook: + nhooks += 1 + + for s in stmts: + if s == None: + continue + + # skip comment + s_stripped = s.strip() + if len(s_stripped) == 0 or s_stripped.startswith("#"): + continue + + final_code.append(s) + + if final_code: + # use exec mode to compile the statements except the last statement, + # so that the last statement's evaluation will be printed to stdout + code = compile('\n'.join(final_code), '', 'exec', ast.PyCF_ONLY_AST, 1) + + to_run_hooks = [] + if (nhooks > 0): + to_run_hooks = code.body[-nhooks:] + + to_run_exec, to_run_single = (code.body[:-(nhooks + 1)], + [code.body[-(nhooks + 1)]]) + + try: + for node in to_run_exec: + mod = ast.Module([node]) + code = compile(mod, '', 'exec') + exec(code) + + for node in to_run_single: + mod = ast.Interactive([node]) + code = compile(mod, '', 'single') + exec(code) + + for node in to_run_hooks: + mod = ast.Module([node]) + code = compile(mod, '', 'exec') + exec(code) + except: + raise Exception(traceback.format_exc()) + + intp.setStatementsFinished("", False) + except Py4JJavaError: + excInnerError = traceback.format_exc() # format_tb() does not return the inner exception + innerErrorStart = excInnerError.find("Py4JJavaError:") + if innerErrorStart > -1: + excInnerError = excInnerError[innerErrorStart:] + intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True) + except Py4JNetworkError: + # lost connection from gateway server. exit + sys.exit(1) + except: + intp.setStatementsFinished(traceback.format_exc(), True) + + output.reset() diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java index c6d2a84886b..28d47e048ec 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java @@ -1,21 +1,23 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.python; + import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.user.AuthenticationInfo; @@ -132,4 +134,6 @@ private InterpreterContext getInterpreterContext() { null, new InterpreterOutput(null)); } + + } diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java index b4d3be293df..566b5e0b35a 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java @@ -21,8 +21,11 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.net.Inet4Address; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.HashMap; import java.util.Properties; @@ -46,8 +49,12 @@ public void setUp() { group.put("note", Arrays.asList(python, docker)); python.setInterpreterGroup(group); docker.setInterpreterGroup(group); + doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString()); doReturn(python).when(docker).getPythonInterpreter(); + doReturn("/scriptpath/zeppelin_python.py").when(python).getScriptPath(); + + docker.open(); } @Test @@ -57,7 +64,7 @@ public void testActivateEnv() { verify(python, times(1)).open(); verify(python, times(1)).close(); verify(docker, times(1)).pull(any(InterpreterOutput.class), anyString()); - verify(python).setPythonCommand("docker run -i --rm env python -iu"); + verify(python).setPythonCommand(Mockito.matches("docker run -i --rm -v.*")); } @Test diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java index 75604dc11df..8b48b24439d 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java @@ -1,25 +1,22 @@ - /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.python; -import java.util.*; - import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; @@ -29,34 +26,27 @@ import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; -/** - * In order for this test to work, test env must have installed: - *
    - * -
  1. Python
  2. - * -
  3. Matplotlib
  4. - *
      - * - * Your PYTHONPATH should also contain the directory of the Matplotlib - * backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python. - * - * To run manually on such environment, use: - * - * mvn -Dpython.test.exclude='' test -pl python -am - * - */ -public class PythonInterpreterMatplotlibTest { +import static org.junit.Assert.*; +public class PythonInterpreterMatplotlibTest implements InterpreterOutputListener { private InterpreterGroup intpGroup; private PythonInterpreter python; private InterpreterContext context; + InterpreterOutput out; @Before public void setUp() throws Exception { @@ -68,16 +58,27 @@ public void setUp() throws Exception { python = new PythonInterpreter(p); python.setInterpreterGroup(intpGroup); - python.open(); List interpreters = new LinkedList<>(); interpreters.add(python); intpGroup.put("note", interpreters); - context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), - new HashMap(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), null, - new LinkedList(), new InterpreterOutput(null)); + out = new InterpreterOutput(this); + + context = new InterpreterContext("note", "id", null, "title", "text", + new AuthenticationInfo(), + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList(), + out); + python.open(); + } + + @After + public void afterTest() throws IOException { + python.close(); } @Test @@ -85,14 +86,14 @@ public void dependenciesAreInstalled() { // matplotlib InterpreterResult ret = python.interpret("import matplotlib", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - + // inline backend ret = python.interpret("import backend_zinline", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); } @Test - public void showPlot() { + public void showPlot() throws IOException { // Simple plot test InterpreterResult ret; ret = python.interpret("import matplotlib.pyplot as plt", context); @@ -100,15 +101,16 @@ public void showPlot() { ret = python.interpret("plt.plot([1, 2, 3])", context); ret = python.interpret("plt.show()", context); - assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType()); - assertTrue(ret.message().get(0).getData().contains("data:image/png;base64")); - assertTrue(ret.message().get(0).getData().contains("
      ")); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Type.TEXT, out.getOutputAt(0).getType()); + assertEquals(new String(out.getOutputAt(1).toByteArray()), InterpreterResult.Type.HTML, out.getOutputAt(1).getType()); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("data:image/png;base64")); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("
      ")); } @Test // Test for when configuration is set to auto-close figures after show(). - public void testClose() { + public void testClose() throws IOException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -116,25 +118,33 @@ public void testClose() { ret = python.interpret("z.configure_mpl(interactive=False)", context); ret = python.interpret("plt.plot([1, 2, 3])", context); ret1 = python.interpret("plt.show()", context); - + // Second call to show() should print nothing, and Type should be TEXT. // This is because when close=True, there should be no living instances // of FigureManager, causing show() to return before setting the output // type to HTML. ret = python.interpret("plt.show()", context); + + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); assertEquals(0, ret.message().size()); - + // Now test that new plot is drawn. It should be identical to the // previous one. ret = python.interpret("plt.plot([1, 2, 3])", context); + String msg1 = new String(out.getOutputAt(0).toByteArray()); + InterpreterResult.Type type1 = out.getOutputAt(0).getType(); + ret2 = python.interpret("plt.show()", context); - assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType()); - assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData()); + String msg2 = new String(out.getOutputAt(0).toByteArray()); + InterpreterResult.Type type2 = out.getOutputAt(0).getType(); + + assertEquals(msg1, msg2); + assertEquals(type1, type2); } - + @Test // Test for when configuration is set to not auto-close figures after show(). - public void testNoClose() { + public void testNoClose() throws IOException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -142,19 +152,39 @@ public void testNoClose() { ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context); ret = python.interpret("plt.plot([1, 2, 3])", context); ret1 = python.interpret("plt.show()", context); - + // Second call to show() should print nothing, and Type should be HTML. // This is because when close=False, there should be living instances // of FigureManager, causing show() to set the output // type to HTML even though the figure is inactive. ret = python.interpret("plt.show()", context); - assertEquals("", ret.message().get(0).getData()); - + String msg1 = new String(out.getOutputAt(0).toByteArray()); + assertNotSame("", msg1); + // Now test that plot can be reshown if it is updated. It should be // different from the previous one because it will plot the same line // again but in a different color. ret = python.interpret("plt.plot([1, 2, 3])", context); + msg1 = new String(out.getOutputAt(1).toByteArray()); ret2 = python.interpret("plt.show()", context); - assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData()); + String msg2 = new String(out.getOutputAt(1).toByteArray()); + + assertNotSame(msg1, msg2); + } + + + @Override + public void onUpdateAll(InterpreterOutput out) { + + } + + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { + + } + + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { + } } diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java index 86fb22bc322..f200a0a9429 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java @@ -21,13 +21,16 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -35,7 +38,10 @@ import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,13 +59,14 @@ * mvn -Dpython.test.exclude='' test -pl python -am * */ -public class PythonInterpreterPandasSqlTest { +public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener { private InterpreterGroup intpGroup; private PythonInterpreterPandasSql sql; private PythonInterpreter python; private InterpreterContext context; + InterpreterOutput out; @Before public void setUp() throws Exception { @@ -78,14 +85,27 @@ public void setUp() throws Exception { intpGroup.put("note", Arrays.asList(python, sql)); - context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), - new HashMap(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), null, - new LinkedList(), new InterpreterOutput(null)); + out = new InterpreterOutput(this); + + context = new InterpreterContext("note", "id", null, "title", "text", + new AuthenticationInfo(), + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList(), + out); + + // to make sure python is running. + InterpreterResult ret = python.interpret("\n", context); + assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - //important to be last step sql.open(); - //it depends on python interpreter presence in the same group + } + + @After + public void afterTest() throws IOException { + sql.close(); } @Test @@ -97,23 +117,15 @@ public void dependenciesAreInstalled() { @Test public void errorMessageIfDependenciesNotInstalled() { InterpreterResult ret; - // given - ret = python.interpret( - "pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')", - context); - assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); - - // when ret = sql.interpret("SELECT * from something", context); - // then assertNotNull(ret); - assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code()); - assertTrue(ret.message().get(0).getData().contains("dependency is not installed")); + assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.ERROR, ret.code()); + assertTrue(ret.message().get(0).getData().contains("no such table: something")); } @Test - public void sqlOverTestDataPrintsTable() { + public void sqlOverTestDataPrintsTable() throws IOException { InterpreterResult ret; // given //String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34"; @@ -121,36 +133,34 @@ public void sqlOverTestDataPrintsTable() { ret = python.interpret("import numpy as np", context); // DataFrame df2 \w test data ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+ - "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context); + "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); //when ret = sql.interpret("select name, age from df2 where age < 40", context); //then - assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType()); - //assertEquals(expectedTable, ret.message()); //somehow it's same but not equal - assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0); - assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType()); + assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0); + assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0); assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code()); } @Test - public void badSqlSyntaxFails() { + public void badSqlSyntaxFails() throws IOException { //when InterpreterResult ret = sql.interpret("select wrong syntax", context); //then assertNotNull("Interpreter returned 'null'", ret); - //System.out.println("\nInterpreter response: \n" + ret.message()); assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); + assertTrue(out.toInterpreterResultMessage().size() == 0); } @Test - public void showDataFrame() { + public void showDataFrame() throws IOException { InterpreterResult ret; ret = python.interpret("import pandas as pd", context); ret = python.interpret("import numpy as np", context); @@ -165,11 +175,25 @@ public void showDataFrame() { ret = python.interpret("z.show(df1, show_index=True)", context); // then - assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType()); - assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0); - assertTrue(ret.message().get(0).getData().indexOf("13") > 0); - assertTrue(ret.message().get(0).getData().indexOf("nan") > 0); - assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0); + assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType()); + assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("index_name")); + assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan")); + assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7")); + } + + @Override + public void onUpdateAll(InterpreterOutput out) { + + } + + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { + + } + + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { + } -} +} \ No newline at end of file diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java index bf50c232581..b5cd680d8da 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.python; @@ -21,52 +21,39 @@ import static org.apache.zeppelin.python.PythonInterpreter.MAX_RESULT; import static org.apache.zeppelin.python.PythonInterpreter.ZEPPELIN_PYTHON; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.apache.zeppelin.interpreter.ClassloaderInterpreter; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Python interpreter unit test - * - * Important: ALL tests here DO NOT REQUIRE Python to be installed - * If Python dependency is required, please look at PythonInterpreterWithPythonInstalledTest - */ -public class PythonInterpreterTest { - private static final Logger LOG = LoggerFactory.getLogger(PythonProcess.class); - PythonInterpreter zeppelinPythonInterpreter = null; +public class PythonInterpreterTest implements InterpreterOutputListener { PythonInterpreter pythonInterpreter = null; - PythonProcess mockPythonProcess; String cmdHistory; + private InterpreterContext context; + InterpreterOutput out; public static Properties getPythonTestProperties() { Properties p = new Properties(); @@ -79,18 +66,8 @@ public static Properties getPythonTestProperties() { public void beforeTest() throws IOException { cmdHistory = ""; - /*Mock python process*/ - mockPythonProcess = mock(PythonProcess.class); - when(mockPythonProcess.getPid()).thenReturn(1L); - when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer(new Answer() { - @Override public String answer(InvocationOnMock invocationOnMock) throws Throwable { - return answerFromPythonMock(invocationOnMock); - } - }); - // python interpreter - pythonInterpreter = spy(new PythonInterpreter(getPythonTestProperties())); - zeppelinPythonInterpreter = new PythonInterpreter(getPythonTestProperties()); + pythonInterpreter = new PythonInterpreter(getPythonTestProperties()); // create interpreter group InterpreterGroup group = new InterpreterGroup(); @@ -98,186 +75,49 @@ public void beforeTest() throws IOException { group.get("note").add(pythonInterpreter); pythonInterpreter.setInterpreterGroup(group); - when(pythonInterpreter.getPythonProcess()).thenReturn(mockPythonProcess); - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError"); - } - - @After - public void afterTest() throws IOException { - pythonInterpreter.close(); - zeppelinPythonInterpreter.close(); - } - - @Test - public void testOpenInterpreter() { - pythonInterpreter.open(); - assertEquals(pythonInterpreter.getPythonProcess().getPid(), 1); - } - - /** - * If Py4J is not installed, bootstrap_input.py - * is not sent to Python process and py4j JavaGateway is not running - */ - @Test - public void testPy4jIsNotInstalled() { - pythonInterpreter.open(); - assertNull(pythonInterpreter.getPy4jPort()); - assertTrue(cmdHistory.contains("def help()")); - assertTrue(cmdHistory.contains("class PyZeppelinContext(object):")); - assertTrue(cmdHistory.contains("z = PyZeppelinContext")); - assertTrue(cmdHistory.contains("def show")); - assertFalse(cmdHistory.contains("GatewayClient")); - } - - /** - * If Py4J installed, bootstrap_input.py - * is sent to interpreter and JavaGateway is running - */ - @Test - public void testPy4jInstalled() throws IOException, InterruptedException { - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(""); + out = new InterpreterOutput(this); + context = new InterpreterContext("note", "id", null, "title", "text", + new AuthenticationInfo(), + new HashMap(), + new GUI(), + new AngularObjectRegistry(group.getId(), null), + new LocalResourcePool("id"), + new LinkedList(), + out); pythonInterpreter.open(); - Integer py4jPort = pythonInterpreter.getPy4jPort(); - assertNotNull(py4jPort); - - assertTrue(cmdHistory.contains("def help()")); - assertTrue(cmdHistory.contains("class PyZeppelinContext(object):")); - assertTrue(cmdHistory.contains("z = Py4jZeppelinContext")); - assertTrue(cmdHistory.contains("def show")); - assertTrue(cmdHistory.contains("GatewayClient(port=" + py4jPort + ")")); - assertTrue(cmdHistory.contains("org.apache.zeppelin.display.Input")); - - assertTrue(serverIsListeningOn(py4jPort)); - pythonInterpreter.close(); - TimeUnit.MILLISECONDS.sleep(100); - assertFalse(serverIsListeningOn(py4jPort)); } - @Test - public void testClose() throws IOException, InterruptedException { - //given: py4j is installed - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(""); - - pythonInterpreter.open(); - Integer py4jPort = pythonInterpreter.getPy4jPort(); - assertNotNull(py4jPort); - - //when + @After + public void afterTest() throws IOException { pythonInterpreter.close(); - TimeUnit.MILLISECONDS.sleep(100); - - //then - assertFalse(serverIsListeningOn(py4jPort)); - verify(mockPythonProcess, times(1)).close(); } @Test - public void testInterpret() { - pythonInterpreter.open(); - cmdHistory = ""; - InterpreterResult result = pythonInterpreter.interpret("print a", null); + public void testInterpret() throws InterruptedException, IOException { + InterpreterResult result = pythonInterpreter.interpret("print (\"hi\")", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals("%text print a", result.message().get(0).toString()); } @Test - public void testInterpretInvalidSyntax() { - zeppelinPythonInterpreter.open(); - InterpreterResult result = zeppelinPythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\n\nz._displayhook()", null); + public void testInterpretInvalidSyntax() throws IOException { + InterpreterResult result = pythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\n", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertTrue(result.message().get(0).toString().contains("hi\nhi\nhi")); - - result = zeppelinPythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\nz._displayhook()", null); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertTrue(result.message().get(0).toString().contains("SyntaxError: invalid syntax")); - } - - /** - * Checks if given port is open on 'localhost' - * @param port - */ - private boolean serverIsListeningOn(Integer port) { - Socket s = new Socket(); - boolean serverIsListening = false; + assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi")); + } - int retryCount = 0; - boolean connected = false; - while (connected = tryToConnect(s, port) && retryCount < 10) { - serverIsListening = connected; - tryToClose(s); - retryCount++; - s = new Socket(); - } - return serverIsListening; - } - - private boolean tryToConnect(Socket s, Integer port) { - boolean connected = false; - SocketAddress sa = new InetSocketAddress("localhost", port); - try { - s.connect(sa, 10000); - connected = true; - } catch (IOException e) { - //LOG.warn("Can't open connection to " + sa, e); - } - return connected; - } + @Override + public void onUpdateAll(InterpreterOutput out) { - private void tryToClose(Socket s) { - try { - s.close(); - } catch (IOException e) { - LOG.error("Can't close connection to " + s.getInetAddress(), e); - } } - private String answerFromPythonMock(InvocationOnMock invocationOnMock) { - Object[] inputs = invocationOnMock.getArguments(); - String cmdToExecute = (String) inputs[0]; + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { - if (cmdToExecute != null) { - cmdHistory += cmdToExecute; - String[] lines = cmdToExecute.split("\\n"); - String output = ""; - - for (int i = 0; i < lines.length; i++) { - output += lines[i]; - } - return output; - } else { - return ""; - } } - @Test - public void checkMultiRowErrorFails() { - - PythonInterpreter pythonInterpreter = new PythonInterpreter( - PythonInterpreterTest.getPythonTestProperties() - ); - // create interpreter group - InterpreterGroup group = new InterpreterGroup(); - group.put("note", new LinkedList()); - group.get("note").add(pythonInterpreter); - pythonInterpreter.setInterpreterGroup(group); - - pythonInterpreter.open(); - - String codeRaiseException = "raise Exception(\"test exception\")"; - InterpreterResult ret = pythonInterpreter.interpret(codeRaiseException, null); + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { - assertNotNull("Interpreter result for raise exception is Null", ret); - - System.err.println("ret = '" + ret + "'"); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); - - assertNotNull("Interpreter result for text is Null", ret); - String codePrintText = "print (\"Exception(\\\"test exception\\\")\")"; - ret = pythonInterpreter.interpret(codePrintText, null); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); } - } diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java deleted file mode 100644 index 7b889ad724b..00000000000 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.zeppelin.python; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.junit.Test; - -import java.util.Arrays; - -/** - * Python interpreter unit test that user real Python - * - * Important: ALL tests here REQUIRE Python to be installed - * They are excluded from default build, to run them manually do: - * - * - * mvn "-Dtest=org.apache.zeppelin.python.PythonInterpreterWithPythonInstalledTest" test -pl python - * - * - * or - * - * mvn -Dpython.test.exclude='' test -pl python -am - * - */ -public class PythonInterpreterWithPythonInstalledTest { - - @Test - public void badPythonSyntaxFails() { - //given - PythonInterpreter realPython = new PythonInterpreter( - PythonInterpreterTest.getPythonTestProperties()); - // create interpreter group - InterpreterGroup group = new InterpreterGroup(); - group.put("note", Arrays.asList((Interpreter) realPython)); - realPython.setInterpreterGroup(group); - - realPython.open(); - - //when - InterpreterResult ret = realPython.interpret("select wrong syntax", null); - - //then - assertNotNull("Interpreter returned 'null'", ret); - //System.out.println("\nInterpreter response: \n" + ret.message()); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); - - realPython.close(); - } - - @Test - public void goodPythonSyntaxRuns() { - //given - PythonInterpreter realPython = new PythonInterpreter( - PythonInterpreterTest.getPythonTestProperties()); - InterpreterGroup group = new InterpreterGroup(); - group.put("note", Arrays.asList((Interpreter) realPython)); - realPython.setInterpreterGroup(group); - realPython.open(); - - //when - InterpreterResult ret = realPython.interpret("help()", null); - - //then - assertNotNull("Interpreter returned 'null'", ret); - //System.out.println("\nInterpreter response: \n" + ret.message()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertTrue(ret.message().get(0).getData().length() > 0); - - realPython.close(); - } - - @Test - public void testZeppelin1555() { - //given - PythonInterpreter realPython = new PythonInterpreter( - PythonInterpreterTest.getPythonTestProperties()); - InterpreterGroup group = new InterpreterGroup(); - group.put("note", Arrays.asList((Interpreter) realPython)); - realPython.setInterpreterGroup(group); - realPython.open(); - - //when - InterpreterResult ret1 = realPython.interpret("print(\"...\")", null); - - //then - //System.out.println("\nInterpreter response: \n" + ret.message()); - assertEquals(InterpreterResult.Code.SUCCESS, ret1.code()); - assertEquals("...\n", ret1.message().get(0).getData()); - - - InterpreterResult ret2 = realPython.interpret("for i in range(5):", null); - //then - //System.out.println("\nInterpreterResultterpreter response: \n" + ret2.message()); - assertEquals(InterpreterResult.Code.ERROR, ret2.code()); - assertEquals(" File \"\", line 2\n" + - " \n" + - " ^\n" + - "IndentationError: expected an indented block\n", ret2.message().get(0).getData()); - - realPython.close(); - } - -}