diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 6c50ee4b66a..48cc575be38 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -32,6 +32,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; @@ -54,9 +55,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import com.google.gson.GsonBuilder; -import com.google.gson.stream.JsonReader; -import java.io.StringReader; + /** * Rest api endpoint for the noteBook. */ @@ -489,8 +488,13 @@ public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws if (note == null) { return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - - note.runAll(); + try { + note.runAll(); + } catch (InterpreterException intpException) { + return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, intpException.getMessage()).build(); + } catch (Exception e) { + return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage()).build(); + } return new JsonResponse<>(Status.OK).build(); } @@ -511,10 +515,14 @@ public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - for (Paragraph p : note.getParagraphs()) { - if (!p.isTerminated()) { - p.abort(); + try { + for (Paragraph p : note.getParagraphs()) { + if (!p.isTerminated()) { + p.abort(); + } } + } catch (Exception e) { + return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage()).build(); } return new JsonResponse<>(Status.OK).build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 7412611532b..c097fef82af 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -30,6 +30,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.LuceneSearch; import org.apache.zeppelin.search.SearchService; +import org.apache.zeppelin.socket.AppMainServer; import org.apache.zeppelin.socket.NotebookServer; import org.apache.zeppelin.user.Credentials; import org.eclipse.jetty.http.HttpVersion; @@ -104,8 +105,8 @@ public static void main(String[] args) throws InterruptedException { // REST api setupRestApiContextHandler(webApp, conf); - // Notebook server - setupNotebookServer(webApp, conf); + // Main Notebook WS server + notebookWsServer = setupNotebookServer(webApp, conf); //Below is commented since zeppelin-docs module is removed. //final WebAppContext webAppSwagg = setupWebAppSwagger(conf); @@ -119,6 +120,9 @@ public static void main(String[] args) throws InterruptedException { } LOG.info("Done, zeppelin server started"); + // register observer for job manager. + notebook.getNotebookEventObserver().addObserver(notebookWsServer); + Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); @@ -192,17 +196,16 @@ private static Server setupJettyServer(ZeppelinConfiguration conf) { return server; } - private static void setupNotebookServer(WebAppContext webapp, - ZeppelinConfiguration conf) { - notebookWsServer = new NotebookServer(); + private static NotebookServer setupNotebookServer(WebAppContext webapp, + ZeppelinConfiguration conf) { + NotebookServer notebookWsServer = new NotebookServer(); String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize(); final ServletHolder servletHolder = new ServletHolder(notebookWsServer); servletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize); - final ServletContextHandler cxfContext = new ServletContextHandler( - ServletContextHandler.SESSIONS); - + ServletContextHandler.SESSIONS); webapp.addServlet(servletHolder, "/ws/*"); + return notebookWsServer; } private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/AppMainServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/AppMainServer.java new file mode 100644 index 00000000000..714033b0b64 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/AppMainServer.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.servlet.http.HttpServletRequest; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.socket.Message; +import org.apache.zeppelin.ticket.TicketContainer; +import org.apache.zeppelin.utils.SecurityUtils; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Zeppelin websocket service. + */ +public class AppMainServer extends WebSocketServlet implements + WebSocketListener, WebSocketServer { + private static final Logger LOG = LoggerFactory.getLogger(AppMainServer.class); + Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); + + final Map subWebSocketServer = new HashMap<>(); + + final Map> userWebSocketMap = new HashMap<>(); + final Queue connectedSockets = new ConcurrentLinkedQueue<>(); + + + @Override + public void configure(WebSocketServletFactory factory) { + factory.setCreator(new WebAppSocketCreator(this)); + } + + public boolean checkOrigin(HttpServletRequest request, String origin) { + try { + return SecurityUtils.isValidOrigin(origin, ZeppelinConfiguration.create()); + } catch (UnknownHostException e) { + LOG.error(e.toString(), e); + } catch (URISyntaxException e) { + LOG.error(e.toString(), e); + } + return false; + } + + public WebAppSocket doWebSocketConnect(HttpServletRequest req, String protocol) { + return new WebAppSocket(req, protocol, this); + } + + public void setSubWebSocketServer(String key, WebSocketServer server) { + synchronized (subWebSocketServer) { + subWebSocketServer.remove(key); + subWebSocketServer.put(key, server); + } + } + + @Override + public void onOpen(WebAppSocket conn) { + LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), + conn.getRequest().getRemotePort()); + connectedSockets.add(conn); + } + + @Override + public void onMessage(WebAppSocket conn, String msg) { + try { + Message messagereceived = deserializeMessage(msg); + LOG.debug("RECEIVE << " + messagereceived.op); + LOG.debug("RECEIVE PRINCIPAL << " + messagereceived.principal); + LOG.debug("RECEIVE TICKET << " + messagereceived.ticket); + LOG.debug("RECEIVE ROLES << " + messagereceived.roles); + + if (LOG.isTraceEnabled()) { + LOG.trace("RECEIVE MSG = " + messagereceived); + } + + WebSocketServer processServer = subWebSocketServer.get(messagereceived.target); + if (processServer != null) { + LOG.debug("server {} received.", messagereceived.target); + processServer.onMessage(conn, msg); + } else { + LOG.debug("server {} received.", AppMainServer.class.toString()); + } + + /** Lets be elegant here */ + switch (messagereceived.op) { + case PING: + break; //do nothing + default: + break; + } + } catch (Exception e) { + LOG.error("Can't handle message", e); + } + } + + @Override + public void onClose(WebAppSocket conn, int code, String reason) { + LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); + removeConnectionFromAllKey(conn); + connectedSockets.remove(conn); + } + + protected Message deserializeMessage(String msg) { + return gson.fromJson(msg, Message.class); + } + + protected String serializeMessage(Message m) { + return gson.toJson(m); + } + + protected void addConnectionToKey(String key, WebAppSocket socket) { + synchronized (userWebSocketMap) { + removeConnectionFromAllKey(socket); // make sure a socket relates only a + // single key. + List socketList = userWebSocketMap.get(key); + if (socketList == null) { + socketList = new LinkedList<>(); + userWebSocketMap.put(key, socketList); + } + if (!socketList.contains(socket)) { + socketList.add(socket); + } + } + } + + protected void removeConnectionFromKey(String key, WebAppSocket socket) { + synchronized (userWebSocketMap) { + List socketList = userWebSocketMap.get(key); + if (socketList != null) { + socketList.remove(socket); + } + } + } + + protected void removeKey(String key) { + synchronized (userWebSocketMap) { + List socketList = userWebSocketMap.remove(key); + } + } + + protected void removeConnectionFromAllKey(WebAppSocket socket) { + synchronized (userWebSocketMap) { + Set keys = userWebSocketMap.keySet(); + for (String keyValue : keys) { + removeConnectionFromKey(keyValue, socket); + } + } + } + + protected String getOpenKey(WebAppSocket socket) { + String key = null; + synchronized (userWebSocketMap) { + Set keys = userWebSocketMap.keySet(); + for (String keyValue : keys) { + List sockets = userWebSocketMap.get(keyValue); + if (sockets.contains(socket)) { + key = keyValue; + } + } + } + + return key; + } + + protected void broadcast(String key, Message m) { + synchronized (userWebSocketMap) { + List socketLists = userWebSocketMap.get(key); + if (socketLists == null || socketLists.size() == 0) { + return; + } + LOG.debug("SEND >> " + m.op); + for (WebAppSocket conn : socketLists) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + removeConnectionFromAllKey(conn); + } + } + } + } + + protected void broadcastExcept(String key, Message m, WebAppSocket exclude) { + synchronized (userWebSocketMap) { + List socketLists = userWebSocketMap.get(key); + if (socketLists == null || socketLists.size() == 0) { + return; + } + LOG.debug("SEND >> " + m.op); + for (WebAppSocket conn : socketLists) { + if (exclude.equals(conn)) { + continue; + } + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } + } + } + } + + protected void broadcastAll(Message m) { + for (WebAppSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } + } + } + + protected void unicast(Message m, WebAppSocket conn) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } + } +} + diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 7d50809d5ac..b456da0e208 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -16,38 +16,30 @@ */ package org.apache.zeppelin.socket; -import java.io.IOException; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; - -import javax.servlet.http.HttpServletRequest; - import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; - import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; -import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; +import org.apache.zeppelin.notebook.NotebookEventObserver.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.ticket.TicketContainer; +import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.SecurityUtils; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; @@ -55,51 +47,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Zeppelin websocket service. */ -public class NotebookServer extends WebSocketServlet implements - NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, - RemoteInterpreterProcessListener { +public class NotebookServer extends AppMainServer implements JobListenerFactory, + AngularObjectRegistryListener, RemoteInterpreterProcessListener, Observer{ private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - Gson gson = new GsonBuilder() - .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); - final Map> noteSocketMap = new HashMap<>(); - final Queue connectedSockets = new ConcurrentLinkedQueue<>(); - - private Notebook notebook() { - return ZeppelinServer.notebook; - } - - @Override - public void configure(WebSocketServletFactory factory) { - factory.setCreator(new NotebookWebSocketCreator(this)); - } - public boolean checkOrigin(HttpServletRequest request, String origin) { - try { - return SecurityUtils.isValidOrigin(origin, ZeppelinConfiguration.create()); - } catch (UnknownHostException e) { - LOG.error(e.toString(), e); - } catch (URISyntaxException e) { - LOG.error(e.toString(), e); + /** + * Job manager service type + */ + protected enum JOB_MANAGER_SERVICE { + JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); + private String serviceTypeKey; + JOB_MANAGER_SERVICE(String serviceType) { + this.serviceTypeKey = serviceType; + } + String getKey() { + return this.serviceTypeKey; } - return false; } - public NotebookSocket doWebSocketConnect(HttpServletRequest req, String protocol) { - return new NotebookSocket(req, protocol, this); - } + final Queue connectedSockets = new ConcurrentLinkedQueue<>(); - @Override - public void onOpen(NotebookSocket conn) { - LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort()); - connectedSockets.add(conn); + private Notebook notebook() { + return ZeppelinServer.notebook; } @Override - public void onMessage(NotebookSocket conn, String msg) { + public void onMessage(WebAppSocket conn, String msg) { Notebook notebook = notebook(); try { Message messagereceived = deserializeMessage(msg); @@ -111,14 +94,14 @@ public void onMessage(NotebookSocket conn, String msg) { if (LOG.isTraceEnabled()) { LOG.trace("RECEIVE MSG = " + messagereceived); } - + String ticket = TicketContainer.instance.getTicket(messagereceived.principal); if (ticket != null && !ticket.equals(messagereceived.ticket)) throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket); ZeppelinConfiguration conf = ZeppelinConfiguration.create(); boolean allowAnonymous = conf. - getBoolean(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED); + getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED); if (!allowAnonymous && messagereceived.principal.equals("anonymous")) { throw new Exception("Anonymous access not allowed "); } @@ -132,7 +115,6 @@ public void onMessage(NotebookSocket conn, String msg) { userAndRoles.addAll(roles); } } - /** Lets be elegant here */ switch (messagereceived.op) { case LIST_NOTES: @@ -203,6 +185,15 @@ public void onMessage(NotebookSocket conn, String msg) { case CHECKPOINT_NOTEBOOK: checkpointNotebook(conn, notebook, messagereceived); break; + case LIST_NOTEBOOK_JOBS: + sendNotebookJobInfo(conn); + break; + case LIST_UPDATE_NOTEBOOK_JOBS: + sendUpdateNotebookJobInfo(conn, messagereceived); + break; + case UNSUBSCRIBE_JOBMANAGER: + unsubscribeJobManager(conn); + break; default: break; } @@ -211,78 +202,27 @@ public void onMessage(NotebookSocket conn, String msg) { } } - @Override - public void onClose(NotebookSocket conn, int code, String reason) { - LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() - .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); - removeConnectionFromAllNote(conn); - connectedSockets.remove(conn); - } - - protected Message deserializeMessage(String msg) { - return gson.fromJson(msg, Message.class); - } - - protected String serializeMessage(Message m) { - return gson.toJson(m); - } - - private void addConnectionToNote(String noteId, NotebookSocket socket) { - synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a - // single note. - List socketList = noteSocketMap.get(noteId); - if (socketList == null) { - socketList = new LinkedList<>(); - noteSocketMap.put(noteId, socketList); - } - if (!socketList.contains(socket)) { - socketList.add(socket); - } - } + private void addConnectionToNote(String noteId, WebAppSocket socket) { + addConnectionToKey(noteId, socket); } - private void removeConnectionFromNote(String noteId, NotebookSocket socket) { - synchronized (noteSocketMap) { - List socketList = noteSocketMap.get(noteId); - if (socketList != null) { - socketList.remove(socket); - } - } + private void removeConnectionFromNote(String noteId, WebAppSocket socket) { + removeConnectionFromKey(noteId, socket); } private void removeNote(String noteId) { - synchronized (noteSocketMap) { - List socketList = noteSocketMap.remove(noteId); - } + removeKey(noteId); } - private void removeConnectionFromAllNote(NotebookSocket socket) { - synchronized (noteSocketMap) { - Set keys = noteSocketMap.keySet(); - for (String noteId : keys) { - removeConnectionFromNote(noteId, socket); - } - } + private void removeConnectionFromAllNote(WebAppSocket socket) { + removeConnectionFromAllKey(socket); } - private String getOpenNoteId(NotebookSocket socket) { - String id = null; - synchronized (noteSocketMap) { - Set keys = noteSocketMap.keySet(); - for (String noteId : keys) { - List sockets = noteSocketMap.get(noteId); - if (sockets.contains(socket)) { - id = noteId; - } - } - } - - return id; + private String getOpenNoteId(WebAppSocket socket) { + return getOpenKey(socket); } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, - Message m) { + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -295,61 +235,6 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, } } - private void broadcast(String noteId, Message m) { - synchronized (noteSocketMap) { - List socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { - return; - } - LOG.debug("SEND >> " + m.op); - for (NotebookSocket conn : socketLists) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - } - } - - private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { - synchronized (noteSocketMap) { - List socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { - return; - } - LOG.debug("SEND >> " + m.op); - for (NotebookSocket conn : socketLists) { - if (exclude.equals(conn)) { - continue; - } - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - } - } - - private void broadcastAll(Message m) { - for (NotebookSocket conn : connectedSockets) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - } - - private void unicast(Message m, NotebookSocket conn) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - public List> generateNotebooksInfo(boolean needsReload) { Notebook notebook = notebook(); @@ -392,7 +277,7 @@ public void broadcastNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - public void unicastNoteList(NotebookSocket conn) { + public void unicastNoteList(WebAppSocket conn) { List> notesInfo = generateNotebooksInfo(false); unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); } @@ -402,7 +287,370 @@ public void broadcastReloadedNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - void permissionError(NotebookSocket conn, String op, Set userAndRoles, + public void unsubscribeJobManager(WebAppSocket conn) { + LOG.info("unsubscribe"); + removeConnectionFromAllKey(conn); + } + + public void sendNotebookJobInfo(WebAppSocket conn) throws IOException { + + addConnectionToKey(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn); + List> notebookJobs = generateNotebooksJobInfo(false); + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + conn.send(serializeMessage(new Message(OP.LIST_NOTEBOOK_JOBS) + .put("notebookJobs", response))); + } + + public void sendUpdateNotebookJobInfo(WebAppSocket conn, Message fromMessage) + throws IOException { + double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime"); + long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue(); + List> notebookJobs; + notebookJobs = generateUpdateNotebooksJobInfo(false, lastUpdateUnixTime); + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS) + .put("notebookRunningJobs", response))); + } + + private Map getParagraphPacketItem(Paragraph paragraph) { + Map paragraphItem = new HashMap<>(); + + // set paragraph id + paragraphItem.put("id", paragraph.getId()); + + // set paragraph name + String paragraphName = paragraph.getTitle(); + if (paragraphName != null) { + paragraphItem.put("name", paragraphName); + } else { + paragraphItem.put("name", paragraph.getId()); + } + + // set status for paragraph. + paragraphItem.put("status", paragraph.getStatus().toString()); + + return paragraphItem; + } + + private long getUnixTimeLastRunParagraph(Paragraph paragraph) { + + Date lastRunningDate = null; + long lastRunningUnixTime = 0; + + Date paragaraphDate = paragraph.getDateStarted(); + if (paragaraphDate == null) { + paragaraphDate = paragraph.getDateCreated(); + } + + // set last update unixtime(ms). + if (lastRunningDate == null) { + lastRunningDate = paragaraphDate; + } else { + if (lastRunningDate.after(paragaraphDate) == true) { + lastRunningDate = paragaraphDate; + } + } + + lastRunningUnixTime = lastRunningDate.getTime(); + + return lastRunningUnixTime; + } + + public List> generateNotebooksJobInfo(boolean needsReload) { + Notebook notebook = notebook(); + + ZeppelinConfiguration conf = notebook.getConf(); + String homescreenNotebookId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); + boolean hideHomeScreenNotebookFromList = conf + .getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); + + if (needsReload) { + try { + notebook.reloadAllNotes(); + } catch (IOException e) { + LOG.error("Fail to reload notes from repository"); + } + } + + List notes = notebook.getAllNotes(); + List> notesInfo = new LinkedList<>(); + for (Note note : notes) { + boolean isNotebookRunning = false; + Map info = new HashMap<>(); + + if (hideHomeScreenNotebookFromList && note.id().equals(homescreenNotebookId)) { + continue; + } + + String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; + info.put("notebookId", note.id()); + String notebookName = note.getName(); + if (notebookName != null) { + info.put("notebookName", note.getName()); + } else { + info.put("notebookName", "Note " + note.id()); + } + + if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true + && !note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) { + info.put("notebookType", "cron"); + } + else { + info.put("notebookType", "normal"); + } + + long lastRunningUnixTime = 0; + + List> paragraphsInfo = new LinkedList<>(); + for (Paragraph paragraph : note.getParagraphs()) { + if (paragraph.getStatus().isRunning() == true) { + isNotebookRunning = true; + } + + Map paragraphItem = getParagraphPacketItem(paragraph); + + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + + paragraphsInfo.add(paragraphItem); + } + + // Interpreter is set does not exist. + String interpreterGroupName = null; + if (note.getNoteReplLoader().getInterpreterSettings() != null + && note.getNoteReplLoader().getInterpreterSettings().size() >= 1) { + interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup(); + } + + // notebook json object root information. + info.put("interpreter", interpreterGroupName); + info.put("isRunningJob", isNotebookRunning); + info.put("unixTimeLastRun", lastRunningUnixTime); + info.put("paragraphs", paragraphsInfo); + notesInfo.add(info); + } + return notesInfo; + } + + + public List> generateUpdateNotebooksJobInfo( + boolean needsReload, long lastUpdateServerUnixTime) { + Notebook notebook = notebook(); + + ZeppelinConfiguration conf = notebook.getConf(); + String homescreenNotebookId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); + boolean hideHomeScreenNotebookFromList = conf + .getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); + + if (needsReload) { + try { + notebook.reloadAllNotes(); + } catch (IOException e) { + LOG.error("Fail to reload notes from repository"); + } + } + + List notes = notebook.getAllNotes(); + List> notesInfo = new LinkedList<>(); + for (Note note : notes) { + boolean isNotebookRunning = false; + boolean isUpdateNotebook = false; + + Map info = new HashMap<>(); + + if (hideHomeScreenNotebookFromList && note.id().equals(homescreenNotebookId)) { + continue; + } + + // set const keyword for cron type + String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; + info.put("notebookId", note.id()); + String notebookName = note.getName(); + if (notebookName != null) { + info.put("notebookName", note.getName()); + } else { + info.put("notebookName", note.id()); + } + + if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true + && !note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) { + info.put("notebookType", "cron"); + } + else { + info.put("notebookType", "normal"); + } + + long lastRunningUnixTime = 0; + + List> paragraphsInfo = new LinkedList<>(); + for (Paragraph paragraph : note.getParagraphs()) { + + // check date for update time. + Date startedDate = paragraph.getDateStarted(); + Date createdDate = paragraph.getDateCreated(); + Date finishedDate = paragraph.getDateFinished(); + + if (startedDate != null && startedDate.getTime() > lastUpdateServerUnixTime) { + isUpdateNotebook = true; + } + if (createdDate != null && createdDate.getTime() > lastUpdateServerUnixTime) { + isUpdateNotebook = true; + } + if (finishedDate != null && finishedDate.getTime() > lastUpdateServerUnixTime) { + isUpdateNotebook = true; + } + + Map paragraphItem = getParagraphPacketItem(paragraph); + + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + + if (paragraph.getStatus().isRunning() == true) { + isNotebookRunning = true; + isUpdateNotebook = true; + } + paragraphsInfo.add(paragraphItem); + } + + // Insert only data that has changed. + if (isUpdateNotebook != true) { + continue; + } + + // Interpreter is set does not exist. + String interpreterGroupName = null; + if (note.getNoteReplLoader().getInterpreterSettings() != null + && note.getNoteReplLoader().getInterpreterSettings().size() >= 1) { + interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup(); + } + + // set notebook root information. + info.put("interpreter", interpreterGroupName); + info.put("isRunningJob", isNotebookRunning); + info.put("unixTimeLastRun", lastRunningUnixTime); + info.put("paragraphs", paragraphsInfo); + notesInfo.add(info); + } + return notesInfo; + } + + public boolean broadUpdateNote(String noteId) { + + Note note = notebook().getNote(noteId); + + if (note == null) { + LOG.info("broadUpdateNote - not found note"); + return false; + } + + List> notesList = new LinkedList<>(); + + Map noteItem = new HashMap<>(); + + noteItem.put("notebookId", note.id()); + String notebookName = note.getName(); + if (notebookName != null) { + noteItem.put("notebookName", note.getName()); + } else { + noteItem.put("notebookName", note.id()); + } + + // set const keyword for cron type + String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; + if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true + && !note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) { + noteItem.put("notebookType", "cron"); + } + else { + noteItem.put("notebookType", "normal"); + } + + + long lastRunningUnixTime = 0; + boolean isNotebookRunning = false; + List> paragraphsInfo = new LinkedList<>(); + for (Paragraph paragraph : note.getParagraphs()) { + Map paragraphItem = getParagraphPacketItem(paragraph); + + if (paragraph.getStatus().isRunning() == true) { + isNotebookRunning = true; + } + + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + + paragraphsInfo.add(paragraphItem); + } + + // Interpreter is set does not exist. + String interpreterGroupName = null; + if (note.getNoteReplLoader().getInterpreterSettings() != null + && note.getNoteReplLoader().getInterpreterSettings().size() >= 1) { + interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup(); + } + + // set notebook root information. + noteItem.put("interpreter", interpreterGroupName); + noteItem.put("isRunningJob", isNotebookRunning); + noteItem.put("unixTimeLastRun", lastRunningUnixTime); + noteItem.put("paragraphs", paragraphsInfo); + notesList.add(noteItem); + + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesList); + + broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + + return true; + } + + public boolean broadRemovedNote(String noteId) { + + List> notesList = new LinkedList<>(); + Map noteItem = new HashMap<>(); + + // set notebook root information. + noteItem.put("notebookId", noteId); + noteItem.put("isRemoved", true); + notesList.add(noteItem); + + Map response = new HashMap<>(); + + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesList); + + broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + + return true; + } + + /** + * Notebook Observer Event Listener + */ + @Override + public void update(Observable observer, Object notebookChnagedEvent) { + NotebookChnagedEvent noteEvent = (NotebookChnagedEvent) notebookChnagedEvent; + try { + if (noteEvent.getAction() == NotebookEventObserver.ACTIONS.REMOVED) { + broadRemovedNote(noteEvent.getNoteId()); + } else { + broadUpdateNote(noteEvent.getNoteId()); + } + } catch (Exception e) { + LOG.info("socket error job {}", e.getMessage()); + } + } + + void permissionError(WebAppSocket conn, String op, Set userAndRoles, Set allowed) throws IOException { LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed); @@ -415,7 +663,7 @@ void permissionError(NotebookSocket conn, String op, Set userAndRoles, "But the user " + userName + " belongs to: " + userAndRoles.toString()))); } - private void sendNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + private void sendNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(), @@ -441,7 +689,7 @@ private void sendNote(NotebookSocket conn, HashSet userAndRoles, Noteboo } } - private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, + private void sendHomeNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook) throws IOException { String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); @@ -465,7 +713,7 @@ private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, } } - private void updateNote(NotebookSocket conn, HashSet userAndRoles, + private void updateNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); @@ -514,7 +762,7 @@ private boolean isCronUpdated(Map configA, return cronUpdated; } - private void createNote(NotebookSocket conn, HashSet userAndRoles, + private void createNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message message) throws IOException { Note note = notebook.createNote(); @@ -528,12 +776,12 @@ private void createNote(NotebookSocket conn, HashSet userAndRoles, } note.persist(); - addConnectionToNote(note.id(), (NotebookSocket) conn); + addConnectionToNote(note.id(), (WebAppSocket) conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); broadcastNoteList(); } - private void removeNote(NotebookSocket conn, HashSet userAndRoles, + private void removeNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); @@ -553,7 +801,7 @@ private void removeNote(NotebookSocket conn, HashSet userAndRoles, broadcastNoteList(); } - private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, + private void updateParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -581,18 +829,18 @@ private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } - private void cloneNote(NotebookSocket conn, HashSet userAndRoles, + private void cloneNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException, CloneNotSupportedException { String noteId = getOpenNoteId(conn); String name = (String) fromMessage.get("name"); Note newNote = notebook.cloneNote(noteId, name); - addConnectionToNote(newNote.id(), (NotebookSocket) conn); + addConnectionToNote(newNote.id(), (WebAppSocket) conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); broadcastNoteList(); } - protected Note importNote(NotebookSocket conn, HashSet userAndRoles, + protected Note importNote(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { Note note = null; @@ -607,7 +855,7 @@ protected Note importNote(NotebookSocket conn, HashSet userAndRoles, return note; } - private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, + private void removeParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -629,7 +877,7 @@ private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, } } - private void clearParagraphOutput(NotebookSocket conn, HashSet userAndRoles, + private void clearParagraphOutput(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -647,7 +895,7 @@ private void clearParagraphOutput(NotebookSocket conn, HashSet userAndRo broadcastNote(note); } - private void completion(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + private void completion(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); @@ -671,7 +919,7 @@ private void completion(NotebookSocket conn, HashSet userAndRoles, Noteb * @param notebook the notebook. * @param fromMessage the message. */ - private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRoles, + private void angularObjectUpdated(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String paragraphId = (String) fromMessage.get("paragraphId"); @@ -763,7 +1011,7 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRo * @param fromMessage * @throws Exception */ - protected void angularObjectClientBind(NotebookSocket conn, HashSet userAndRoles, + protected void angularObjectClientBind(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); @@ -804,7 +1052,7 @@ protected void angularObjectClientBind(NotebookSocket conn, HashSet user * @param fromMessage * @throws Exception */ - protected void angularObjectClientUnbind(NotebookSocket conn, HashSet userAndRoles, + protected void angularObjectClientUnbind(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws Exception{ String noteId = fromMessage.getType("noteId"); @@ -845,7 +1093,7 @@ private InterpreterGroup findInterpreterGroupForParagraph(Note note, String para private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName, Object varValue, RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, NotebookSocket conn) { + String interpreterGroupId, WebAppSocket conn) { final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); @@ -861,7 +1109,7 @@ private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName, RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, NotebookSocket conn) { + String interpreterGroupId, WebAppSocket conn) { final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); this.broadcastExcept( @@ -875,7 +1123,7 @@ private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName, Object varValue, AngularObjectRegistry registry, - String interpreterGroupId, NotebookSocket conn) { + String interpreterGroupId, WebAppSocket conn) { AngularObject angularObject = registry.get(varName, noteId, paragraphId); if (angularObject == null) { angularObject = registry.add(varName, varValue, noteId, paragraphId); @@ -893,7 +1141,7 @@ private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, Str } private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName, - AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { + AngularObjectRegistry registry, String interpreterGroupId, WebAppSocket conn) { final AngularObject removed = registry.remove(varName, noteId, paragraphId); if (removed != null) { this.broadcastExcept( @@ -906,7 +1154,7 @@ private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, } } - private void moveParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + private void moveParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -928,7 +1176,7 @@ private void moveParagraph(NotebookSocket conn, HashSet userAndRoles, No broadcastNote(note); } - private void insertParagraph(NotebookSocket conn, HashSet userAndRoles, + private void insertParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index") .toString()); @@ -945,7 +1193,7 @@ private void insertParagraph(NotebookSocket conn, HashSet userAndRoles, broadcastNote(note); } - private void cancelParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + private void cancelParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -964,7 +1212,7 @@ private void cancelParagraph(NotebookSocket conn, HashSet userAndRoles, p.abort(); } - private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + private void runParagraph(WebAppSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { @@ -1020,7 +1268,7 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not } } - private void sendAllConfigurations(NotebookSocket conn, HashSet userAndRoles, + private void sendAllConfigurations(WebAppSocket conn, HashSet userAndRoles, Notebook notebook) throws IOException { ZeppelinConfiguration conf = notebook.getConf(); @@ -1029,8 +1277,8 @@ private void sendAllConfigurations(NotebookSocket conn, HashSet userAndR @Override public boolean apply(String key) { return !key.contains("password") && - !key.equals(ZeppelinConfiguration - .ConfVars + !key.equals( + ConfVars .ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING .getVarName()); } @@ -1040,7 +1288,7 @@ public boolean apply(String key) { .put("configurations", configurations))); } - private void checkpointNotebook(NotebookSocket conn, Notebook notebook, + private void checkpointNotebook(WebAppSocket conn, Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String commitMessage = (String) fromMessage.get("commitMessage"); @@ -1084,17 +1332,17 @@ public void onOutputUpdated(String noteId, String paragraphId, String output) { * */ public static class ParagraphListenerImpl implements ParagraphJobListener { - private NotebookServer notebookServer; + private NotebookServer appMainServer; private Note note; - public ParagraphListenerImpl(NotebookServer notebookServer, Note note) { - this.notebookServer = notebookServer; + public ParagraphListenerImpl(NotebookServer appMainServer, Note note) { + this.appMainServer = appMainServer; this.note = note; } @Override public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast( + appMainServer.broadcast( note.id(), new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); @@ -1120,7 +1368,7 @@ public void afterStatusChange(Job job, Status before, Status after) { LOG.error(e.toString(), e); } } - notebookServer.broadcastNote(note); + appMainServer.broadcastNote(note); } /** @@ -1136,7 +1384,7 @@ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String ou .put("paragraphId", paragraph.getId()) .put("data", output); - notebookServer.broadcast(paragraph.getNote().getId(), msg); + appMainServer.broadcast(paragraph.getNote().getId(), msg); } /** @@ -1152,7 +1400,7 @@ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String ou .put("paragraphId", paragraph.getId()) .put("data", output); - notebookServer.broadcast(paragraph.getNote().getId(), msg); + appMainServer.broadcast(paragraph.getNote().getId(), msg); } } @@ -1161,7 +1409,7 @@ public ParagraphJobListener getParagraphJobListener(Note note) { return new ParagraphListenerImpl(this, note); } - private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { + private void sendAllAngularObjects(Note note, WebAppSocket conn) throws IOException { List settings = note.getNoteReplLoader() .getInterpreterSettings(); if (settings == null || settings.size() == 0) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocket.java similarity index 90% rename from zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java rename to zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocket.java index 5d68bf5ec2d..b875bb529ba 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocket.java @@ -26,15 +26,15 @@ /** * Notebook websocket */ -public class NotebookSocket extends WebSocketAdapter { +public class WebAppSocket extends WebSocketAdapter { private Session connection; - private NotebookSocketListener listener; + private WebSocketListener listener; private HttpServletRequest request; private String protocol; - public NotebookSocket(HttpServletRequest req, String protocol, - NotebookSocketListener listener) { + public WebAppSocket(HttpServletRequest req, String protocol, + WebSocketListener listener) { this.listener = listener; this.request = req; this.protocol = protocol; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocketCreator.java similarity index 75% rename from zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java rename to zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocketCreator.java index 1b8e2f44e02..cf7104e173b 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebAppSocketCreator.java @@ -21,16 +21,16 @@ import org.eclipse.jetty.websocket.servlet.WebSocketCreator; /** - * Responsible to create the WebSockets for the NotebookServer. + * Responsible to create the WebSockets for the AppMainServer. */ -public class NotebookWebSocketCreator implements WebSocketCreator { - private NotebookServer notebookServer; +public class WebAppSocketCreator implements WebSocketCreator { + private AppMainServer appMainServer; - public NotebookWebSocketCreator(NotebookServer notebookServer) { - this.notebookServer = notebookServer; + public WebAppSocketCreator(AppMainServer appMainServer) { + this.appMainServer = appMainServer; } public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) { - return new NotebookSocket(request.getHttpServletRequest(), "", notebookServer); + return new WebAppSocket(request.getHttpServletRequest(), "", appMainServer); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketListener.java similarity index 78% rename from zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java rename to zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketListener.java index 6fc073402b4..e517262d4c8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketListener.java @@ -17,10 +17,10 @@ package org.apache.zeppelin.socket; /** - * NoteboookSocket listener + * WebSocket listener */ -public interface NotebookSocketListener { - void onClose(NotebookSocket socket, int code, String message); - void onOpen(NotebookSocket socket); - void onMessage(NotebookSocket socket, String message); +public interface WebSocketListener { + void onClose(WebAppSocket socket, int code, String message); + void onOpen(WebAppSocket socket); + void onMessage(WebAppSocket socket, String message); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketServer.java new file mode 100644 index 00000000000..b67c001e2d5 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/WebSocketServer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +/** + * WebSocket Server Interface + */ +public interface WebSocketServer { + void onMessage(WebAppSocket conn, String msg); +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index b904b6849e0..43d947d89c7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -110,8 +110,8 @@ public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() thr interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null); // create two sockets and open it - NotebookSocket sock1 = createWebSocket(); - NotebookSocket sock2 = createWebSocket(); + WebAppSocket sock1 = createWebSocket(); + WebAppSocket sock2 = createWebSocket(); assertEquals(sock1, sock1); assertNotEquals(sock1, sock2); @@ -196,8 +196,8 @@ public void should_bind_angular_object_to_remote_for_paragraphs() throws Excepti when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId")).thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); + WebAppSocket conn = mock(WebAppSocket.class); + WebAppSocket otherConn = mock(WebAppSocket.class); final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) .put("angularObject", ao1) @@ -205,7 +205,7 @@ public void should_bind_angular_object_to_remote_for_paragraphs() throws Excepti .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.userWebSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientBind(conn, new HashSet(), notebook, messageReceived); @@ -245,8 +245,8 @@ public void should_bind_angular_object_to_local_for_paragraphs() throws Exceptio when(mdRegistry.add(varName, value, "noteId", "paragraphId")).thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); + WebAppSocket conn = mock(WebAppSocket.class); + WebAppSocket otherConn = mock(WebAppSocket.class); final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) .put("angularObject", ao1) @@ -254,7 +254,7 @@ public void should_bind_angular_object_to_local_for_paragraphs() throws Exceptio .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.userWebSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientBind(conn, new HashSet(), notebook, messageReceived); @@ -288,8 +288,8 @@ public void should_unbind_angular_object_from_remote_for_paragraphs() throws Exc final AngularObject ao1 = AngularObjectBuilder.build(varName, value, "noteId", "paragraphId"); when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); + WebAppSocket conn = mock(WebAppSocket.class); + WebAppSocket otherConn = mock(WebAppSocket.class); final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) .put("angularObject", ao1) @@ -297,7 +297,7 @@ public void should_unbind_angular_object_from_remote_for_paragraphs() throws Exc .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.userWebSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientUnbind(conn, new HashSet(), notebook, messageReceived); @@ -336,15 +336,15 @@ public void should_unbind_angular_object_from_local_for_paragraphs() throws Exce when(mdRegistry.remove(varName, "noteId", "paragraphId")).thenReturn(ao1); - NotebookSocket conn = mock(NotebookSocket.class); - NotebookSocket otherConn = mock(NotebookSocket.class); + WebAppSocket conn = mock(WebAppSocket.class); + WebAppSocket otherConn = mock(WebAppSocket.class); final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE) .put("angularObject", ao1) .put("interpreterGroupId", "mdGroup") .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.userWebSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientUnbind(conn, new HashSet(), notebook, messageReceived); @@ -353,8 +353,8 @@ public void should_unbind_angular_object_from_local_for_paragraphs() throws Exce verify(otherConn).send(mdMsg1); } - private NotebookSocket createWebSocket() { - NotebookSocket sock = mock(NotebookSocket.class); + private WebAppSocket createWebSocket() { + WebAppSocket sock = mock(WebAppSocket.class); when(sock.getRequest()).thenReturn(createHttpServletRequest()); return sock; } diff --git a/zeppelin-web/.jshintrc b/zeppelin-web/.jshintrc index 5cfaeddcaf2..8bf2b07df98 100644 --- a/zeppelin-web/.jshintrc +++ b/zeppelin-web/.jshintrc @@ -33,6 +33,6 @@ "d3": false, "BootstrapDialog": false, "Handsontable": false, - "moment": false + "moment" : false } } diff --git a/zeppelin-web/src/app/app.js b/zeppelin-web/src/app/app.js index bc8e52c9a03..b5f6c85a879 100644 --- a/zeppelin-web/src/app/app.js +++ b/zeppelin-web/src/app/app.js @@ -62,6 +62,10 @@ templateUrl: 'app/notebook/notebook.html', controller: 'NotebookCtrl' }) + .when('/jobmanager', { + templateUrl: 'app/jobmanager/jobmanager.html', + controller: 'JobmanagerCtrl' + }) .when('/interpreter', { templateUrl: 'app/interpreter/interpreter.html', controller: 'InterpreterCtrl' @@ -71,8 +75,8 @@ controller: 'CredentialCtrl' }) .when('/configuration', { - templateUrl: 'app/configuration/configuration.html', - controller: 'ConfigurationCtrl' + templateUrl: 'app/configuration/configuration.html', + controller: 'ConfigurationCtrl' }) .when('/search/:searchTerm', { templateUrl: 'app/search/result-list.html', diff --git a/zeppelin-web/src/app/jobmanager/jobmanager.controller.js b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js new file mode 100644 index 00000000000..bf20af18a2c --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js @@ -0,0 +1,211 @@ +/*jshint loopfunc: true, unused:false */ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +angular.module('zeppelinWebApp') + .filter('myJob', function() { + + function filterContext(jobItems, filterConfig) { + var FILTER_VALUE_INTERPRETER = filterConfig.FILTER_VALUE_INTERPRETER; + var FILTER_VALUE_NOTEBOOK_NAME = filterConfig.FILTER_VALUE_NOTEBOOK_NAME; + var RUNNING_ALWAYS_TOP = filterConfig.RUNNING_ALWAYS_TOP; + var SORT_BY_ASC = filterConfig.SORT_BY_ASC; + + var filterItems = jobItems; + + if (FILTER_VALUE_INTERPRETER === undefined) { + filterItems = _.filter(filterItems, function (jobItem) { + return jobItem.interpreter === undefined? true : false; + }); + } else if (FILTER_VALUE_INTERPRETER !== '*') { + filterItems = _.where(filterItems, {interpreter : FILTER_VALUE_INTERPRETER}); + } + + if (FILTER_VALUE_NOTEBOOK_NAME !== '') { + filterItems = _.filter(filterItems, function(jobItem){ + var lowerFilterValue = FILTER_VALUE_NOTEBOOK_NAME.toLocaleLowerCase(); + var lowerNotebookName = jobItem.notebookName.toLocaleLowerCase(); + return lowerNotebookName.match(new RegExp('.*'+lowerFilterValue+'.*')); + }); + } + + if (SORT_BY_ASC === true) { + filterItems = _.sortBy(filterItems, function (sortItem) { + return sortItem.notebookName; + }); + } else { + filterItems = _.sortBy(filterItems, function (sortItem) { + return sortItem.notebookName; + }); + filterItems = filterItems.reverse(); + } + + if (RUNNING_ALWAYS_TOP === true) { + var runningJobList = _.where(filterItems, {isRunningJob : true}); + filterItems = _.reject(filterItems, {isRunningJob : true}); + runningJobList.map(function (runningJob) { + filterItems.splice(0,0, runningJob); + }); + } + + return filterItems; + } + return filterContext; + }) + .controller('JobmanagerCtrl', + function($scope, $route, $routeParams, $location, $rootScope, $http, $q, + websocketMsgSrv, baseUrlSrv, $interval, $timeout, SaveAsService, myJobFilter) { + + $scope.$on('setNotebookJobs', function(event, responseData) { + $scope.lastJobServerUnixTime = responseData.lastResponseUnixTime; + $scope.jobInfomations = responseData.jobs; + $scope.jobInfomationsIndexs = $scope.jobInfomations? _.indexBy($scope.jobInfomations, 'notebookId') : {}; + $scope.JobInfomationsByFilter = $scope.jobTypeFilter($scope.jobInfomations, $scope.filterConfig); + $scope.ACTIVE_INTERPRETERS = [ + { + name : 'ALL', + value : '*' + } + ]; + var interpreterLists = _.uniq(_.pluck($scope.jobInfomations, 'interpreter'), false); + for (var index = 0, length = interpreterLists.length; index < length; index++) { + $scope.ACTIVE_INTERPRETERS.push({ + name : interpreterLists[index], + value : interpreterLists[index] + }); + } + $scope.doFiltering($scope.jobInfomations, $scope.filterConfig); + }); + + $scope.$on('setUpdateNotebookJobs', function(event, responseData) { + var jobInfomations = $scope.jobInfomations; + var indexStore = $scope.jobInfomationsIndexs; + $scope.lastJobServerUnixTime = responseData.lastResponseUnixTime; + var notes = responseData.jobs; + notes.map(function (changedItem) { + if (indexStore[changedItem.notebookId] === undefined) { + var newItem = angular.copy(changedItem); + jobInfomations.push(newItem); + indexStore[changedItem.notebookId] = newItem; + } else { + var changeOriginTarget = indexStore[changedItem.notebookId]; + + if (changedItem.isRemoved !== undefined && changedItem.isRemoved === true) { + + // remove Item. + var removeIndex = _.findIndex(indexStore, changedItem.notebookId); + if (removeIndex > -1) { + indexStore.splice(removeIndex, 1); + } + + removeIndex = _.findIndex(jobInfomations, { 'notebookId' : changedItem.notebookId}); + if (removeIndex) { + jobInfomations.splice(removeIndex, 1); + } + + } else { + // change value for item. + changeOriginTarget.isRunningJob = changedItem.isRunningJob; + changeOriginTarget.notebookName = changedItem.notebookName; + changeOriginTarget.notebookType = changedItem.notebookType; + changeOriginTarget.interpreter = changedItem.interpreter; + changeOriginTarget.unixTimeLastRun = changedItem.unixTimeLastRun; + changeOriginTarget.paragraphs = changedItem.paragraphs; + } + } + $scope.doFiltering(jobInfomations, $scope.filterConfig); + }); + }); + + $scope.doFiltering = function (jobInfomations, filterConfig) { + asyncNotebookJobFilter(jobInfomations, filterConfig).then( + function () { + // success + $scope.isLoadingFilter = false; + }, + function (){ + // failed + }); + }; + + $scope.filterValueToName = function (filterValue) { + var index = _.findIndex($scope.ACTIVE_INTERPRETERS, {value : filterValue}); + + if ($scope.ACTIVE_INTERPRETERS[index].name !== undefined) { + return $scope.ACTIVE_INTERPRETERS[index].name; + } else { + return 'undefined'; + } + }; + + $scope.setFilterValue = function (filterValue) { + $scope.filterConfig.FILTER_VALUE_INTERPRETER = filterValue; + $scope.doFiltering($scope.jobInfomations, $scope.filterConfig); + }; + + $scope.onChangeRunJobToAlwaysTopToggle = function () { + $scope.filterConfig.RUNNING_ALWAYS_TOP = !$scope.filterConfig.RUNNING_ALWAYS_TOP; + $scope.doFiltering($scope.jobInfomations, $scope.filterConfig); + }; + + $scope.onChangeSortAsc = function () { + $scope.filterConfig.SORT_BY_ASC = !$scope.filterConfig.SORT_BY_ASC; + $scope.doFiltering($scope.jobInfomations, $scope.filterConfig); + }; + + $scope.doFilterInputTyping = function (keyEvent, jobInfomations, filterConfig) { + var returnKey = 13; + $timeout.cancel($scope.dofilterTimeoutObject); + $scope.dofilterTimeoutObject = $timeout(function(){ + $scope.doFiltering(jobInfomations, filterConfig); + }, 1000); + if (keyEvent.which === returnKey) { + $timeout.cancel($scope.dofilterTimeoutObject); + $scope.doFiltering(jobInfomations, filterConfig); + } + }; + + $scope.init = function () { + $scope.isLoadingFilter = true; + $scope.filterConfig = { + RUNNING_ALWAYS_TOP : true, + FILTER_VALUE_NOTEBOOK_NAME : '', + FILTER_VALUE_INTERPRETER : '*', + SORT_BY_ASC : true + }; + $scope.jobTypeFilter = myJobFilter; + $scope.jobInfomations = []; + $scope.JobInfomationsByFilter = $scope.jobInfomations; + + websocketMsgSrv.getNotebookJobsList(); + var refreshObj = $interval(function () { + if ($scope.lastJobServerUnixTime !== undefined) { + websocketMsgSrv.getUpdateNotebookJobsList($scope.lastJobServerUnixTime); + } + }, 1000); + + $scope.$on('$destroy', function() { + $interval.cancel(refreshObj); + websocketMsgSrv.unsubscribeJobManager(); + }); + }; + + var asyncNotebookJobFilter = function (jobInfomations, filterConfig) { + return $q(function(resolve, reject) { + $scope.JobInfomationsByFilter = $scope.jobTypeFilter(jobInfomations, filterConfig); + resolve($scope.JobInfomationsByFilter); + }); + }; +}); diff --git a/zeppelin-web/src/app/jobmanager/jobmanager.css b/zeppelin-web/src/app/jobmanager/jobmanager.css new file mode 100644 index 00000000000..0e60d568485 --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobmanager.css @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.job-col { + margin: 0; + padding: 0; +} + +.job { + padding: 2px 8px 4px 8px; + min-height: 32px; +} + +.jobManagerHead { + margin: -10px -10px 20px; + padding: 10px 15px 15px 15px; + background: white; + box-shadow: 0 2px 4px rgba(0, 0, 0, 0.15); + border-bottom: 1px solid #E5E5E5; +} + +.jobManagerHead .header { + font-family: 'Roboto', sans-serif; +} + +.job-note-name-query { + padding: 6px; + color: #000; + height: 22px; + width: 200px; + font: normal normal normal 14px/1 FontAwesome; +} diff --git a/zeppelin-web/src/app/jobmanager/jobmanager.html b/zeppelin-web/src/app/jobmanager/jobmanager.html new file mode 100644 index 00000000000..ead19b64224 --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobmanager.html @@ -0,0 +1,168 @@ + + +
+
+
+
+

+ Job +

+
+
+
+
+ You can monitor the written notebook. Check the status of the Notebook and can control the action. +
+
+
+
+
+
+ +
+
+
+ + + + + + + + + +
+
+
+ + + + + {{jobStatus}} + + + + + {{jobStatus}} + + + + + {{jobStatus}} + + + + + {{jobStatus}} + + + + + {{jobStatus}} + + + + + {{jobStatus}} + + +
+
+
+
+
+
+
+ +  Loading... +
+
+
+
+
+
+
+
+ Data does not exist +
+
+
+
diff --git a/zeppelin-web/src/app/jobmanager/jobs/job-control.html b/zeppelin-web/src/app/jobmanager/jobs/job-control.html new file mode 100644 index 00000000000..f8d56df428d --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobs/job-control.html @@ -0,0 +1,43 @@ + + +
+ + {{lastExecuteTime(notebookJob.unixTimeLastRun)}} + + + + RUNNING + + + READY + + + + + {{getProgress()}}% + + + + + + + +
diff --git a/zeppelin-web/src/app/jobmanager/jobs/job-progressBar.html b/zeppelin-web/src/app/jobmanager/jobs/job-progressBar.html new file mode 100644 index 00000000000..11e3c172e10 --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobs/job-progressBar.html @@ -0,0 +1,22 @@ + + +
+
+
+
+
+
diff --git a/zeppelin-web/src/app/jobmanager/jobs/job.controller.js b/zeppelin-web/src/app/jobmanager/jobs/job.controller.js new file mode 100644 index 00000000000..b5c09eb789b --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobs/job.controller.js @@ -0,0 +1,105 @@ +/*jshint loopfunc: true, unused:false */ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +angular.module('zeppelinWebApp') + .controller('JobCtrl', function($scope,$rootScope, $http, baseUrlSrv) { + + $scope.init = function (jobInformation) { + $scope.progressValue = 0; + }; + + $scope.getProgress = function () { + var statusList = _.pluck($scope.notebookJob.paragraphs, 'status'); + var runningJob = _.countBy(statusList, function (status) { + if (status === 'FINISHED' || status === 'RUNNING') { + return 'matchCount'; + } else { + return 'none'; + } + }); + var totalCount = statusList.length; + var runningJobCount = runningJob.matchCount; + var result = Math.ceil(runningJobCount / totalCount * 100); + return isNaN(result)? 0 : result; + }; + + $scope.lastExecuteTime = function (unixtime) { + return moment.unix(unixtime/1000).fromNow(); + }; + + $scope.runNotebookJob = function (notebookId) { + BootstrapDialog.confirm({ + closable: true, + title: '', + message: 'Run all paragraphs?', + callback: function(result) { + if (result === true) { + $http({ + method: 'POST', + url: baseUrlSrv.getRestApiBase() + '/notebook/job/' + notebookId, + headers: { + 'Content-Type': 'application/x-www-form-urlencoded' + } + }).then(function successCallback(response) { + // success + }, function errorCallback(errorResponse) { + var errorText = 'SERVER ERROR'; + if (errorResponse.data.message !== undefined) { + errorText = errorResponse.data.message; + } + BootstrapDialog.alert({ + closable: true, + title: 'Execution Failure', + message: errorText + }); + }); + } + } + }); + }; + + $scope.stopNotebookJob = function (notebookId) { + BootstrapDialog.confirm({ + closable: true, + title: '', + message: 'Stop all paragraphs?', + callback: function(result) { + if (result === true) { + $http({ + method: 'DELETE', + url: baseUrlSrv.getRestApiBase() + '/notebook/job/' + notebookId, + headers: { + 'Content-Type': 'application/x-www-form-urlencoded' + } + }).then(function successCallback(response) { + // success + }, function errorCallback(errorResponse) { + var errorText = 'SERVER ERROR'; + if (errorResponse.data.message !== undefined) { + errorText = errorResponse.data.message; + } + BootstrapDialog.alert({ + closable: true, + title: 'Stop Failure', + message: errorText + }); + }); + } + } + }); + }; + +}); diff --git a/zeppelin-web/src/app/jobmanager/jobs/job.css b/zeppelin-web/src/app/jobmanager/jobs/job.css new file mode 100644 index 00000000000..15479a0689f --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobs/job.css @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + job Style +*/ + +.job-space { + margin-bottom: 5px !important; + padding: 10px 10px 10px 10px !important; + min-height: 30px; +} + +.job-margin { + margin-right: 2px; + margin-left: 2px; +} + +.job-types i { + font-weight: bold; + font-size: 10px; +} + + +/* + job Controls CSS +*/ + +.job .runControl { + font-size: 1px; + color: #AAAAAA; + height:4px; + margin: 0px 0px 0px 0px; +} + +.job .runControl .progress { + position: relative; + width: 100%; + height: 4px; + z-index: 100; + border-radius: 0; +} + +.job .control span { + margin-left: 4px; +} + +.job .control { + background: rgba(255,255,255,0.85); + float: right; + color: #999; + margin-top: 1px; + margin-right: 5px; + position: absolute; + clear: both; + right: 15px; + text-align: right; + font-size: 12px; + padding: 4px; +} + +.job .control li { + font-size: 12px; + margin-bottom: 4px; + color: #333333; +} + +.job .control .tooltip { + z-index: 10003; +} + +@-webkit-keyframes spinnerRotateAnimation +{ + from{-webkit-transform:rotate(0deg);} + to{-webkit-transform:rotate(360deg);} +} +@-moz-keyframes spinnerRotateAnimation +{ + from{-moz-transform:rotate(0deg);} + to{-moz-transform:rotate(360deg);} +} +@-ms-keyframes spinnerRotateAnimation +{ + from{-ms-transform:rotate(0deg);} + to{-ms-transform:rotate(360deg);} +} + +.spinAnimation{ + -webkit-animation-name: spinnerRotateAnimation; + -webkit-animation-duration: 1s; + -webkit-animation-iteration-count: infinite; + -webkit-animation-timing-function: linear; + -moz-animation-name: spinnerRotateAnimation; + -moz-animation-duration: 1s; + -moz-animation-iteration-count: infinite; + -moz-animation-timing-function: linear; + -ms-animation-name: spinnerRotateAnimation; + -ms-animation-duration: 1s; + -ms-animation-iteration-count: infinite; + -ms-animation-timing-function: linear; +} + diff --git a/zeppelin-web/src/app/jobmanager/jobs/job.html b/zeppelin-web/src/app/jobmanager/jobs/job.html new file mode 100644 index 00000000000..7bc8f1547fb --- /dev/null +++ b/zeppelin-web/src/app/jobmanager/jobs/job.html @@ -0,0 +1,100 @@ + + + diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js index 4149311c3bd..5eb59fde56e 100644 --- a/zeppelin-web/src/app/notebook/notebook.controller.js +++ b/zeppelin-web/src/app/notebook/notebook.controller.js @@ -43,7 +43,7 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', var connectedOnce = false; - $scope.$on('setConnectedStatus', function(event, param) { + $scope.$on('setWSConnectedStatus', function(event, param) { if(connectedOnce && param){ initNotebook(); } @@ -694,7 +694,7 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', console.log('Error %o %o', status, data.message); BootstrapDialog.show({ closable: true, - title: 'Insufficient privileges', + title: 'Insufficient privileges', message: data.message, buttons: [{ label: 'Login', diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js index f06eef3c4d5..e836212371d 100644 --- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js +++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js @@ -35,6 +35,11 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { return wsProtocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/ws'; }; + this.getJobManagerWebsocketUrl = function() { + var wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + return wsProtocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/ws/job'; + }; + this.getRestApiBase = function() { return location.protocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/api'; }; diff --git a/zeppelin-web/src/components/navbar/navbar.controller.js b/zeppelin-web/src/components/navbar/navbar.controller.js index 50c769f1c3c..07f60dd2d23 100644 --- a/zeppelin-web/src/components/navbar/navbar.controller.js +++ b/zeppelin-web/src/components/navbar/navbar.controller.js @@ -42,7 +42,7 @@ angular.module('zeppelinWebApp').controller('NavCtrl', function($scope, $rootSco notebookListDataFactory.setNotes(notes); }); - $scope.$on('setConnectedStatus', function(event, param) { + $scope.$on('setWSConnectedStatus', function(event, param) { vm.connected = param; }); diff --git a/zeppelin-web/src/components/navbar/navbar.html b/zeppelin-web/src/components/navbar/navbar.html index 5a8e0cd3337..d22000dbbe4 100644 --- a/zeppelin-web/src/components/navbar/navbar.html +++ b/zeppelin-web/src/components/navbar/navbar.html @@ -49,6 +49,9 @@ +
  • + Job +
  • Interpreter
  • diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 3a261a78f74..0ecac157334 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -21,7 +21,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, websocketCalls.ws.onOpen(function() { console.log('Websocket created'); - $rootScope.$broadcast('setConnectedStatus', true); + $rootScope.$broadcast('setWSConnectedStatus', true); setInterval(function(){ websocketCalls.sendNewEvent({op: 'PING'}); }, 10000); @@ -37,7 +37,8 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, data.ticket = ''; data.roles = ''; } - console.log('Send >> %o, %o, %o, %o, %o', data.op, data.principal, data.ticket, data.roles, data); + + console.log('Send Notebook Server >> %o, %o, %o, %o, %o', data.op, data.principal, data.ticket, data.roles, data); websocketCalls.ws.send(JSON.stringify(data)); }; @@ -50,7 +51,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, if (event.data) { payload = angular.fromJson(event.data); } - console.log('Receive << %o, %o', payload.op, payload); + console.log('Receive Notebook Server << %o, %o', payload.op, payload); var op = payload.op; var data = payload.data; if (op === 'NOTE') { @@ -59,10 +60,14 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $location.path('notebook/' + data.note.id); } else if (op === 'NOTES_INFO') { $rootScope.$broadcast('setNoteMenu', data.notes); + } else if (op === 'LIST_NOTEBOOK_JOBS') { + $rootScope.$broadcast('setNotebookJobs', data.notebookJobs); + } else if (op === 'LIST_UPDATE_NOTEBOOK_JOBS') { + $rootScope.$broadcast('setUpdateNotebookJobs', data.notebookRunningJobs); } else if (op === 'AUTH_INFO') { BootstrapDialog.show({ closable: true, - title: 'Insufficient privileges', + title: 'Insufficient privileges', message: data.info.toString(), buttons: [{ label: 'Login', @@ -97,13 +102,13 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, }); websocketCalls.ws.onError(function(event) { - console.log('error message: ', event); - $rootScope.$broadcast('setConnectedStatus', false); + console.log('[notebookWS] Notebook Server message: ', event); + $rootScope.$broadcast('setWSConnectedStatus', false); }); websocketCalls.ws.onClose(function(event) { - console.log('close message: ', event); - $rootScope.$broadcast('setConnectedStatus', false); + console.log('[notebookWS] close message: ', event); + $rootScope.$broadcast('setWSConnectedStatus', false); }); return websocketCalls; diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 3b4df03796c..661a9f10617 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -18,43 +18,43 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, return { getHomeNotebook: function() { - websocketEvents.sendNewEvent({op: 'GET_HOME_NOTE'}); + websocketEvents.sendNewEvent({op: 'GET_HOME_NOTE'}, 'notebookServer'); }, createNotebook: function(noteName) { - websocketEvents.sendNewEvent({op: 'NEW_NOTE',data: {name: noteName}}); + websocketEvents.sendNewEvent({op: 'NEW_NOTE',data: {name: noteName}}, 'notebookServer'); }, deleteNotebook: function(noteId) { - websocketEvents.sendNewEvent({op: 'DEL_NOTE', data: {id: noteId}}); + websocketEvents.sendNewEvent({op: 'DEL_NOTE', data: {id: noteId}}, 'notebookServer'); }, cloneNotebook: function(noteIdToClone, newNoteName ) { - websocketEvents.sendNewEvent({op: 'CLONE_NOTE', data: {id: noteIdToClone, name: newNoteName}}); + websocketEvents.sendNewEvent({op: 'CLONE_NOTE', data: {id: noteIdToClone, name: newNoteName}}, 'notebookServer'); }, getNotebookList: function() { - websocketEvents.sendNewEvent({op: 'LIST_NOTES'}); + websocketEvents.sendNewEvent({op: 'LIST_NOTES'}, 'notebookServer'); }, reloadAllNotesFromRepo: function() { - websocketEvents.sendNewEvent({op: 'RELOAD_NOTES_FROM_REPO'}); + websocketEvents.sendNewEvent({op: 'RELOAD_NOTES_FROM_REPO'}, 'notebookServer'); }, getNotebook: function(noteId) { - websocketEvents.sendNewEvent({op: 'GET_NOTE', data: {id: noteId}}); + websocketEvents.sendNewEvent({op: 'GET_NOTE', data: {id: noteId}}, 'notebookServer'); }, updateNotebook: function(noteId, noteName, noteConfig) { - websocketEvents.sendNewEvent({op: 'NOTE_UPDATE', data: {id: noteId, name: noteName, config : noteConfig}}); + websocketEvents.sendNewEvent({op: 'NOTE_UPDATE', data: {id: noteId, name: noteName, config : noteConfig}}, 'notebookServer'); }, moveParagraph: function(paragraphId, newIndex) { - websocketEvents.sendNewEvent({ op: 'MOVE_PARAGRAPH', data : {id: paragraphId, index: newIndex}}); + websocketEvents.sendNewEvent({ op: 'MOVE_PARAGRAPH', data : {id: paragraphId, index: newIndex}}, 'notebookServer'); }, insertParagraph: function(newIndex) { - websocketEvents.sendNewEvent({ op: 'INSERT_PARAGRAPH', data : {index: newIndex}}); + websocketEvents.sendNewEvent({ op: 'INSERT_PARAGRAPH', data : {index: newIndex}}, 'notebookServer'); }, updateAngularObject: function(noteId, paragraphId, name, value, interpreterGroupId) { @@ -67,7 +67,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, value: value, interpreterGroupId: interpreterGroupId } - }); + }, 'notebookServer'); }, clientBindAngularObject: function(noteId, name, value, paragraphId) { @@ -79,7 +79,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, value: value, paragraphId: paragraphId } - }); + }, 'notebookServer'); }, clientUnbindAngularObject: function(noteId, name, paragraphId) { @@ -90,11 +90,11 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, name: name, paragraphId: paragraphId } - }); + }, 'notebookServer'); }, cancelParagraphRun: function(paragraphId) { - websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}); + websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}, 'notebookServer'); }, runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) { @@ -107,15 +107,15 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, config: paragraphConfig, params: paragraphParams } - }); + }, 'notebookServer'); }, removeParagraph: function(paragraphId) { - websocketEvents.sendNewEvent({op: 'PARAGRAPH_REMOVE', data: {id: paragraphId}}); + websocketEvents.sendNewEvent({op: 'PARAGRAPH_REMOVE', data: {id: paragraphId}}, 'notebookServer'); }, clearParagraphOutput: function(paragraphId) { - websocketEvents.sendNewEvent({op: 'PARAGRAPH_CLEAR_OUTPUT', data: {id: paragraphId}}); + websocketEvents.sendNewEvent({op: 'PARAGRAPH_CLEAR_OUTPUT', data: {id: paragraphId}}, 'notebookServer'); }, completion: function(paragraphId, buf, cursor) { @@ -126,7 +126,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, buf : buf, cursor : cursor } - }); + }, 'notebookServer'); }, commitParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) { @@ -139,7 +139,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, config: paragraphConfig, params: paragraphParams } - }); + }, 'notebookServer'); }, importNotebook: function(notebook) { @@ -148,7 +148,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, data: { notebook: notebook } - }); + }, 'notebookServer'); }, checkpointNotebook: function(noteId, commitMessage) { @@ -158,12 +158,27 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, noteId: noteId, commitMessage: commitMessage } - }); + }, 'notebookServer'); }, isConnected: function(){ return websocketEvents.isConnected(); - } + }, + + getNotebookJobsList: function() { + websocketEvents.sendNewEvent({op: 'LIST_NOTEBOOK_JOBS'}); + }, + + getUpdateNotebookJobsList: function(lastUpdateServerUnixTime) { + websocketEvents.sendNewEvent( + {op: 'LIST_UPDATE_NOTEBOOK_JOBS', data : {lastUpdateUnixTime : lastUpdateServerUnixTime*1}}, + 'jobManagerServer' + ); + }, + + unsubscribeJobManager: function() { + websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_JOBMANAGER'}); + }, }; diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html index 8efce04e506..af856705c64 100644 --- a/zeppelin-web/src/index.html +++ b/zeppelin-web/src/index.html @@ -53,6 +53,8 @@ + + @@ -144,6 +146,8 @@ + + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 6941dc9380c..ddf28a4d395 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -31,6 +31,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.NotebookEventObserver.ACTIONS; import org.apache.zeppelin.notebook.utility.IdHashes; import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; @@ -71,6 +72,7 @@ public class Note implements Serializable, JobListener { private transient NotebookRepo repo; private transient SearchService index; private transient ScheduledFuture delayedPersist; + private transient NotebookEventObserver notebookEventObserver; private transient Credentials credentials; /** @@ -91,15 +93,24 @@ public class Note implements Serializable, JobListener { public Note() {} public Note(NotebookRepo repo, NoteInterpreterLoader replLoader, - JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials) { + JobListenerFactory jlFactory, SearchService noteIndex, + Credentials credentials, + NotebookEventObserver notebookEventObserver) { this.repo = repo; this.replLoader = replLoader; this.jobListenerFactory = jlFactory; this.index = noteIndex; + this.notebookEventObserver = notebookEventObserver; this.credentials = credentials; generateId(); } + private void notifyChanged(ACTIONS event) { + if (notebookEventObserver != null) { + notebookEventObserver.notifyChanged(id, event); + } + } + private void generateId() { id = IdHashes.encode(System.currentTimeMillis() + new Random().nextInt()); } @@ -118,6 +129,15 @@ public String getName() { public void setName(String name) { this.name = name; + notifyChanged(NotebookEventObserver.ACTIONS.CHNAGED_NOTE_NAME); + } + + public NotebookEventObserver getNotebookEventObserver() { + return notebookEventObserver; + } + + public void setNotebookEventObserver(NotebookEventObserver notebookEventObserver) { + this.notebookEventObserver = notebookEventObserver; } public NoteInterpreterLoader getNoteReplLoader() { @@ -126,6 +146,7 @@ public NoteInterpreterLoader getNoteReplLoader() { public void setReplLoader(NoteInterpreterLoader replLoader) { this.replLoader = replLoader; + notifyChanged(NotebookEventObserver.ACTIONS.BIND_INTERPRETER); } public JobListenerFactory getJobListenerFactory() { @@ -171,6 +192,9 @@ public Paragraph addParagraph() { synchronized (paragraphs) { paragraphs.add(p); } + + notifyChanged(NotebookEventObserver.ACTIONS.ADD_PARAGRAPH); + return p; } @@ -202,6 +226,8 @@ public void addCloneParagraph(Paragraph srcParagraph) { synchronized (paragraphs) { paragraphs.add(newParagraph); } + + notifyChanged(NotebookEventObserver.ACTIONS.ADD_PARAGRAPH); } /** @@ -214,6 +240,9 @@ public Paragraph insertParagraph(int index) { synchronized (paragraphs) { paragraphs.add(index, p); } + + notifyChanged(NotebookEventObserver.ACTIONS.ADD_PARAGRAPH); + return p; } @@ -305,6 +334,7 @@ public void moveParagraph(String paragraphId, int index, boolean throwWhenIndexI if (p != null) { paragraphs.add(index, p); + notifyChanged(NotebookEventObserver.ACTIONS.MOVED_PARAGRAPH); } } } @@ -398,6 +428,7 @@ public void run(String paragraphId) { if (p.getConfig().get("enabled") == null || (Boolean) p.getConfig().get("enabled")) { intp.getScheduler().submit(p); } + } public List completion(String paragraphId, String buffer, int cursor) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 4e611119bb8..85f08391450 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -79,6 +79,7 @@ public class Notebook { private NotebookRepo notebookRepo; private SearchService notebookIndex; private NotebookAuthorization notebookAuthorization; + private NotebookEventObserver notebookEventObserver; private Credentials credentials; /** @@ -113,6 +114,7 @@ public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo, quartzSched = quertzSchedFact.getScheduler(); quartzSched.start(); CronJob.notebook = this; + notebookEventObserver = new NotebookEventObserver(); loadAllNotes(); if (this.notebookIndex != null) { @@ -125,6 +127,10 @@ public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo, } + public NotebookEventObserver getNotebookEventObserver() { + return notebookEventObserver; + } + /** * Create new note. * @@ -139,6 +145,7 @@ public Note createNote() throws IOException { note = createNote(null); } notebookIndex.addIndexDoc(note); + notebookEventObserver.notifyChanged(note.id(), NotebookEventObserver.ACTIONS.CREATE); return note; } @@ -150,7 +157,8 @@ public Note createNote() throws IOException { */ public Note createNote(List interpreterIds) throws IOException { NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory); - Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex, credentials); + Note note = new Note(notebookRepo, intpLoader, + jobListenerFactory, notebookIndex, credentials, notebookEventObserver); intpLoader.setNoteId(note.id()); synchronized (notes) { notes.put(note.id(), note); @@ -161,6 +169,7 @@ public Note createNote(List interpreterIds) throws IOException { notebookIndex.addIndexDoc(note); note.persist(); + notebookEventObserver.notifyChanged(note.id(), NotebookEventObserver.ACTIONS.CREATE); return note; } @@ -208,6 +217,7 @@ public Note importNote(String sourceJson, String noteName) throws IOException { } newNote.persist(); + notebookEventObserver.notifyChanged(newNote.id(), NotebookEventObserver.ACTIONS.CREATE); } catch (IOException e) { logger.error(e.toString(), e); throw e; @@ -245,6 +255,7 @@ public Note cloneNote(String sourceNoteID, String newNoteName) throws notebookIndex.addIndexDoc(newNote); newNote.persist(); + notebookEventObserver.notifyChanged(newNote.id(), NotebookEventObserver.ACTIONS.CREATE); return newNote; } @@ -255,6 +266,8 @@ public void bindInterpretersToNote(String id, note.getNoteReplLoader().setInterpreters(interpreterSettingIds); // comment out while note.getNoteReplLoader().setInterpreters(...) do the same // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); + notebookEventObserver.notifyChanged(note.id(), + NotebookEventObserver.ACTIONS.BIND_INTERPRETER); } } @@ -288,6 +301,8 @@ public void removeNote(String id) { synchronized (notes) { note = notes.remove(id); } + notebookEventObserver.notifyChanged(note.id(), NotebookEventObserver.ACTIONS.REMOVED); + replFactory.removeNoteInterpreterSettingBinding(id); notebookIndex.deleteIndexDocs(note); notebookAuthorization.removeNote(id); @@ -347,7 +362,7 @@ private Note loadNoteFromRepo(String id) { note.setJobListenerFactory(jobListenerFactory); note.setNotebookRepo(notebookRepo); - + note.setNotebookEventObserver(notebookEventObserver); Map angularObjectSnapshot = new HashMap<>(); // restore angular object -------------- @@ -500,6 +515,9 @@ public void execute(JobExecutionContext context) throws JobExecutionException { String noteId = context.getJobDetail().getJobDataMap().getString("noteId"); Note note = notebook.getNote(noteId); + + NotebookEventObserver notebookEventObserver = notebook.getNotebookEventObserver(); + note.runAll(); while (!note.getLastParagraph().isTerminated()) { @@ -574,6 +592,7 @@ public void refreshCron(String id) { info.put("cron", "Scheduler Exception"); } } + notebookEventObserver.notifyChanged(id, NotebookEventObserver.ACTIONS.CHANGED_CONFIG); } private void removeCron(String id) { @@ -582,6 +601,7 @@ private void removeCron(String id) { } catch (SchedulerException e) { logger.error("Can't remove quertz " + id, e); } + notebookEventObserver.notifyChanged(id, NotebookEventObserver.ACTIONS.CHANGED_CONFIG); } public InterpreterFactory getInterpreterFactory() { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventObserver.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventObserver.java new file mode 100644 index 00000000000..e1230e48379 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventObserver.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.util.Observable; + +/** + * Notebook event observer + */ +public class NotebookEventObserver extends Observable { + + /** + * Notebook event Enum + */ + public static enum ACTIONS { + REMOVED, CREATE, RUN, + BIND_INTERPRETER, CHANGED_CONFIG, CHNAGED_NOTE_NAME, ADD_PARAGRAPH, MOVED_PARAGRAPH, + RUN_PARAGRAPH + } + + void notifyChanged(String noteId, ACTIONS action) { + setChanged(); + NotebookChnagedEvent event = new NotebookChnagedEvent(noteId, action); + notifyObservers(event); + } + + /** + * Notebook Event Model + */ + public class NotebookChnagedEvent { + private String noteId; + private ACTIONS action; + + public NotebookChnagedEvent(String noteId, ACTIONS action) { + this.noteId = noteId; + this.action = action; + } + + public String getNoteId() { + return noteId; + } + + public void setNoteId(String noteId) { + this.noteId = noteId; + } + + public ACTIONS getAction() { + return action; + } + + public void setAction(ACTIONS action) { + this.action = action; + } + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index a3fc048d61e..9fbca710c9c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -111,13 +111,18 @@ public static enum OP { CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations // @param settings serialized Map object - CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository + CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository // @param noteId // @param checkpointName + LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations + LIST_UPDATE_NOTEBOOK_JOBS, // [c-s] get job management informations for until unixtime + // @param unixTime + UNSUBSCRIBE_JOBMANAGER // [c-s] Unsubscribe notification for job manger. } public OP op; + public String target; public Map data = new HashMap(); public String ticket = "anonymous"; public String principal = "anonymous"; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java index dcb68c82dc8..0109d0f000a 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java @@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInterpreterLoader; +import org.apache.zeppelin.notebook.NotebookEventObserver; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.junit.After; @@ -286,7 +287,7 @@ private Paragraph addParagraphWithTextAndTitle(Note note, String text, String ti } private Note newNote(String name) { - Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null); + Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null, new NotebookEventObserver()); note.setName(name); return note; }