diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index d1234dfd987..afe6e66821f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterHookRegistry; import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -345,10 +346,21 @@ public void run(String noteId, String paragraphId, InterpreterContext context) { } + /** + * Run Zeppelin Note by note id + * @param noteId + */ + @ZeppelinApi public void runNote(String noteId) { runNote(noteId, interpreterContext); } + /** + * Run Zepppelin Note by note id + * @param noteId + * @param context + */ + @ZeppelinApi public void runNote(String noteId, InterpreterContext context) { String runningNoteId = context.getNoteId(); String runningParagraphId = context.getParagraphId(); @@ -366,6 +378,31 @@ public void runNote(String noteId, InterpreterContext context) { } } + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getJobStatus(String noteId, String paragraphId) { + return getJobStatus(noteId, paragraphId, interpreterContext); + } + + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @param context + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getJobStatus( + String noteId, String paragraphId, InterpreterContext context) { + RemoteWorksController remoteWorksController = context.getRemoteWorksController(); + return remoteWorksController.getRemoteJobStatus(noteId, paragraphId); + } + /** * get Zeppelin Paragraph Runner from zeppelin server diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java index e1410d61abb..a1fc899801f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java @@ -26,4 +26,5 @@ public interface RemoteWorksController { List getRemoteContextRunner(String noteId); List getRemoteContextRunner(String noteId, String paragraphId); + RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java new file mode 100644 index 00000000000..d61cff46761 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.scheduler.Job.Status; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; + +/** + * Remote Zeppelin Server job status + */ +public class RemoteZeppelinJobStatus { + private String noteId; + private Status jobStatus; + private String paragraphId; + private Date lastRunningTime; + + public String getNoteId() { + return noteId; + } + + public void setNoteId(String noteId) { + this.noteId = noteId; + } + + public Status getJobStatus() { + return jobStatus; + } + + public void setJobStatus(Status jobStatus) { + this.jobStatus = jobStatus; + } + + public void setJobStatus(String jobStatusString) { + this.jobStatus = Status.valueOf(jobStatusString); + } + + public String getParagraphId() { + return paragraphId; + } + + public void setParagraphId(String paragraphId) { + this.paragraphId = paragraphId; + } + + public Date getLastRunningTime() { + return lastRunningTime; + } + + public void setLastRunningTime(Date lastRunningTime) { + this.lastRunningTime = lastRunningTime; + } + + public void setLastRunningTime(String lastRunningTimeString) { + DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH); + Date date = new Date(); + try { + date = format.parse(lastRunningTimeString); + } catch (ParseException e) { + + } finally { + this.lastRunningTime = date; + } + } + + public String name() { + return getJobStatus().name(); + } + + public boolean isFinished() { + return getJobStatus().isFinished(); + } + + public boolean isAbort() { + return getJobStatus().isAbort(); + } + + public boolean isError() { + return getJobStatus().isError(); + } + + public boolean isPending() { + return getJobStatus().isPending(); + } + + public boolean isReady() { + return getJobStatus().isReady(); + } + + public boolean isRunning() { + return getJobStatus().isRunning(); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java index b2a87aa8bd5..34bae1a4713 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java @@ -25,7 +25,8 @@ public class RemoteZeppelinServerResource { * Resource Type for Zeppelin Server */ public enum Type{ - PARAGRAPH_RUNNERS + PARAGRAPH_RUNNERS, + JOB_STATUS } private String ownerKey; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 606d35f60ee..881643a8d0d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -21,6 +21,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -69,6 +70,20 @@ public void getZeppelinServerNoteRunner( gson.toJson(eventBody))); } + public void getZeppelinServerJobStatus(String eventOwnerKey, String noteId, String paragraphId) { + RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource(); + eventBody.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + eventBody.setOwnerKey(eventOwnerKey); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + eventBody.setData(jobStatus); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS, + gson.toJson(eventBody))); + } + /** * Run paragraph * @param runner diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index e794140e8bd..c739d546325 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -239,6 +240,12 @@ public void run() { progressRemoteZeppelinControlEvent( reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS) { + RemoteZeppelinServerResource reqResourceBody = gson.fromJson( + event.getData(), RemoteZeppelinServerResource.class); + progressRemoteJobStatusControlEvent( + reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { Map metaInfos = gson.fromJson(event.getData(), new TypeToken>() { @@ -334,6 +341,66 @@ public void onError() { } } + private void progressRemoteJobStatusControlEvent( + RemoteZeppelinServerResource.Type resourceType, + RemoteInterpreterProcessListener remoteWorksEventListener, + RemoteZeppelinServerResource reqResourceBody) throws Exception { + boolean broken = false; + final Gson gson = new Gson(); + final String eventOwnerKey = reqResourceBody.getOwnerKey(); + Client interpreterServerMain = null; + try { + interpreterServerMain = interpreterProcess.getClient(); + final Client eventClient = interpreterServerMain; + if (resourceType == RemoteZeppelinServerResource.Type.JOB_STATUS) { + Map jobStatus = (Map) reqResourceBody.getData(); + + String noteId = (String) jobStatus.get("noteId"); + String paragraphId = (String) jobStatus.get("paragraphId"); + + RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent = + new RemoteInterpreterProcessListener.RemoteWorksEventListener() { + + @Override + public void onFinished(Object resultObject) { + boolean clientBroken = false; + if (resultObject != null && resultObject instanceof RemoteZeppelinJobStatus) { + + RemoteZeppelinServerResource resResource = new RemoteZeppelinServerResource(); + resResource.setOwnerKey(eventOwnerKey); + resResource.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + resResource.setData(resultObject); + + try { + eventClient.onReceivedZeppelinResource(gson.toJson(resResource)); + } catch (Exception e) { + clientBroken = true; + logger.error("Can't get Remote Job Status Event", e); + waitQuietly(); + } finally { + interpreterProcess.releaseClient(eventClient, clientBroken); + } + } + } + + @Override + public void onError() { + logger.info("onGetParagraphRunners onError"); + } + }; + + remoteWorksEventListener.onGetParagraphJobStatus(noteId, paragraphId, callBackEvent); + } + } catch (Exception e) { + broken = true; + logger.error("Can't get RemoteInterpreter Job Status Event", e); + waitQuietly(); + + } finally { + interpreterProcess.releaseClient(interpreterServerMain, broken); + } + } + private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { Client client = null; boolean broken = false; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 66b08c95a1d..4ac60ffcf3e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -32,6 +32,8 @@ public void onOutputUpdated( public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception; public void onGetParagraphRunners( String noteId, String paragraphId, RemoteWorksEventListener callback); + public void onGetParagraphJobStatus( + String noteId, String paragraphId, RemoteWorksEventListener callback); /** * Remote works for Interpreter callback listener diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 8624b57e744..37ba0b8675e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.*; +import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; @@ -373,6 +374,21 @@ public void onReceivedZeppelinResource(String responseJson) throws TException { response.getOwnerKey(), intpContextRunners); } + } else if (response.getResourceType() == RemoteZeppelinServerResource.Type.JOB_STATUS) { + + Map jobStatusMap = (Map) response.getData(); + + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId((String) jobStatusMap.get("noteId")); + jobStatus.setParagraphId((String) jobStatusMap.get("paragraphId")); + jobStatus.setJobStatus((String) jobStatusMap.get("jobStatus")); + jobStatus.setLastRunningTime((String) jobStatusMap.get("lastRunningTime")); + + synchronized (this.remoteWorksResponsePool) { + this.remoteWorksResponsePool.put( + response.getOwnerKey(), + jobStatus); + } } } catch (Exception e) { throw e; @@ -719,7 +735,26 @@ public List getRemoteContextRunner( return runners; } + @Override + public RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId) { + RemoteZeppelinJobStatus jobStatus = null; + String ownerKey = generateOwnerKey(); + if (StringUtils.isBlank(noteId) || StringUtils.isBlank(paragraphId)) { + return null; + } + server.eventClient.getZeppelinServerJobStatus(ownerKey, noteId, paragraphId); + try { + this.waitForEvent(ownerKey); + } catch (Exception e) { + return null; + } + synchronized (this.remoteWorksResponsePool) { + jobStatus = (RemoteZeppelinJobStatus) this.remoteWorksResponsePool.get(ownerKey); + this.remoteWorksResponsePool.remove(ownerKey); + } + return jobStatus; + } } private RemoteInterpreterResult convert(InterpreterResult result, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 8a1bc7d56da..3c6619c5322 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index ebb85790a4e..192694a4c70 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index 6a24e5663f1..2212fe6a579 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 39c4f812a5d..6bf3d3b86d0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 7ca406c6709..c338fb21403 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -43,7 +43,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { APP_STATUS_UPDATE(12), META_INFOS(13), REMOTE_ZEPPELIN_SERVER_RESOURCE(14), - RESOURCE_INVOKE_METHOD(15); + RESOURCE_INVOKE_METHOD(15), + REMOTE_ZEPPELIN_JOB_STATUS(16); private final int value; @@ -94,6 +95,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return REMOTE_ZEPPELIN_SERVER_RESOURCE; case 15: return RESOURCE_INVOKE_METHOD; + case 16: + return REMOTE_ZEPPELIN_JOB_STATUS; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 4929efab7bb..dec4c1d5e33 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java index eb1261e9742..eef52f547e6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index 7b2a76e6128..0ab9d018d15 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java index 74cb25d7d96..a8ee58b64a9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 76d90b9877d..52dad626bd3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -62,6 +62,18 @@ public boolean isRunning() { public boolean isPending() { return this == PENDING; } + + public boolean isFinished() { + return this == FINISHED; + } + + public boolean isError() { + return this == ERROR; + } + + public boolean isAbort() { + return this == ABORT; + } } private String jobName; diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 08a15ad9702..980b3048224 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -56,7 +56,8 @@ enum RemoteInterpreterEventType { APP_STATUS_UPDATE = 12, META_INFOS = 13, REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, - RESOURCE_INVOKE_METHOD = 15 + RESOURCE_INVOKE_METHOD = 15, + REMOTE_ZEPPELIN_JOB_STATUS = 16 } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index e3dc6b4c1b3..6a1eb36953f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -182,4 +182,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d7b2007e73c..cc022741ea7 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -357,4 +357,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 68b015d0bac..38e6e5ccec2 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -51,6 +52,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -1919,7 +1921,7 @@ public void onGetParagraphRunners(String noteId, String paragraphId, if (notebookIns == null) { LOG.info("intepreter request notebook instance is null"); - callback.onFinished(notebookIns); + callback.onFinished(runner); } try { @@ -1943,6 +1945,37 @@ public void onGetParagraphRunners(String noteId, String paragraphId, } } + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, + RemoteWorksEventListener callback) { + Notebook notebookIns = notebook(); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + jobStatus.setJobStatus(Status.ERROR); + jobStatus.setLastRunningTime(new Date()); + + if (notebookIns == null) { + LOG.info("intepreter request notebook instance is null"); + callback.onFinished(jobStatus); + } + + try { + Note note = notebookIns.getNote(noteId); + if (note != null) { + if (paragraphId != null) { + Paragraph paragraph = note.getParagraph(paragraphId); + jobStatus.setJobStatus(paragraph.getStatus()); + jobStatus.setLastRunningTime(paragraph.getDateStarted()); + } + } + callback.onFinished(jobStatus); + } catch (NullPointerException e) { + LOG.warn(e.getMessage()); + callback.onError(); + } + } + @Override public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception { Notebook notebookIns = notebook(); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 4816e4f16a4..3e527376e8a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -380,6 +380,37 @@ public void zRunTest() throws IOException { ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } + @Test + public void zGetJobStatusTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p0 = note.addParagraph(AuthenticationInfo.ANONYMOUS); + Map config0 = p0.getConfig(); + config0.put("enabled", true); + p0.setConfig(config0); + p0.setText("%spark println(\"Hello\")"); + p0.setAuthenticationInfo(anonymous); + + Paragraph p1 = note.addParagraph(AuthenticationInfo.ANONYMOUS); + Map config1 = p1.getConfig(); + config1.put("enabled", true); + p1.setConfig(config1); + p1.setText("%spark var status = z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name())\n println(status)"); + p1.setAuthenticationInfo(anonymous); + + note.runAll(); + + // z.run is not blocking call. So p1 may not be finished when p0 is done. + waitForFinish(p1); + + assertEquals(Status.FINISHED, p0.getStatus()); + assertEquals(Status.FINISHED, p1.getStatus()); + + assertTrue(p0.getStatus().name().equals(p1.getResult().message())); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + @Test public void pySparkDepLoaderTest() throws IOException { // create new note