diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index b7ac014c2e8..a7ddf491b1d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -77,15 +78,19 @@ public AngularObject addAndNotifyRemoteProcess(String name, Object o, String not } Client client = null; + boolean broken = false; try { client = remoteInterpreterProcess.getClient(); client.angularObjectAdd(name, noteId, gson.toJson(o)); return super.add(name, o, noteId, true); + } catch (TException e) { + broken = true; + logger.error("Error", e); } catch (Exception e) { logger.error("Error", e); } finally { if (client != null) { - remoteInterpreterProcess.releaseClient(client); + remoteInterpreterProcess.releaseClient(client, broken); } } return null; @@ -106,15 +111,19 @@ public AngularObject removeAndNotifyRemoteProcess(String name, String noteId) { } Client client = null; + boolean broken = false; try { client = remoteInterpreterProcess.getClient(); client.angularObjectRemove(name, noteId); return super.remove(name, noteId); + } catch (TException e) { + broken = true; + logger.error("Error", e); } catch (Exception e) { logger.error("Error", e); } finally { if (client != null) { - remoteInterpreterProcess.releaseClient(client); + remoteInterpreterProcess.releaseClient(client, broken); } } return null; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index c72aa7cef28..455156ce113 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -131,6 +131,7 @@ private synchronized void init() { throw new InterpreterException(e1); } + boolean broken = false; try { for (Interpreter intp : this.getInterpreterGroup()) { logger.info("Create remote interpreter {}", intp.getClassName()); @@ -138,9 +139,10 @@ private synchronized void init() { } } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } } @@ -158,14 +160,19 @@ public void open() { public void close() { RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); Client client = null; + + boolean broken = false; try { client = interpreterProcess.getClient(); client.close(className); + } catch (TException e) { + broken = true; + throw new InterpreterException(e); } catch (Exception e1) { throw new InterpreterException(e1); } finally { if (client != null) { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } getInterpreterProcess().dereference(); } @@ -195,6 +202,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { interpreterContextRunnerPool.addAll(noteId, runners); } + boolean broken = false; try { GUI settings = context.getGui(); RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context)); @@ -215,9 +223,10 @@ public InterpreterResult interpret(String st, InterpreterContext context) { InterpreterResult result = convert(remoteResult); return result; } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -231,12 +240,14 @@ public void cancel(InterpreterContext context) { throw new InterpreterException(e1); } + boolean broken = false; try { client.cancel(className, convert(context)); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -257,13 +268,15 @@ public FormType getFormType() { throw new InterpreterException(e1); } + boolean broken = false; try { formType = FormType.valueOf(client.getFormType(className)); return formType; } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -277,12 +290,14 @@ public int getProgress(InterpreterContext context) { throw new InterpreterException(e1); } + boolean broken = false; try { return client.getProgress(className, convert(context)); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -297,12 +312,14 @@ public List completion(String buf, int cursor) { throw new InterpreterException(e1); } + boolean broken = false; try { return client.completion(className, buf, cursor); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 1b734b73202..d08d43eb41d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -66,16 +66,18 @@ public void run() { } RemoteInterpreterEvent event = null; + boolean broken = false; try { event = client.getEvent(); } catch (TException e) { + broken = true; logger.error("Can't get RemoteInterpreterEvent", e); waitQuietly(); continue; + } finally { + interpreterProcess.releaseClient(client, broken); } - interpreterProcess.releaseClient(client); - Gson gson = new Gson(); AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 0c9e877e4ea..8d96f4c6677 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -140,7 +140,27 @@ public Client getClient() throws Exception { } public void releaseClient(Client client) { - clientPool.returnObject(client); + releaseClient(client, false); + } + + public void releaseClient(Client client, boolean broken) { + if (broken) { + releaseBrokenClient(client); + } else { + try { + clientPool.returnObject(client); + } catch (Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } + } + } + + public void releaseBrokenClient(Client client) { + try { + clientPool.invalidateObject(client); + } catch (Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } } public int dereference() { @@ -159,7 +179,8 @@ public int dereference() { // safely ignore exception while client.shutdown() may terminates remote process } finally { if (client != null) { - releaseClient(client); + // no longer used + releaseBrokenClient(client); } } @@ -250,13 +271,15 @@ public void updateRemoteAngularObject(String name, String noteId, Object o) { logger.error("Can't update angular object", e); } + boolean broken = false; try { Gson gson = new Gson(); client.angularObjectUpdate(name, noteId, gson.toJson(o)); } catch (TException e) { + broken = true; logger.error("Can't update angular object", e); } finally { - releaseClient(client); + releaseClient(client, broken); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 51dab12cf1d..9be5c22055f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -251,6 +251,7 @@ public synchronized Job.Status getStatus() { return Status.ERROR; } + boolean broken = false; try { String statusStr = client.getStatus(job.getId()); if ("Unknown".equals(statusStr)) { @@ -265,6 +266,7 @@ public synchronized Job.Status getStatus() { listener.afterStatusChange(job, null, status); return status; } catch (TException e) { + broken = true; logger.error("Can't get status information", e); lastStatus = Status.ERROR; return Status.ERROR; @@ -273,7 +275,7 @@ public synchronized Job.Status getStatus() { lastStatus = Status.ERROR; return Status.ERROR; } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } }