Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;

import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
Expand Down Expand Up @@ -77,15 +78,19 @@ public AngularObject addAndNotifyRemoteProcess(String name, Object o, String not
}

Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectAdd(name, noteId, gson.toJson(o));
return super.add(name, o, noteId, true);
} catch (TException e) {
broken = true;
logger.error("Error", e);
} catch (Exception e) {
logger.error("Error", e);
} finally {
if (client != null) {
remoteInterpreterProcess.releaseClient(client);
remoteInterpreterProcess.releaseClient(client, broken);
}
}
return null;
Expand All @@ -106,15 +111,19 @@ public AngularObject removeAndNotifyRemoteProcess(String name, String noteId) {
}

Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
client.angularObjectRemove(name, noteId);
return super.remove(name, noteId);
} catch (TException e) {
broken = true;
logger.error("Error", e);
} catch (Exception e) {
logger.error("Error", e);
} finally {
if (client != null) {
remoteInterpreterProcess.releaseClient(client);
remoteInterpreterProcess.releaseClient(client, broken);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,18 @@ private synchronized void init() {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
client.createInterpreter(intp.getClassName(), (Map) property);

}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
}
Expand All @@ -158,14 +160,19 @@ public void open() {
public void close() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;

boolean broken = false;
try {
client = interpreterProcess.getClient();
client.close(className);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} catch (Exception e1) {
throw new InterpreterException(e1);
} finally {
if (client != null) {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
getInterpreterProcess().dereference();
}
Expand Down Expand Up @@ -195,6 +202,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
interpreterContextRunnerPool.addAll(noteId, runners);
}

boolean broken = false;
try {
GUI settings = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
Expand All @@ -215,9 +223,10 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterResult result = convert(remoteResult);
return result;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

Expand All @@ -231,12 +240,14 @@ public void cancel(InterpreterContext context) {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
client.cancel(className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

Expand All @@ -257,13 +268,15 @@ public FormType getFormType() {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(className));
return formType;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

Expand All @@ -277,12 +290,14 @@ public int getProgress(InterpreterContext context) {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
return client.getProgress(className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

Expand All @@ -297,12 +312,14 @@ public List<String> completion(String buf, int cursor) {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
return client.completion(className, buf, cursor);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,18 @@ public void run() {
}

RemoteInterpreterEvent event = null;
boolean broken = false;
try {
event = client.getEvent();
} catch (TException e) {
broken = true;
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
continue;
} finally {
interpreterProcess.releaseClient(client, broken);
}

interpreterProcess.releaseClient(client);

Gson gson = new Gson();

AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,27 @@ public Client getClient() throws Exception {
}

public void releaseClient(Client client) {
clientPool.returnObject(client);
releaseClient(client, false);
}

public void releaseClient(Client client, boolean broken) {
if (broken) {
releaseBrokenClient(client);
} else {
try {
clientPool.returnObject(client);
} catch (Exception e) {
logger.warn("exception occurred during releasing thrift client", e);
}
}
}

public void releaseBrokenClient(Client client) {
try {
clientPool.invalidateObject(client);
} catch (Exception e) {
logger.warn("exception occurred during releasing thrift client", e);
}
}

public int dereference() {
Expand All @@ -159,7 +179,8 @@ public int dereference() {
// safely ignore exception while client.shutdown() may terminates remote process
} finally {
if (client != null) {
releaseClient(client);
// no longer used
releaseBrokenClient(client);
}
}

Expand Down Expand Up @@ -250,13 +271,15 @@ public void updateRemoteAngularObject(String name, String noteId, Object o) {
logger.error("Can't update angular object", e);
}

boolean broken = false;
try {
Gson gson = new Gson();
client.angularObjectUpdate(name, noteId, gson.toJson(o));
} catch (TException e) {
broken = true;
logger.error("Can't update angular object", e);
} finally {
releaseClient(client);
releaseClient(client, broken);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ public synchronized Job.Status getStatus() {
return Status.ERROR;
}

boolean broken = false;
try {
String statusStr = client.getStatus(job.getId());
if ("Unknown".equals(statusStr)) {
Expand All @@ -265,6 +266,7 @@ public synchronized Job.Status getStatus() {
listener.afterStatusChange(job, null, status);
return status;
} catch (TException e) {
broken = true;
logger.error("Can't get status information", e);
lastStatus = Status.ERROR;
return Status.ERROR;
Expand All @@ -273,7 +275,7 @@ public synchronized Job.Status getStatus() {
lastStatus = Status.ERROR;
return Status.ERROR;
} finally {
interpreterProcess.releaseClient(client);
interpreterProcess.releaseClient(client, broken);
}
}
}
Expand Down