From 43c9ea5051745a013fd78d5a9497e960d42c6570 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Fri, 30 Dec 2016 14:29:38 -0800 Subject: [PATCH 01/11] extends z.status on RemoteWorks --- .../zeppelin/spark/ZeppelinContext.java | 37 ++++++++++ .../interpreter/RemoteWorksController.java | 1 + .../interpreter/RemoteZeppelinJobStatus.java | 65 ++++++++++++++++++ .../RemoteZeppelinServerResource.java | 3 +- .../remote/RemoteInterpreterEventClient.java | 15 +++++ .../remote/RemoteInterpreterEventPoller.java | 67 +++++++++++++++++++ .../RemoteInterpreterProcessListener.java | 2 + .../remote/RemoteInterpreterServer.java | 31 +++++++++ .../thrift/InterpreterCompletion.java | 2 +- .../thrift/RemoteApplicationResult.java | 2 +- .../thrift/RemoteInterpreterContext.java | 2 +- .../thrift/RemoteInterpreterEvent.java | 2 +- .../thrift/RemoteInterpreterEventType.java | 5 +- .../thrift/RemoteInterpreterResult.java | 2 +- .../RemoteInterpreterResultMessage.java | 2 +- .../thrift/RemoteInterpreterService.java | 2 +- ...ZeppelinServerResourceParagraphRunner.java | 2 +- .../thrift/RemoteInterpreterService.thrift | 3 +- .../RemoteInterpreterOutputTestStream.java | 5 ++ .../scheduler/RemoteSchedulerTest.java | 5 ++ .../zeppelin/socket/NotebookServer.java | 35 +++++++++- 21 files changed, 278 insertions(+), 12 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index d1234dfd987..c8dff2a9730 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterHookRegistry; import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -345,10 +346,21 @@ public void run(String noteId, String paragraphId, InterpreterContext context) { } + /** + * Run Zeppelin Note by note id + * @param noteId + */ + @ZeppelinApi public void runNote(String noteId) { runNote(noteId, interpreterContext); } + /** + * Run Zepppelin Note by note id + * @param noteId + * @param context + */ + @ZeppelinApi public void runNote(String noteId, InterpreterContext context) { String runningNoteId = context.getNoteId(); String runningParagraphId = context.getParagraphId(); @@ -366,6 +378,31 @@ public void runNote(String noteId, InterpreterContext context) { } } + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getZepplinJobStatus(String noteId, String paragraphId) { + return getZepplinJobStatus(noteId, paragraphId, interpreterContext); + } + + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @param context + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getZepplinJobStatus( + String noteId, String paragraphId, InterpreterContext context) { + RemoteWorksController remoteWorksController = context.getRemoteWorksController(); + return remoteWorksController.getRemoteJobStatus(noteId, paragraphId); + } + /** * get Zeppelin Paragraph Runner from zeppelin server diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java index e1410d61abb..a1fc899801f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java @@ -26,4 +26,5 @@ public interface RemoteWorksController { List getRemoteContextRunner(String noteId); List getRemoteContextRunner(String noteId, String paragraphId); + RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java new file mode 100644 index 00000000000..da9b70141cd --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.scheduler.Job.Status; + +import java.util.Date; + +/** + * Remote Zeppelin Server job status + */ +public class RemoteZeppelinJobStatus { + + private String noteId; + private Status jobStatus; + private String paragraphId; + private Date lastRunningTime; + + public String getNoteId() { + return noteId; + } + + public void setNoteId(String noteId) { + this.noteId = noteId; + } + + public Status getJobStatus() { + return jobStatus; + } + + public void setJobStatus(Status jobStatus) { + this.jobStatus = jobStatus; + } + + public String getParagraphId() { + return paragraphId; + } + + public void setParagraphId(String paragraphId) { + this.paragraphId = paragraphId; + } + + public Date getLastRunningTime() { + return lastRunningTime; + } + + public void setLastRunningTime(Date lastRunningTime) { + this.lastRunningTime = lastRunningTime; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java index b2a87aa8bd5..34bae1a4713 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java @@ -25,7 +25,8 @@ public class RemoteZeppelinServerResource { * Resource Type for Zeppelin Server */ public enum Type{ - PARAGRAPH_RUNNERS + PARAGRAPH_RUNNERS, + JOB_STATUS } private String ownerKey; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 606d35f60ee..881643a8d0d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -21,6 +21,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -69,6 +70,20 @@ public void getZeppelinServerNoteRunner( gson.toJson(eventBody))); } + public void getZeppelinServerJobStatus(String eventOwnerKey, String noteId, String paragraphId) { + RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource(); + eventBody.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + eventBody.setOwnerKey(eventOwnerKey); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + eventBody.setData(jobStatus); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS, + gson.toJson(eventBody))); + } + /** * Run paragraph * @param runner diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index e794140e8bd..c739d546325 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -239,6 +240,12 @@ public void run() { progressRemoteZeppelinControlEvent( reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS) { + RemoteZeppelinServerResource reqResourceBody = gson.fromJson( + event.getData(), RemoteZeppelinServerResource.class); + progressRemoteJobStatusControlEvent( + reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { Map metaInfos = gson.fromJson(event.getData(), new TypeToken>() { @@ -334,6 +341,66 @@ public void onError() { } } + private void progressRemoteJobStatusControlEvent( + RemoteZeppelinServerResource.Type resourceType, + RemoteInterpreterProcessListener remoteWorksEventListener, + RemoteZeppelinServerResource reqResourceBody) throws Exception { + boolean broken = false; + final Gson gson = new Gson(); + final String eventOwnerKey = reqResourceBody.getOwnerKey(); + Client interpreterServerMain = null; + try { + interpreterServerMain = interpreterProcess.getClient(); + final Client eventClient = interpreterServerMain; + if (resourceType == RemoteZeppelinServerResource.Type.JOB_STATUS) { + Map jobStatus = (Map) reqResourceBody.getData(); + + String noteId = (String) jobStatus.get("noteId"); + String paragraphId = (String) jobStatus.get("paragraphId"); + + RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent = + new RemoteInterpreterProcessListener.RemoteWorksEventListener() { + + @Override + public void onFinished(Object resultObject) { + boolean clientBroken = false; + if (resultObject != null && resultObject instanceof RemoteZeppelinJobStatus) { + + RemoteZeppelinServerResource resResource = new RemoteZeppelinServerResource(); + resResource.setOwnerKey(eventOwnerKey); + resResource.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + resResource.setData(resultObject); + + try { + eventClient.onReceivedZeppelinResource(gson.toJson(resResource)); + } catch (Exception e) { + clientBroken = true; + logger.error("Can't get Remote Job Status Event", e); + waitQuietly(); + } finally { + interpreterProcess.releaseClient(eventClient, clientBroken); + } + } + } + + @Override + public void onError() { + logger.info("onGetParagraphRunners onError"); + } + }; + + remoteWorksEventListener.onGetParagraphJobStatus(noteId, paragraphId, callBackEvent); + } + } catch (Exception e) { + broken = true; + logger.error("Can't get RemoteInterpreter Job Status Event", e); + waitQuietly(); + + } finally { + interpreterProcess.releaseClient(interpreterServerMain, broken); + } + } + private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { Client client = null; boolean broken = false; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 66b08c95a1d..4ac60ffcf3e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -32,6 +32,8 @@ public void onOutputUpdated( public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception; public void onGetParagraphRunners( String noteId, String paragraphId, RemoteWorksEventListener callback); + public void onGetParagraphJobStatus( + String noteId, String paragraphId, RemoteWorksEventListener callback); /** * Remote works for Interpreter callback listener diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 8624b57e744..ca2422d02d1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.*; +import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; @@ -373,6 +374,17 @@ public void onReceivedZeppelinResource(String responseJson) throws TException { response.getOwnerKey(), intpContextRunners); } + } else if (response.getResourceType() == RemoteZeppelinServerResource.Type.JOB_STATUS) { + + String jobStatusJsonString = response.getData().toString(); + RemoteZeppelinJobStatus jobStatus = gson.fromJson( + jobStatusJsonString, RemoteZeppelinJobStatus.class); + + synchronized (this.remoteWorksResponsePool) { + this.remoteWorksResponsePool.put( + response.getOwnerKey(), + jobStatus); + } } } catch (Exception e) { throw e; @@ -719,7 +731,26 @@ public List getRemoteContextRunner( return runners; } + @Override + public RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId) { + RemoteZeppelinJobStatus jobStatus = null; + String ownerKey = generateOwnerKey(); + if (StringUtils.isBlank(noteId) || StringUtils.isBlank(paragraphId)) { + return null; + } + server.eventClient.getZeppelinServerJobStatus(ownerKey, noteId, paragraphId); + try { + this.waitForEvent(ownerKey); + } catch (Exception e) { + return null; + } + synchronized (this.remoteWorksResponsePool) { + jobStatus = (RemoteZeppelinJobStatus) this.remoteWorksResponsePool.get(ownerKey); + this.remoteWorksResponsePool.remove(ownerKey); + } + return jobStatus; + } } private RemoteInterpreterResult convert(InterpreterResult result, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 8a1bc7d56da..3c6619c5322 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index ebb85790a4e..192694a4c70 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index 6a24e5663f1..2212fe6a579 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 39c4f812a5d..6bf3d3b86d0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 7ca406c6709..c338fb21403 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -43,7 +43,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { APP_STATUS_UPDATE(12), META_INFOS(13), REMOTE_ZEPPELIN_SERVER_RESOURCE(14), - RESOURCE_INVOKE_METHOD(15); + RESOURCE_INVOKE_METHOD(15), + REMOTE_ZEPPELIN_JOB_STATUS(16); private final int value; @@ -94,6 +95,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return REMOTE_ZEPPELIN_SERVER_RESOURCE; case 15: return RESOURCE_INVOKE_METHOD; + case 16: + return REMOTE_ZEPPELIN_JOB_STATUS; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 4929efab7bb..dec4c1d5e33 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java index eb1261e9742..eef52f547e6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index 7b2a76e6128..0ab9d018d15 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java index 74cb25d7d96..a8ee58b64a9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-2-2") public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner"); diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 08a15ad9702..980b3048224 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -56,7 +56,8 @@ enum RemoteInterpreterEventType { APP_STATUS_UPDATE = 12, META_INFOS = 13, REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, - RESOURCE_INVOKE_METHOD = 15 + RESOURCE_INVOKE_METHOD = 15, + REMOTE_ZEPPELIN_JOB_STATUS = 16 } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index e3dc6b4c1b3..6a1eb36953f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -182,4 +182,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d7b2007e73c..cc022741ea7 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -357,4 +357,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 68b015d0bac..38e6e5ccec2 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -51,6 +52,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -1919,7 +1921,7 @@ public void onGetParagraphRunners(String noteId, String paragraphId, if (notebookIns == null) { LOG.info("intepreter request notebook instance is null"); - callback.onFinished(notebookIns); + callback.onFinished(runner); } try { @@ -1943,6 +1945,37 @@ public void onGetParagraphRunners(String noteId, String paragraphId, } } + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, + RemoteWorksEventListener callback) { + Notebook notebookIns = notebook(); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + jobStatus.setJobStatus(Status.ERROR); + jobStatus.setLastRunningTime(new Date()); + + if (notebookIns == null) { + LOG.info("intepreter request notebook instance is null"); + callback.onFinished(jobStatus); + } + + try { + Note note = notebookIns.getNote(noteId); + if (note != null) { + if (paragraphId != null) { + Paragraph paragraph = note.getParagraph(paragraphId); + jobStatus.setJobStatus(paragraph.getStatus()); + jobStatus.setLastRunningTime(paragraph.getDateStarted()); + } + } + callback.onFinished(jobStatus); + } catch (NullPointerException e) { + LOG.warn(e.getMessage()); + callback.onError(); + } + } + @Override public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception { Notebook notebookIns = notebook(); From f0e9825e0e5867b750a8b386c0b82233b3915302 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Sat, 31 Dec 2016 21:53:09 -0800 Subject: [PATCH 02/11] implement check status logic in workflow --- .../zeppelin/spark/ZeppelinContext.java | 6 ++--- .../interpreter/RemoteZeppelinJobStatus.java | 22 ++++++++++++++++++- .../remote/RemoteInterpreterServer.java | 10 ++++++--- .../org/apache/zeppelin/scheduler/Job.java | 12 ++++++++++ 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index c8dff2a9730..aa39e0a8dc3 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -385,8 +385,8 @@ public void runNote(String noteId, InterpreterContext context) { * @return */ @ZeppelinApi - public RemoteZeppelinJobStatus getZepplinJobStatus(String noteId, String paragraphId) { - return getZepplinJobStatus(noteId, paragraphId, interpreterContext); + public RemoteZeppelinJobStatus getZeppelinJobStatus(String noteId, String paragraphId) { + return getZeppelinJobStatus(noteId, paragraphId, interpreterContext); } /** @@ -397,7 +397,7 @@ public RemoteZeppelinJobStatus getZepplinJobStatus(String noteId, String paragra * @return */ @ZeppelinApi - public RemoteZeppelinJobStatus getZepplinJobStatus( + public RemoteZeppelinJobStatus getZeppelinJobStatus( String noteId, String paragraphId, InterpreterContext context) { RemoteWorksController remoteWorksController = context.getRemoteWorksController(); return remoteWorksController.getRemoteJobStatus(noteId, paragraphId); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java index da9b70141cd..93f2902c60b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -19,13 +19,16 @@ import org.apache.zeppelin.scheduler.Job.Status; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Locale; /** * Remote Zeppelin Server job status */ public class RemoteZeppelinJobStatus { - private String noteId; private Status jobStatus; private String paragraphId; @@ -47,6 +50,10 @@ public void setJobStatus(Status jobStatus) { this.jobStatus = jobStatus; } + public void setJobStatus(String jobStatusString) { + this.jobStatus = Status.valueOf(jobStatusString); + } + public String getParagraphId() { return paragraphId; } @@ -62,4 +69,17 @@ public Date getLastRunningTime() { public void setLastRunningTime(Date lastRunningTime) { this.lastRunningTime = lastRunningTime; } + + public void setLastRunningTime(String lastRunningTimeString) { + DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH); + Date date = new Date(); + try { + date = format.parse(lastRunningTimeString); + } catch (ParseException e) { + + } finally { + this.lastRunningTime = date; + } + } + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index ca2422d02d1..37ba0b8675e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -376,9 +376,13 @@ public void onReceivedZeppelinResource(String responseJson) throws TException { } } else if (response.getResourceType() == RemoteZeppelinServerResource.Type.JOB_STATUS) { - String jobStatusJsonString = response.getData().toString(); - RemoteZeppelinJobStatus jobStatus = gson.fromJson( - jobStatusJsonString, RemoteZeppelinJobStatus.class); + Map jobStatusMap = (Map) response.getData(); + + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId((String) jobStatusMap.get("noteId")); + jobStatus.setParagraphId((String) jobStatusMap.get("paragraphId")); + jobStatus.setJobStatus((String) jobStatusMap.get("jobStatus")); + jobStatus.setLastRunningTime((String) jobStatusMap.get("lastRunningTime")); synchronized (this.remoteWorksResponsePool) { this.remoteWorksResponsePool.put( diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 76d90b9877d..52dad626bd3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -62,6 +62,18 @@ public boolean isRunning() { public boolean isPending() { return this == PENDING; } + + public boolean isFinished() { + return this == FINISHED; + } + + public boolean isError() { + return this == ERROR; + } + + public boolean isAbort() { + return this == ABORT; + } } private String jobName; From 44fd762e1e9646deb0fdccfec7e3cabaa2042bfb Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Sat, 21 Jan 2017 07:10:10 -0800 Subject: [PATCH 03/11] add status getter --- .../interpreter/RemoteZeppelinJobStatus.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java index 93f2902c60b..168967366b8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -82,4 +82,34 @@ public void setLastRunningTime(String lastRunningTimeString) { } } + public boolean isFinished() { + return getJobStatus().isFinished(); + } + + public boolean isAbort() { + return getJobStatus().isAbort(); + } + + public boolean isError() { + return getJobStatus().isError(); + } + + public boolean isPending() { + return getJobStatus().isPending(); + } + + public boolean isReady() { + return getJobStatus().isReady(); + } + + public boolean isRunning() { + return getJobStatus().isRunning(); + } + + public String getStatus() { + return getJobStatus().name(); + } + + + } From 7eac7572dcaf81f9b14a255f531fb7df7d465d7e Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Sat, 21 Jan 2017 12:08:42 -0800 Subject: [PATCH 04/11] change method name --- .../org/apache/zeppelin/spark/ZeppelinContext.java | 6 +++--- .../zeppelin/interpreter/RemoteZeppelinJobStatus.java | 11 ++++------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index aa39e0a8dc3..afe6e66821f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -385,8 +385,8 @@ public void runNote(String noteId, InterpreterContext context) { * @return */ @ZeppelinApi - public RemoteZeppelinJobStatus getZeppelinJobStatus(String noteId, String paragraphId) { - return getZeppelinJobStatus(noteId, paragraphId, interpreterContext); + public RemoteZeppelinJobStatus getJobStatus(String noteId, String paragraphId) { + return getJobStatus(noteId, paragraphId, interpreterContext); } /** @@ -397,7 +397,7 @@ public RemoteZeppelinJobStatus getZeppelinJobStatus(String noteId, String paragr * @return */ @ZeppelinApi - public RemoteZeppelinJobStatus getZeppelinJobStatus( + public RemoteZeppelinJobStatus getJobStatus( String noteId, String paragraphId, InterpreterContext context) { RemoteWorksController remoteWorksController = context.getRemoteWorksController(); return remoteWorksController.getRemoteJobStatus(noteId, paragraphId); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java index 168967366b8..d61cff46761 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -82,6 +82,10 @@ public void setLastRunningTime(String lastRunningTimeString) { } } + public String name() { + return getJobStatus().name(); + } + public boolean isFinished() { return getJobStatus().isFinished(); } @@ -105,11 +109,4 @@ public boolean isReady() { public boolean isRunning() { return getJobStatus().isRunning(); } - - public String getStatus() { - return getJobStatus().name(); - } - - - } From 01614e6be21f7d08038db55b38a6da58744df505 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Sat, 21 Jan 2017 12:55:01 -0800 Subject: [PATCH 05/11] added test case --- .../rest/ZeppelinSparkClusterTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 4816e4f16a4..8585c8741dd 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -380,6 +380,37 @@ public void zRunTest() throws IOException { ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } + @Test + public void zGetJobStatusTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p0 = note.addParagraph(AuthenticationInfo.ANONYMOUS); + Map config0 = p0.getConfig(); + config0.put("enabled", true); + p0.setConfig(config0); + p0.setText("%spark println(\"Hello\")"); + p0.setAuthenticationInfo(anonymous); + + Paragraph p1 = note.addParagraph(AuthenticationInfo.ANONYMOUS); + Map config1 = p1.getConfig(); + config1.put("enabled", true); + p1.setConfig(config1); + p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\""); + p1.setAuthenticationInfo(anonymous); + + note.runAll(); + + // z.run is not blocking call. So p1 may not be finished when p0 is done. + waitForFinish(p1); + + assertEquals(Status.FINISHED, p0.getStatus()); + assertEquals(Status.FINISHED, p1.getStatus()); + + assertTrue(p0.getStatus().name().equals(p1.getResult().message().get(0).toString())); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + @Test public void pySparkDepLoaderTest() throws IOException { // create new note From cdbdb4ce51453239323e228847f7b6d87cbc2bb6 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Sat, 21 Jan 2017 13:43:09 -0800 Subject: [PATCH 06/11] =?UTF-8?q?remove=20=E2=80=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 8585c8741dd..9ee22226fdb 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\""); + p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); p1.setAuthenticationInfo(anonymous); note.runAll(); From b9948df748281579c1998434915b0a9e29dd274d Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Mon, 23 Jan 2017 14:03:14 -0800 Subject: [PATCH 07/11] change test case for println --- .../java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 9ee22226fdb..77fced90704 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); + p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\")"); p1.setAuthenticationInfo(anonymous); note.runAll(); From 7ecde8aeca227ff43c7ac2c5bb139dfedbeee4b6 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Mon, 23 Jan 2017 14:57:53 -0800 Subject: [PATCH 08/11] remove println in testcase --- .../java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 77fced90704..36398aea004 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark println(\"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\")"); + p1.setText("%spark \"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); p1.setAuthenticationInfo(anonymous); note.runAll(); From a6b1719594c481c9236eea639b419526ab7918db Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Mon, 23 Jan 2017 18:40:24 -0800 Subject: [PATCH 09/11] remove " --- .../java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 36398aea004..850baee6326 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark \"z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); + p1.setText("%spark z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); p1.setAuthenticationInfo(anonymous); note.runAll(); From 0482333c3394b878dec5cdb9f60322c6d19a2de5 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Mon, 23 Jan 2017 23:13:05 -0800 Subject: [PATCH 10/11] remove system.out --- .../org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 850baee6326..aa2808ae9ad 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()"); + p1.setText("%spark var status = z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\n println(status)"); p1.setAuthenticationInfo(anonymous); note.runAll(); @@ -406,7 +406,7 @@ public void zGetJobStatusTest() throws IOException { assertEquals(Status.FINISHED, p0.getStatus()); assertEquals(Status.FINISHED, p1.getStatus()); - assertTrue(p0.getStatus().name().equals(p1.getResult().message().get(0).toString())); + assertTrue(p0.getStatus().name().equals(p1.getResult().message())); ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } From bc127ed246ff9a4576e77005359eded445586508 Mon Sep 17 00:00:00 2001 From: cloverhearts Date: Mon, 23 Jan 2017 23:13:05 -0800 Subject: [PATCH 11/11] missing closed tag in test case --- .../java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index aa2808ae9ad..3e527376e8a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -395,7 +395,7 @@ public void zGetJobStatusTest() throws IOException { Map config1 = p1.getConfig(); config1.put("enabled", true); p1.setConfig(config1); - p1.setText("%spark var status = z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name()\n println(status)"); + p1.setText("%spark var status = z.getJobStatus(\"" + note.getId() + "\",\"" + p0.getId() + "\").name())\n println(status)"); p1.setAuthenticationInfo(anonymous); note.runAll();