diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 9ae74abd6f9..4be6da5b161 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -125,6 +125,10 @@ public Job(String jobId, String jobName, JobListener listener, long progressUpda setStatus(Status.READY); } + public void setId(String id) { + this.id = id; + } + public String getId() { return id; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 3aa51e18773..5a34ba4982c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -116,7 +116,7 @@ String getKey() { * is going on. */ final Queue watcherSockets = Queues.newConcurrentLinkedQueue(); - + private Notebook notebook() { return ZeppelinServer.notebook; } @@ -193,7 +193,7 @@ public void onMessage(NotebookSocket conn, String msg) { if (StringUtils.isEmpty(conn.getUser())) { addUserConnection(messagereceived.principal, conn); } - AuthenticationInfo subject = + AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal, messagereceived.ticket); /** Lets be elegant here */ @@ -258,6 +258,9 @@ public void onMessage(NotebookSocket conn, String msg) { case FOLDER_RENAME: renameFolder(conn, userAndRoles, notebook, messagereceived); break; + case UPDATE_PERSONALIZED_MODE: + updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); + break; case COMPLETION: completion(conn, userAndRoles, notebook, messagereceived); break; @@ -582,7 +585,21 @@ public void broadcastInterpreterBindings(String noteId, List settingList) { } public void broadcastParagraph(Note note, Paragraph p) { - broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p)); + if (note.isPersonalizedMode()) { + broadcastParagraphs(p.getUserParagraphMap(), p); + } else { + broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p)); + } + } + + public void broadcastParagraphs(Map userParagraphMap, + Paragraph defaultParagraph) { + if (null != userParagraphMap) { + for (String user : userParagraphMap.keySet()) { + multicastToUser(user, + new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); + } + } } private void broadcastNewParagraph(Note note, Paragraph para) { @@ -639,7 +656,7 @@ private void broadcastNoteListExcept(List> notesInfo, multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo)); } } - + void permissionError(NotebookSocket conn, String op, String userName, Set userAndRoles, @@ -677,6 +694,10 @@ private void sendNote(NotebookSocket conn, HashSet userAndRoles, Noteboo return; } addConnectionToNote(note.getId(), conn); + + if (note.isPersonalizedMode()) { + note = note.getUserNote(user); + } conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); sendAllAngularObjects(note, user, conn); } else { @@ -750,6 +771,32 @@ private void updateNote(NotebookSocket conn, HashSet userAndRoles, } } + private void updatePersonalizedMode(NotebookSocket conn, HashSet userAndRoles, + Notebook notebook, Message fromMessage) throws SchedulerException, IOException { + String noteId = (String) fromMessage.get("id"); + String personalized = (String) fromMessage.get("personalized"); + boolean isPersonalized = personalized.equals("true") ? true : false; + + if (noteId == null) { + return; + } + + NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); + if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { + permissionError(conn, "persoanlized ", fromMessage.principal, + userAndRoles, notebookAuthorization.getOwners(noteId)); + return; + } + + Note note = notebook.getNote(noteId); + if (note != null) { + note.setPersonalizedMode(isPersonalized); + AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); + note.persist(subject); + broadcastNote(note); + } + } + private void renameNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws SchedulerException, IOException { @@ -895,34 +942,44 @@ private void removeNote(NotebookSocket conn, HashSet userAndRoles, broadcastNoteList(subject, userAndRoles); } - private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - Map params = (Map) fromMessage - .get("params"); - Map config = (Map) fromMessage - .get("config"); + Map params = (Map) fromMessage.get("params"); + Map config = (Map) fromMessage.get("config"); String noteId = getOpenNoteId(conn); final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } Paragraph p = note.getParagraph(paragraphId); + + if (note.isPersonalizedMode()) { + p = p.getUserParagraphMap().get(subject.getUser()); + } + p.settings.setParams(params); p.setConfig(config); p.setTitle((String) fromMessage.get("title")); p.setText((String) fromMessage.get("paragraph")); note.persist(subject); - broadcastParagraph(note, p);; + + if (note.isPersonalizedMode()) { + Map userParagraphMap = + note.getParagraph(paragraphId).getUserParagraphMap(); + broadcastParagraphs(userParagraphMap, p); + } else { + broadcastParagraph(note, p); + } } private void cloneNote(NotebookSocket conn, HashSet userAndRoles, @@ -2003,7 +2060,7 @@ private void getEditorSetting(NotebookSocket conn, Message fromMessage) return; } - private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject) + private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject) throws IOException { List availableSettings = notebook().getInterpreterFactory().get(); conn.send(serializeMessage(new Message(OP.INTERPRETER_SETTINGS) @@ -2016,7 +2073,7 @@ public void onMetaInfosReceived(String settingId, Map metaInfos) .get(settingId); interpreterSetting.setInfos(metaInfos); } - + private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived) throws IOException { if (!isSessionAllowedToSwitchToWatcher(conn)) { @@ -2030,19 +2087,19 @@ private void switchConnectionToWatcher(NotebookSocket conn, Message messagerecei return; } watcherSockets.add(conn); - + // remove this connection from regular zeppelin ws usage. removeConnectionFromAllNote(conn); connectedSockets.remove(conn); removeUserConnection(conn.getUser(), conn); } - + private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) { String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER); return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey.equals(WatcherSecurityKey.getKey())); } - + private void broadcastToWatchers(String noteId, String subject, Message message) { synchronized (watcherSockets) { if (watcherSockets.isEmpty()) { diff --git a/zeppelin-web/src/app/notebook/notebook-actionBar.html b/zeppelin-web/src/app/notebook/notebook-actionBar.html index aea18a64e9c..88e47b92ad8 100644 --- a/zeppelin-web/src/app/notebook/notebook-actionBar.html +++ b/zeppelin-web/src/app/notebook/notebook-actionBar.html @@ -66,6 +66,23 @@

tooltip-placement="bottom" tooltip="Export this note"> + + + diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js index 8bec1aa2894..fa86abad6b5 100644 --- a/zeppelin-web/src/app/notebook/notebook.controller.js +++ b/zeppelin-web/src/app/notebook/notebook.controller.js @@ -614,6 +614,7 @@ minimumInputLength: 3 }; + $scope.setIamOwner(); angular.element('#selectOwners').select2(selectJson); angular.element('#selectReaders').select2(selectJson); angular.element('#selectWriters').select2(selectJson); @@ -752,6 +753,39 @@ } }; + $scope.setIamOwner = function() { + if ($scope.permissions.owners.length > 0 && + _.indexOf($scope.permissions.owners, $rootScope.ticket.principal) < 0) { + $scope.isOwner = false; + return false; + } + $scope.isOwner = true; + return true; + }; + + $scope.toggleNotePersonalizedMode = function() { + var personalizedMode = $scope.note.config.personalizedMode; + if ($scope.isOwner) { + BootstrapDialog.confirm({ + closable: true, + title: 'Setting the result display', + message: function(dialog) { + var modeText = $scope.note.config.personalizedMode === 'true' ? 'collaborate' : 'personalize'; + return 'Do you want to ' + modeText + ' your analysis?'; + }, + callback: function(result) { + if (result) { + if ($scope.note.config.personalizedMode === undefined) { + $scope.note.config.personalizedMode = 'false'; + } + $scope.note.config.personalizedMode = personalizedMode === 'true' ? 'false' : 'true'; + websocketMsgSrv.updatePersonalizedMode($scope.note.id, $scope.note.config.personalizedMode); + } + } + }); + } + }; + var isSettingDirty = function() { if (angular.equals($scope.interpreterBindings, $scope.interpreterBindingsOrig)) { return false; @@ -896,10 +930,16 @@ if ($scope.note === null) { $scope.note = note; + } else { + $scope.note.config.personalizedMode = note.config.personalizedMode; } initializeLookAndFeel(); //open interpreter binding setting when there're none selected getInterpreterBindings(); + getPermissions(); + var isPersonalized = $scope.note.config.personalizedMode; + isPersonalized = isPersonalized === undefined ? 'false' : isPersonalized; + $scope.note.config.personalizedMode = isPersonalized; }); $scope.$on('$destroy', function() { diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 9ef943eaedc..b764d86e01d 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -969,7 +969,7 @@ $scope.dirtyText = undefined; $scope.originalText = angular.copy(data.paragraph.text); } else { // if there're local update, keep it. - $scope.paragraph.text = $scope.dirtyText; + $scope.paragraph.text = data.paragraph.text; } } else { $scope.paragraph.text = data.paragraph.text; diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 98193c094fa..29479e879fa 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -59,6 +59,10 @@ websocketEvents.sendNewEvent({op: 'NOTE_UPDATE', data: {id: noteId, name: noteName, config: noteConfig}}); }, + updatePersonalizedMode: function(noteId, modeValue) { + websocketEvents.sendNewEvent({op: 'UPDATE_PERSONALIZED_MODE', data: {id: noteId, personalized: modeValue}}); + }, + renameNote: function(noteId, noteName) { websocketEvents.sendNewEvent({op: 'NOTE_RENAME', data: {id: noteId, name: noteName}}); }, diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 2201b67a3c5..dbf3e1b5b10 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -121,6 +121,21 @@ private String getDefaultInterpreterName() { return null != setting ? setting.getName() : StringUtils.EMPTY; } + public boolean isPersonalizedMode() { + Object v = getConfig().get("personalizedMode"); + return null != v && "true".equals(v); + } + + public void setPersonalizedMode(Boolean value) { + String valueString = StringUtils.EMPTY; + if (value) { + valueString = "true"; + } else { + valueString = "false"; + } + getConfig().put("personalizedMode", valueString); + } + public String getId() { return id; } @@ -666,6 +681,31 @@ void unpersist(AuthenticationInfo subject) throws IOException { } + /** + * Return new note for specific user. this inserts and replaces user paragraph which doesn't + * exists in original paragraph + * + * @param user specific user + * @return new Note for the user + */ + public Note getUserNote(String user) { + Note newNote = new Note(); + newNote.id = getId(); + newNote.config = getConfig(); + newNote.angularObjects = getAngularObjects(); + + Paragraph newParagraph; + for (Paragraph p : paragraphs) { + newParagraph = p.getUserParagraph(user); + if (null == newParagraph) { + newParagraph = p.cloneParagraphForUser(user); + } + newNote.paragraphs.add(newParagraph); + } + + return newNote; + } + private void startDelayedPersistTimer(int maxDelaySec, final AuthenticationInfo subject) { synchronized (this) { if (delayedPersist != null) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 114babfd008..7e72564331d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook; +import com.google.common.collect.Maps; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.display.AngularObject; @@ -55,13 +56,14 @@ public class Paragraph extends Job implements Serializable, Cloneable { private transient InterpreterFactory factory; private transient Note note; private transient AuthenticationInfo authenticationInfo; + private transient Map userParagraphMap = Maps.newHashMap(); // personalized String title; String text; String user; Date dateUpdated; private Map config; // paragraph configs like isOpen, colWidth, etc - public final GUI settings; // form and parameter settings + public GUI settings; // form and parameter settings /** * Applicaiton states in this paragraph @@ -106,6 +108,30 @@ private static String generateId() { + new Random(System.currentTimeMillis()).nextInt(); } + public Map getUserParagraphMap() { + return userParagraphMap; + } + + public Paragraph getUserParagraph(String user) { + return userParagraphMap.get(user); + } + + public Paragraph cloneParagraphForUser(String user) { + Paragraph p = new Paragraph(); + p.settings.setParams(Maps.newHashMap(settings.getParams())); + p.settings.setForms(Maps.newHashMap(settings.getForms())); + p.setConfig(Maps.newHashMap(config)); + p.setTitle(getTitle()); + p.setText(getText()); + p.setResult(getReturn()); + p.setStatus(getStatus()); + p.setId(getId()); + + userParagraphMap.put(user, p); + + return p; + } + public String getUser() { return user; } @@ -347,7 +373,20 @@ protected Object jobRun() throws Throwable { context.out.flush(); List resultMessages = context.out.toInterpreterResultMessage(); resultMessages.addAll(ret.message()); - return new InterpreterResult(ret.code(), resultMessages); + + for (Paragraph p : userParagraphMap.values()) { + p.setText(getText()); + } + + InterpreterResult res = new InterpreterResult(ret.code(), resultMessages); + + Paragraph p = userParagraphMap.get(getUser()); + if (null != p) { + p.setResult(res); + p.settings.setParams(settings.getParams()); + } + + return res; } finally { InterpreterContext.remove(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index 362ddcd3ee9..6f36643e900 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -55,6 +55,9 @@ public static enum OP { NOTE_RENAME, + UPDATE_PERSONALIZED_MODE, // [c-s] update personalized mode (boolean) + // @param note id and boolean personalized mode value + FOLDER_RENAME, RUN_PARAGRAPH, // [c-s] run paragraph