Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand All @@ -48,8 +49,11 @@
import org.junit.Before;
import org.junit.Test;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotebookTest implements JobListenerFactory{
private static final Logger logger = LoggerFactory.getLogger(NotebookTest.class);

private File tmpDir;
private ZeppelinConfiguration conf;
Expand Down Expand Up @@ -228,7 +232,7 @@ public void testSchedule() throws InterruptedException, IOException{
note.setConfig(config);
notebook.refreshCron(note.id());
Thread.sleep(1*1000);

// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
Expand Down Expand Up @@ -319,46 +323,35 @@ public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedExc
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());

Paragraph p1 = note.addParagraph();
p1.setText("p1");
Paragraph p2 = note.addParagraph();
p2.setText("p2");
Paragraph p3 = note.addParagraph();
p3.setText("p3");
Paragraph p4 = note.addParagraph();
p4.setText("p4");

/* all jobs are ready to run */
assertEquals(Job.Status.READY, p1.getStatus());
assertEquals(Job.Status.READY, p2.getStatus());
assertEquals(Job.Status.READY, p3.getStatus());
assertEquals(Job.Status.READY, p4.getStatus());

/* run all */
note.runAll();
ArrayList<Paragraph> paragraphs = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Paragraph tmp = note.addParagraph();
tmp.setText("p" + tmp.getId());
paragraphs.add(tmp);
}

/* all are pending in the beginning (first one possibly started)*/
assertTrue(p1.getStatus() == Job.Status.PENDING || p1.getStatus() == Job.Status.RUNNING);
assertEquals(Job.Status.PENDING, p2.getStatus());
assertEquals(Job.Status.PENDING, p3.getStatus());
assertEquals(Job.Status.PENDING, p4.getStatus());
for (Paragraph p : paragraphs) {
assertEquals(Job.Status.READY, p.getStatus());
}

/* wait till first job is terminated and second starts running */
while(p1.isTerminated() == false || (p2.getStatus() == Job.Status.PENDING)) Thread.yield();
note.runAll();

assertEquals(Job.Status.FINISHED, p1.getStatus());
assertEquals(Job.Status.RUNNING, p2.getStatus());
assertEquals(Job.Status.PENDING, p3.getStatus());
assertEquals(Job.Status.PENDING, p4.getStatus());
while (paragraphs.get(0).getStatus() != Status.FINISHED) Thread.yield();

/* restart interpreter */
factory.restart(note.getNoteReplLoader().getInterpreterSettings().get(0).id());

/* pending and running jobs have been aborted */
assertEquals(Job.Status.FINISHED, p1.getStatus());
assertEquals(Job.Status.ABORT, p2.getStatus());
assertEquals(Job.Status.ABORT, p3.getStatus());
assertEquals(Job.Status.ABORT, p4.getStatus());
boolean isAborted = false;
for (Paragraph p : paragraphs) {
logger.debug(p.getStatus().name());
if (isAborted) {
assertEquals(Job.Status.ABORT, p.getStatus());
}
if (p.getStatus() == Status.ABORT) {
isAborted = true;
}
}

assertTrue(isAborted);
}

private void delete(File file){
Expand All @@ -373,7 +366,7 @@ else if(file.isDirectory()){
file.delete();
}
}

@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){
Expand Down