Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;

import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Helps run python interpreter on a docker container
*/
public class PythonDockerInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
private File zeppelinHome;

public PythonDockerInterpreter(Properties property) {
super(property);
}

@Override
public void open() {
if (System.getenv("ZEPPELIN_HOME") != null) {
zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
} else {
zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
}
}

@Override
public void close() {

}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Leemoonsoo Can you check this method? I think you miss actual running part of executing code. you'd check only "st == null", "activate", "deactivate" except "running code"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PythonDockerInterpreter does not run any python code. It just replace python command for PythonInterpreter from 'python' to 'docker -i run ... 'python'.

So I think this method is okay.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But... in case you try to run "print(1)", that method always return error, isn't it?

File pythonScript = new File(getPythonInterpreter().getScriptPath());
InterpreterOutput out = context.out;

Matcher activateMatcher = activatePattern.matcher(st);
Matcher deactivateMatcher = deactivatePattern.matcher(st);
Matcher helpMatcher = helpPattern.matcher(st);

if (st == null || st.isEmpty() || helpMatcher.matches()) {
printUsage(out);
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String image = activateMatcher.group(1);
pull(out, image);

// mount pythonscript dir
String mountPythonScript = "-v " +
pythonScript.getParentFile().getAbsolutePath() +
":/_zeppelin_tmp ";

// mount zeppelin dir
String mountPy4j = "-v " +
zeppelinHome.getAbsolutePath() +
":/_zeppelin ";

// set PYTHONPATH
String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" +
":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS;

setPythonCommand("docker run -i --rm " +
mountPythonScript +
mountPy4j +
"-e PYTHONPATH=\"" + pythonPath + "\" " +
image +
" python /_zeppelin_tmp/" + pythonScript.getName());
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
} else if (deactivateMatcher.matches()) {
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
}
}


public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}

private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/docker_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
}

@Override
public void cancel(InterpreterContext context) {

}

@Override
public FormType getFormType() {
return FormType.NONE;
}

@Override
public int getProgress(InterpreterContext context) {
return 0;
}

