diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 6c50ee4b66a..2bbe7960c82 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -686,7 +686,52 @@ public Response getCronJob(@PathParam("notebookId") String notebookId) throws } return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build(); - } + } + + /** + * Get notebook jobs for job manager + * @param + * @return JSON with status.OK + * @throws IOException, IllegalArgumentException + */ + @GET + @Path("jobmanager/") + @ZeppelinApi + public Response getJobListforNotebook() throws IOException, IllegalArgumentException { + LOG.info("Get notebook jobs for job manager"); + + List> notebookJobs = notebook.getJobListforNotebook(false, 0); + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + return new JsonResponse<>(Status.OK, response).build(); + } + + /** + * Get updated notebook jobs for job manager + * @param + * @return JSON with status.OK + * @throws IOException, IllegalArgumentException + */ + @GET + @Path("jobmanager/{lastUpdateUnixtime}/") + @ZeppelinApi + public Response getUpdatedJobListforNotebook( + @PathParam("lastUpdateUnixtime") long lastUpdateUnixTime) throws + IOException, IllegalArgumentException { + LOG.info("Get updated notebook jobs lastUpdateTime {}", lastUpdateUnixTime); + + List> notebookJobs; + notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime); + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + return new JsonResponse<>(Status.OK, response).build(); + } /** * Search for a Notes with permissions diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 7d50809d5ac..a94fefac892 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -61,6 +61,20 @@ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, RemoteInterpreterProcessListener { + /** + * Job manager service type + */ + protected enum JOB_MANAGER_SERVICE { + JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); + private String serviceTypeKey; + JOB_MANAGER_SERVICE(String serviceType) { + this.serviceTypeKey = serviceType; + } + String getKey() { + return this.serviceTypeKey; + } + } + private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); Gson gson = new GsonBuilder() .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); @@ -203,6 +217,12 @@ public void onMessage(NotebookSocket conn, String msg) { case CHECKPOINT_NOTEBOOK: checkpointNotebook(conn, notebook, messagereceived); break; + case LIST_NOTEBOOK_JOBS: + unicastNotebookJobInfo(conn); + break; + case LIST_UPDATE_NOTEBOOK_JOBS: + unicastUpdateNotebookJobInfo(conn, messagereceived); + break; default: break; } @@ -350,6 +370,34 @@ private void unicast(Message m, NotebookSocket conn) { } } + public void unicastNotebookJobInfo(NotebookSocket conn) throws IOException { + + List> notebookJobs = notebook().getJobListforNotebook(false, 0); + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + conn.send(serializeMessage(new Message(OP.LIST_NOTEBOOK_JOBS) + .put("notebookJobs", response))); + } + + public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage) + throws IOException { + double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime"); + long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue(); + + List> notebookJobs; + notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime); + + Map response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS) + .put("notebookRunningJobs", response))); + } + public List> generateNotebooksInfo(boolean needsReload) { Notebook notebook = notebook(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 4e611119bb8..58a552d9ae0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -489,6 +489,138 @@ public void setJobListenerFactory(JobListenerFactory jobListenerFactory) { this.jobListenerFactory = jobListenerFactory; } + private Map getParagraphForJobManagerItem(Paragraph paragraph) { + Map paragraphItem = new HashMap<>(); + + // set paragraph id + paragraphItem.put("id", paragraph.getId()); + + // set paragraph name + String paragraphName = paragraph.getTitle(); + if (paragraphName != null) { + paragraphItem.put("name", paragraphName); + } else { + paragraphItem.put("name", paragraph.getId()); + } + + // set status for paragraph. + paragraphItem.put("status", paragraph.getStatus().toString()); + + return paragraphItem; + } + + private long getUnixTimeLastRunParagraph(Paragraph paragraph) { + + Date lastRunningDate = null; + long lastRunningUnixTime = 0; + + Date paragaraphDate = paragraph.getDateStarted(); + // diff started time <-> finishied time + if (paragaraphDate == null) { + paragaraphDate = paragraph.getDateFinished(); + } else { + if (paragraph.getDateFinished() != null && + paragraph.getDateFinished().after(paragaraphDate)) { + paragaraphDate = paragraph.getDateFinished(); + } + } + + // finished time and started time is not exists. + if (paragaraphDate == null) { + paragaraphDate = paragraph.getDateCreated(); + } + + // set last update unixtime(ms). + lastRunningDate = paragaraphDate; + + lastRunningUnixTime = lastRunningDate.getTime(); + + return lastRunningUnixTime; + } + + public List> getJobListforNotebook(boolean needsReload, + long lastUpdateServerUnixTime) { + final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; + + if (needsReload) { + try { + reloadAllNotes(); + } catch (IOException e) { + logger.error("Fail to reload notes from repository"); + } + } + + List notes = getAllNotes(); + List> notesInfo = new LinkedList<>(); + for (Note note : notes) { + boolean isNotebookRunning = false; + boolean isUpdateNotebook = false; + long lastRunningUnixTime = 0; + Map info = new HashMap<>(); + + // set notebook ID + info.put("notebookId", note.id()); + + // set notebook Name + String notebookName = note.getName(); + if (notebookName != null) { + info.put("notebookName", note.getName()); + } else { + info.put("notebookName", "Note " + note.id()); + } + + // set notebook type ( cron or normal ) + if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true && + !note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) { + info.put("notebookType", "cron"); + } + else { + info.put("notebookType", "normal"); + } + + // set paragraphs + List> paragraphsInfo = new LinkedList<>(); + for (Paragraph paragraph : note.getParagraphs()) { + // check paragraph's status. + if (paragraph.getStatus().isRunning() == true) { + isNotebookRunning = true; + isUpdateNotebook = true; + } + + // get data for the job manager. + Map paragraphItem = getParagraphForJobManagerItem(paragraph); + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + + // is update notebook for last server update time. + if (lastRunningUnixTime > lastUpdateServerUnixTime) { + paragraphsInfo.add(paragraphItem); + isUpdateNotebook = true; + } + } + + // set interpreter bind type + String interpreterGroupName = null; + if (note.getNoteReplLoader().getInterpreterSettings() != null && + note.getNoteReplLoader().getInterpreterSettings().size() >= 1) { + interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup(); + } + + // not update and not running -> pass + if (isUpdateNotebook == false && isNotebookRunning == false) { + continue; + } + + // notebook json object root information. + info.put("interpreter", interpreterGroupName); + info.put("isRunningJob", isNotebookRunning); + info.put("unixTimeLastRun", lastRunningUnixTime); + info.put("paragraphs", paragraphsInfo); + notesInfo.add(info); + } + + return notesInfo; + } + /** * Cron task for the note. */ diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index a3fc048d61e..320709e4ff0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -111,10 +111,12 @@ public static enum OP { CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations // @param settings serialized Map object - CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository - // @param noteId - // @param checkpointName - + CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository + // @param noteId + // @param checkpointName + LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations + LIST_UPDATE_NOTEBOOK_JOBS // [c-s] get job management informations for until unixtime + // @param unixTime } public OP op;