Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,52 @@ public Response getCronJob(@PathParam("notebookId") String notebookId) throws
}

return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build();
}
}

/**
* Get notebook jobs for job manager
* @param
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@GET
@Path("jobmanager/")
@ZeppelinApi
public Response getJobListforNotebook() throws IOException, IllegalArgumentException {
LOG.info("Get notebook jobs for job manager");

List<Map<String, Object>> notebookJobs = notebook.getJobListforNotebook(false, 0);
Map<String, Object> response = new HashMap<>();

response.put("lastResponseUnixTime", System.currentTimeMillis());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's actual time gap between System.currentTimeMillis and the result of notebook.getJobListforNotebook. Is it OK? Or I think you can use some information of notebookJobs.

Copy link
Member Author

@cloverhearts cloverhearts Jun 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, exists a gap. But the gap is very small.
In addition, there is a solution for it.
'UnixTimeLastRun' value jobs in the Notebook will display the last update time.
It sends a 'unixTimeLastRun' value to the update request.
You can receive data without a gap of time since the update of the Notebook.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloverhearts If this is intended behaviour and isn't influenced on the right behaviour, I'm OK.

response.put("jobs", notebookJobs);

return new JsonResponse<>(Status.OK, response).build();
}

/**
* Get updated notebook jobs for job manager
* @param
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@GET
@Path("jobmanager/{lastUpdateUnixtime}/")
@ZeppelinApi
public Response getUpdatedJobListforNotebook(
@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime) throws
IOException, IllegalArgumentException {
LOG.info("Get updated notebook jobs lastUpdateTime {}", lastUpdateUnixTime);

List<Map<String, Object>> notebookJobs;
notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime);
Map<String, Object> response = new HashMap<>();

response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);

return new JsonResponse<>(Status.OK, response).build();
}

/**
* Search for a Notes with permissions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@
public class NotebookServer extends WebSocketServlet implements
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
RemoteInterpreterProcessListener {
/**
* Job manager service type
*/
protected enum JOB_MANAGER_SERVICE {
JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
private String serviceTypeKey;
JOB_MANAGER_SERVICE(String serviceType) {
this.serviceTypeKey = serviceType;
}
String getKey() {
return this.serviceTypeKey;
}
}

private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
Expand Down Expand Up @@ -203,6 +217,12 @@ public void onMessage(NotebookSocket conn, String msg) {
case CHECKPOINT_NOTEBOOK:
checkpointNotebook(conn, notebook, messagereceived);
break;
case LIST_NOTEBOOK_JOBS:
unicastNotebookJobInfo(conn);
break;
case LIST_UPDATE_NOTEBOOK_JOBS:
unicastUpdateNotebookJobInfo(conn, messagereceived);
break;
default:
break;
}
Expand Down Expand Up @@ -350,6 +370,34 @@ private void unicast(Message m, NotebookSocket conn) {
}
}

public void unicastNotebookJobInfo(NotebookSocket conn) throws IOException {

List<Map<String, Object>> notebookJobs = notebook().getJobListforNotebook(false, 0);
Map<String, Object> response = new HashMap<>();

response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);

conn.send(serializeMessage(new Message(OP.LIST_NOTEBOOK_JOBS)
.put("notebookJobs", response)));
}

public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage)
throws IOException {
double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime");
long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue();

List<Map<String, Object>> notebookJobs;
notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime);

Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);

conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS)
.put("notebookRunningJobs", response)));
}

public List<Map<String, String>> generateNotebooksInfo(boolean needsReload) {
Notebook notebook = notebook();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,138 @@ public void setJobListenerFactory(JobListenerFactory jobListenerFactory) {
this.jobListenerFactory = jobListenerFactory;
}

private Map<String, Object> getParagraphForJobManagerItem(Paragraph paragraph) {
Map<String, Object> paragraphItem = new HashMap<>();

// set paragraph id
paragraphItem.put("id", paragraph.getId());

// set paragraph name
String paragraphName = paragraph.getTitle();
if (paragraphName != null) {
paragraphItem.put("name", paragraphName);
} else {
paragraphItem.put("name", paragraph.getId());
}

// set status for paragraph.
paragraphItem.put("status", paragraph.getStatus().toString());

return paragraphItem;
}

private long getUnixTimeLastRunParagraph(Paragraph paragraph) {

Date lastRunningDate = null;
long lastRunningUnixTime = 0;

Date paragaraphDate = paragraph.getDateStarted();
// diff started time <-> finishied time
if (paragaraphDate == null) {
paragaraphDate = paragraph.getDateFinished();
} else {
if (paragraph.getDateFinished() != null &&
paragraph.getDateFinished().after(paragaraphDate)) {
paragaraphDate = paragraph.getDateFinished();
}
}

// finished time and started time is not exists.
if (paragaraphDate == null) {
paragaraphDate = paragraph.getDateCreated();
}

// set last update unixtime(ms).
lastRunningDate = paragaraphDate;

lastRunningUnixTime = lastRunningDate.getTime();

return lastRunningUnixTime;
}

public List<Map<String, Object>> getJobListforNotebook(boolean needsReload,
long lastUpdateServerUnixTime) {
final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";

if (needsReload) {
try {
reloadAllNotes();
} catch (IOException e) {
logger.error("Fail to reload notes from repository");
}
}

List<Note> notes = getAllNotes();
List<Map<String, Object>> notesInfo = new LinkedList<>();
for (Note note : notes) {
boolean isNotebookRunning = false;
boolean isUpdateNotebook = false;
long lastRunningUnixTime = 0;
Map<String, Object> info = new HashMap<>();

// set notebook ID
info.put("notebookId", note.id());

// set notebook Name
String notebookName = note.getName();
if (notebookName != null) {
info.put("notebookName", note.getName());
} else {
info.put("notebookName", "Note " + note.id());
}

// set notebook type ( cron or normal )
if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true &&
!note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
info.put("notebookType", "cron");
}
else {
info.put("notebookType", "normal");
}

// set paragraphs
List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
for (Paragraph paragraph : note.getParagraphs()) {
// check paragraph's status.
if (paragraph.getStatus().isRunning() == true) {
isNotebookRunning = true;
isUpdateNotebook = true;
}

// get data for the job manager.
Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);

// is update notebook for last server update time.
if (lastRunningUnixTime > lastUpdateServerUnixTime) {
paragraphsInfo.add(paragraphItem);
isUpdateNotebook = true;
}
}

// set interpreter bind type
String interpreterGroupName = null;
if (note.getNoteReplLoader().getInterpreterSettings() != null &&
note.getNoteReplLoader().getInterpreterSettings().size() >= 1) {
interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup();
}

// not update and not running -> pass
if (isUpdateNotebook == false && isNotebookRunning == false) {
continue;
}

// notebook json object root information.
info.put("interpreter", interpreterGroupName);
info.put("isRunningJob", isNotebookRunning);
info.put("unixTimeLastRun", lastRunningUnixTime);
info.put("paragraphs", paragraphsInfo);
notesInfo.add(info);
}

return notesInfo;
}

/**
* Cron task for the note.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ public static enum OP {
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
// @param settings serialized Map<String, String> object

CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository
// @param noteId
// @param checkpointName

CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository
// @param noteId
// @param checkpointName
LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations
LIST_UPDATE_NOTEBOOK_JOBS // [c-s] get job management informations for until unixtime
// @param unixTime
}

public OP op;
Expand Down