diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 22818fce872..cd77dc42cf0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -57,7 +57,6 @@ public class RemoteInterpreter extends Interpreter { static Map interpreterGroupReference = new HashMap(); - private InterpreterContextRunnerPool interpreterContextRunnerPool; private int connectTimeout; public RemoteInterpreter(Properties property, @@ -72,7 +71,6 @@ public RemoteInterpreter(Properties property, this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; env = new HashMap(); - interpreterContextRunnerPool = new InterpreterContextRunnerPool(); this.connectTimeout = connectTimeout; } @@ -195,6 +193,9 @@ public InterpreterResult interpret(String st, InterpreterContext context) { throw new InterpreterException(e1); } + InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess + .getInterpreterContextRunnerPool(); + List runners = context.getRunners(); if (runners != null && runners.size() != 0) { // assume all runners in this InterpreterContext have the same note id @@ -338,7 +339,7 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { || (!intpProcess.isRunning() && intpProcess.getPort() == -1)) { interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env, interpreterContextRunnerPool, connectTimeout)); + interpreterPath, env, connectTimeout)); logger.info("setInterpreterGroup = " + getInterpreterGroupKey(interpreterGroup) + " class=" + className diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 5dd2a653982..f917eb950f9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -55,21 +55,19 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { public RemoteInterpreterProcess(String intpRunner, String intpDir, Map env, - InterpreterContextRunnerPool interpreterContextRunnerPool, int connectTimeout) { - this(intpRunner, intpDir, env, interpreterContextRunnerPool, - new RemoteInterpreterEventPoller(), connectTimeout); + int connectTimeout) { + this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout); } RemoteInterpreterProcess(String intpRunner, String intpDir, Map env, - InterpreterContextRunnerPool interpreterContextRunnerPool, RemoteInterpreterEventPoller remoteInterpreterEventPoller, int connectTimeout) { this.interpreterRunner = intpRunner; this.interpreterDir = intpDir; this.env = env; - this.interpreterContextRunnerPool = interpreterContextRunnerPool; + this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); referenceCount = new AtomicInteger(0); this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; this.connectTimeout = connectTimeout; diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index 004327292c5..ea5397ed6d9 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -32,8 +32,9 @@ public class RemoteInterpreterProcessTest { @Test public void testStartStop() { InterpreterGroup intpGroup = new InterpreterGroup(); - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap(), - new InterpreterContextRunnerPool(), 10 * 1000); + RemoteInterpreterProcess rip = new RemoteInterpreterProcess( + "../bin/interpreter.sh", "nonexists", new HashMap(), + 10 * 1000); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); assertEquals(1, rip.reference(intpGroup)); @@ -48,8 +49,9 @@ public void testStartStop() { @Test public void testClientFactory() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap(), - new InterpreterContextRunnerPool(), mock(RemoteInterpreterEventPoller.class), 10 * 1000); + RemoteInterpreterProcess rip = new RemoteInterpreterProcess( + "../bin/interpreter.sh", "nonexists", new HashMap(), + mock(RemoteInterpreterEventPoller.class), 10 * 1000); rip.reference(intpGroup); assertEquals(0, rip.getNumActiveClient()); assertEquals(0, rip.getNumIdleClient()); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 02b7e47dfa3..758a1e4b813 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -89,6 +89,27 @@ public void pySparkTest() throws IOException { ZeppelinServer.notebook.removeNote(note.id()); } + @Test + public void zRunTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(); + Paragraph p0 = note.addParagraph(); + p0.setText("z.run(1)"); + Paragraph p1 = note.addParagraph(); + p1.setText("val a=10"); + Paragraph p2 = note.addParagraph(); + p2.setText("print(a)"); + + note.run(p0.getId()); + waitForFinish(p0); + + note.run(p2.getId()); + waitForFinish(p2); + assertEquals("10", p2.getResult().message()); + + ZeppelinServer.notebook.removeNote(note.id()); + } + /** * Get spark version number as a numerical value. * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...