diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 216663a7590..9256bcdb619 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; /** * InterpreterGroup is list of interpreters in the same group. @@ -33,6 +34,7 @@ public class InterpreterGroup extends LinkedList{ String id; AngularObjectRegistry angularObjectRegistry; + RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process public InterpreterGroup(String id) { this.id = id; @@ -72,6 +74,14 @@ public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry this.angularObjectRegistry = angularObjectRegistry; } + public RemoteInterpreterProcess getRemoteInterpreterProcess() { + return remoteInterpreterProcess; + } + + public void setRemoteInterpreterProcess(RemoteInterpreterProcess remoteInterpreterProcess) { + this.remoteInterpreterProcess = remoteInterpreterProcess; + } + public void close() { List closeThreads = new LinkedList(); @@ -118,5 +128,12 @@ public void run() { logger.error("Can't close interpreter", e); } } + + // make sure remote interpreter process terminates + if (remoteInterpreterProcess != null) { + while (remoteInterpreterProcess.referenceCount() > 0) { + remoteInterpreterProcess.dereference(); + } + } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 3ac5121580d..c72aa7cef28 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -56,8 +56,6 @@ public class RemoteInterpreter extends Interpreter { FormType formType; boolean initialized; private Map env; - static Map interpreterGroupReference - = new HashMap(); private int connectTimeout; @@ -96,19 +94,21 @@ public String getClassName() { } public RemoteInterpreterProcess getInterpreterProcess() { - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - RemoteInterpreterProcess interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - try { - return interpreterProcess; - } catch (Exception e) { - throw new InterpreterException(e); - } - } else { - // closed or not opened yet - return null; + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup == null) { + return null; + } + + synchronized (intpGroup) { + if (intpGroup.getRemoteInterpreterProcess() == null) { + // create new remote process + RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess( + interpreterRunner, interpreterPath, env, connectTimeout); + + intpGroup.setRemoteInterpreterProcess(remoteProcess); } + + return intpGroup.getRemoteInterpreterProcess(); } } @@ -117,17 +117,7 @@ private synchronized void init() { return; } - RemoteInterpreterProcess interpreterProcess = null; - - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - } else { - throw new InterpreterException("Unexpected error"); - } - } - + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); int rc = interpreterProcess.reference(getInterpreterGroup()); synchronized (interpreterProcess) { @@ -170,24 +160,14 @@ public void close() { Client client = null; try { client = interpreterProcess.getClient(); + client.close(className); } catch (Exception e1) { throw new InterpreterException(e1); - } - - try { - client.close(className); - } catch (TException e) { - throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); - } - - int r = interpreterProcess.dereference(); - if (r == 0) { - synchronized (interpreterGroupReference) { - InterpreterGroup intpGroup = getInterpreterGroup(); - interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup)); + if (client != null) { + interpreterProcess.releaseClient(client); } + getInterpreterProcess().dereference(); } } @@ -339,29 +319,6 @@ public Scheduler getScheduler() { } } - - @Override - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - super.setInterpreterGroup(interpreterGroup); - - synchronized (interpreterGroupReference) { - RemoteInterpreterProcess intpProcess = interpreterGroupReference - .get(getInterpreterGroupKey(interpreterGroup)); - - // when interpreter process is not created or terminated - if (intpProcess == null || (!intpProcess.isRunning() && intpProcess.getPort() > 0) - || (!intpProcess.isRunning() && intpProcess.getPort() == -1)) { - interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), - new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env, connectTimeout)); - - logger.info("setInterpreterGroup = " - + getInterpreterGroupKey(interpreterGroup) + " class=" + className - + ", path=" + interpreterPath); - } - } - } - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { return interpreterGroup.getId(); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index bbda252ed21..c938ff36622 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -219,9 +219,6 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException intpA.close(); intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertNull(process); } @Test @@ -337,9 +334,6 @@ protected boolean jobAbort() { intpA.close(); intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertNull(process); } @Test