/**
* Use python interpreter's scheduler.
* To make sure %python.docker paragraph and %python paragraph runs sequentially
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}

private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}

protected PythonInterpreter getPythonInterpreter() {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());

while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;

if (lazy != null) {
lazy.open();
}
return python;
}

public boolean pull(InterpreterOutput out, String image) {
int exit = 0;
try {
exit = runCommand(out, "docker", "pull", image);
} catch (IOException | InterruptedException e) {
logger.error(e.getMessage(), e);
throw new InterpreterException(e);
}
return exit == 0;
}

protected int runCommand(InterpreterOutput out, String... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();
InputStream stdout = process.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
String line;
while ((line = br.readLine()) != null) {
out.write(line + "\n");
}
int r = process.waitFor(); // Let the process finish.
return r;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
Expand Down Expand Up @@ -59,6 +57,7 @@
import org.slf4j.LoggerFactory;

import py4j.GatewayServer;
import py4j.commands.Command;

/**
* Python interpreter for Zeppelin.
Expand All @@ -78,7 +77,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
private String py4jLibPath;
private String pythonLibPath;

private String pythonCommand = DEFAULT_ZEPPELIN_PYTHON;
private String pythonCommand;

private GatewayServer gatewayServer;
private DefaultExecutor executor;
Expand All @@ -95,11 +94,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl

Integer statementSetNotifier = new Integer(0);


public PythonInterpreter(Properties property) {
super(property);
try {
File scriptFile = File.createTempFile("zeppelin_python-", ".py");
File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp"));
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
Expand Down Expand Up @@ -128,6 +126,10 @@ private void createPythonScript() {
logger.info("File {} created", scriptPath);
}

public String getScriptPath() {
return scriptPath;
}

private void copyFile(File out, String sourceFile) {
ClassLoader classLoader = getClass().getClassLoader();
try {
Expand All @@ -141,7 +143,7 @@ private void copyFile(File out, String sourceFile) {
}
}

private void createGatewayServerAndStartScript() {
private void createGatewayServerAndStartScript() throws UnknownHostException {
createPythonScript();
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH;
Expand All @@ -153,13 +155,28 @@ private void createGatewayServerAndStartScript() {
}

port = findRandomOpenPortOnAllLocalInterfaces();
gatewayServer = new GatewayServer(this, port);
gatewayServer = new GatewayServer(this,
port,
GatewayServer.DEFAULT_PYTHON_PORT,
InetAddress.getByName("0.0.0.0"),
InetAddress.getByName("0.0.0.0"),
GatewayServer.DEFAULT_CONNECT_TIMEOUT,
GatewayServer.DEFAULT_READ_TIMEOUT,
(List) null);

gatewayServer.start();

// Run python shell
CommandLine cmd = CommandLine.parse(getPythonCommand());
cmd.addArgument(scriptPath, false);
String pythonCmd = getPythonCommand();
CommandLine cmd = CommandLine.parse(pythonCmd);

if (!pythonCmd.endsWith(".py")) {
// PythonDockerInterpreter set pythoncmd with script
cmd.addArgument(getScriptPath(), false);
}
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(getLocalIp(), false);

executor = new DefaultExecutor();
outputStream = new InterpreterOutputStream(logger);
PipedOutputStream ps = new PipedOutputStream();
Expand All @@ -185,6 +202,7 @@ private void createGatewayServerAndStartScript() {
py4jLibPath + File.pathSeparator + pythonLibPath);
}

logger.info("cmd = {}", cmd.toString());
executor.execute(cmd, env, this);
pythonscriptRunning = true;
} catch (IOException e) {
Expand All @@ -207,7 +225,11 @@ public void open() {
registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
}
// Add matplotlib display hook
createGatewayServerAndStartScript();
try {
createGatewayServerAndStartScript();
} catch (UnknownHostException e) {
throw new InterpreterException(e);
}
}

@Override
Expand Down Expand Up @@ -244,25 +266,18 @@ public void close() {
*/
public class PythonInterpretRequest {
public String statements;
public String jobGroup;

public PythonInterpretRequest(String statements, String jobGroup) {
public PythonInterpretRequest(String statements) {
this.statements = statements;
this.jobGroup = jobGroup;
}

public String statements() {
return statements;
}

public String jobGroup() {
return jobGroup;
}
}

public PythonInterpretRequest getStatements() {
synchronized (statementSetNotifier) {

while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) {
try {
statementSetNotifier.wait(1000);
Expand Down Expand Up @@ -350,7 +365,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr
return new InterpreterResult(Code.ERROR, errorMessage);
}

pythonInterpretRequest = new PythonInterpretRequest(cmd, null);
pythonInterpretRequest = new PythonInterpretRequest(cmd);
statementOutput = null;

synchronized (statementSetNotifier) {
Expand Down Expand Up @@ -420,16 +435,17 @@ public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
}

public void setPythonPath(String pythonPath) {
this.pythonPath = pythonPath;
}

public void setPythonCommand(String cmd) {
logger.info("Set Python Command : {}", cmd);
pythonCommand = cmd;
}

public String getPythonCommand() {
return pythonCommand;
if (pythonCommand == null) {
return DEFAULT_ZEPPELIN_PYTHON;
} else {
return pythonCommand;
}
}

private Job getRunningJob(String paragraphId) {
Expand Down Expand Up @@ -462,8 +478,14 @@ public GUI getGui() {
return context.getGui();
}

public Integer getPy4jPort() {
return port;
String getLocalIp() {
try {
return Inet4Address.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.error("can't get local IP", e);
}
// fall back to loopback addreess
return "127.0.0.1";
}

private int findRandomOpenPortOnAllLocalInterfaces() {
Expand Down
Loading