Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ class InterpretJob extends Job {
private String script;
private InterpreterContext context;
private Map<String, Object> infos;
private Object results;

public InterpretJob(
String jobId,
Expand All @@ -417,6 +418,11 @@ public InterpretJob(
this.context = context;
}

@Override
public Object getReturn() {
return results;
}

@Override
public int progress() {
return 0;
Expand Down Expand Up @@ -514,6 +520,11 @@ protected Object jobRun() throws Throwable {
protected boolean jobAbort() {
return false;
}

@Override
public void setResult(Object results) {
this.results = results;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@

/**
* Skeletal implementation of the Job concept.
* - designed for inheritance
* - should be run on a separate thread
* - maintains internal state: it's status
* - supports listeners who are updated on status change
*
* Job class is serialized/deserialized and used server<->client communication
* and saving/loading jobs from disk.
* Changing/adding/deleting non transitive field name need consideration of that.
* - designed for inheritance
* - should be run on a separate thread
* - maintains internal state: it's status
* - supports listeners who are updated on status change
*
* Job class is serialized/deserialized and used server<->client communication
* and saving/loading jobs from disk.
* Changing/adding/deleting non transitive field name need consideration of that.
*/
public abstract class Job {
/**
Expand All @@ -48,15 +47,10 @@ public abstract class Job {
* FINISHED - Job finished run. with success
* ERROR - Job finished run. with error
* ABORT - Job finished by abort
*
*/
public static enum Status {
READY,
PENDING,
RUNNING,
FINISHED,
ERROR,
ABORT;
READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;

public boolean isReady() {
return this == READY;
}
Expand All @@ -70,16 +64,10 @@ public boolean isPending() {
}
}


private String jobName;
String id;

// since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
// see ZEPPELIN-212
Object results;

// For backward compatibility of note.json format after ZEPPELIN-212
Object result;

Date dateCreated;
Date dateStarted;
Date dateFinished;
Expand Down Expand Up @@ -184,7 +172,7 @@ public void run() {
progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs);
progressUpdator.start();
dateStarted = new Date();
results = jobRun();
setResult(jobRun());
this.exception = null;
errorMessage = null;
dateFinished = new Date();
Expand All @@ -193,14 +181,14 @@ public void run() {
LOGGER.error("Job failed", e);
progressUpdator.terminate();
this.exception = e;
results = e.getMessage();
setResult(e.getMessage());
errorMessage = getStack(e);
dateFinished = new Date();
} catch (Throwable e) {
LOGGER.error("Job failed", e);
progressUpdator.terminate();
this.exception = e;
results = e.getMessage();
setResult(e.getMessage());
errorMessage = getStack(e);
dateFinished = new Date();
} finally {
Expand All @@ -226,13 +214,7 @@ protected void setException(Throwable t) {
errorMessage = getStack(t);
}

public Object getPreviousResultFormat() {
return result;
}

public Object getReturn() {
return results;
}
public abstract Object getReturn();

public String getJobName() {
return jobName;
Expand Down Expand Up @@ -270,7 +252,5 @@ public Date getDateFinished() {
return dateFinished;
}

public void setResult(Object results) {
this.results = results;
}
public abstract void setResult(Object results);
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,17 @@ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOExc

long start = System.currentTimeMillis();
Job jobA = new Job("jobA", null) {
private Object r;

@Override
public Object getReturn() {
return r;
}

@Override
public void setResult(Object results) {
this.r = results;
}

@Override
public int progress() {
Expand Down Expand Up @@ -336,6 +347,18 @@ protected boolean jobAbort() {

Job jobB = new Job("jobB", null) {

private Object r;

@Override
public Object getReturn() {
return r;
}

@Override
public void setResult(Object results) {
this.r = results;
}

@Override
public int progress() {
return 0;
Expand Down Expand Up @@ -403,6 +426,17 @@ public void testRunOrderPreserved() throws InterruptedException {
for (int i = 0; i < concurrency; i++) {
final String jobId = Integer.toString(i);
scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
private Object r;

@Override
public Object getReturn() {
return r;
}

@Override
public void setResult(Object results) {
this.r = results;
}

@Override
public int progress() {
Expand Down Expand Up @@ -483,6 +517,17 @@ public void testRunParallel() throws InterruptedException {
for (int i = 0; i < concurrency; i++) {
final String jobId = Integer.toString(i);
scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
private Object r;

@Override
public Object getReturn() {
return r;
}

@Override
public void setResult(Object results) {
this.r = results;
}

@Override
public int progress() {
Expand Down Expand Up @@ -586,6 +631,17 @@ public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedEx
intpA.open();

Job jobA = new Job("jobA", null) {
private Object r;

@Override
public Object getReturn() {
return r;
}

@Override
public void setResult(Object results) {
this.r = results;
}

@Override
public int progress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public void test() throws Exception {
10);

Job job = new Job("jobId", "jobName", null, 200) {
Object results;
@Override
public Object getReturn() {
return results;
}

@Override
public int progress() {
Expand Down Expand Up @@ -125,6 +130,11 @@ protected Object jobRun() throws Throwable {
protected boolean jobAbort() {
return false;
}

@Override
public void setResult(Object results) {
this.results = results;
}
};
scheduler.submit(job);

Expand Down Expand Up @@ -185,6 +195,7 @@ public void testAbortOnPending() throws Exception {
10);

Job job1 = new Job("jobId1", "jobName1", null, 200) {
Object results;
InterpreterContext context = new InterpreterContext(
"note",
"jobId1",
Expand All @@ -198,6 +209,11 @@ public void testAbortOnPending() throws Exception {
new LocalResourcePool("pool1"),
new LinkedList<InterpreterContextRunner>(), null);

@Override
public Object getReturn() {
return results;
}

@Override
public int progress() {
return 0;
Expand All @@ -221,9 +237,15 @@ protected boolean jobAbort() {
}
return true;
}

@Override
public void setResult(Object results) {
this.results = results;
}
};

Job job2 = new Job("jobId2", "jobName2", null, 200) {
public Object results;
InterpreterContext context = new InterpreterContext(
"note",
"jobId2",
Expand All @@ -237,6 +259,11 @@ protected boolean jobAbort() {
new LocalResourcePool("pool1"),
new LinkedList<InterpreterContextRunner>(), null);

@Override
public Object getReturn() {
return results;
}

@Override
public int progress() {
return 0;
Expand All @@ -260,6 +287,11 @@ protected boolean jobAbort() {
}
return true;
}

@Override
public void setResult(Object results) {
this.results = results;
}
};

job2.setResult("result2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class SleepingJob extends Job{
private int count;

static Logger LOGGER = LoggerFactory.getLogger(SleepingJob.class);
private Object results;


public SleepingJob(String jobName, JobListener listener, int time){
Expand Down Expand Up @@ -61,6 +62,16 @@ public boolean jobAbort() {
return true;
}

@Override
public void setResult(Object results) {
this.results = results;
}

@Override
public Object getReturn() {
return results;
}

@Override
public int progress() {
long p = (System.currentTimeMillis() - start)*100 / time;
Expand Down
Loading