Skip to content

BATCH-2009 - stoppable tasklet #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.configuration.JobRegistry;
Expand All @@ -52,6 +53,11 @@
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.NoSuchStepException;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.support.PropertiesConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -71,6 +77,7 @@
*
* @author Dave Syer
* @author Lucas Ward
* @author Will Schipp
* @since 2.0
*/
public class SimpleJobOperator implements JobOperator, InitializingBean {
Expand Down Expand Up @@ -392,6 +399,33 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe
}
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
//implementation to support Tasklet.stop()
try {
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
//get the current stepExecution
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isRunning()) {
try {
//have the step execution that's running -> need to 'stop' it
Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
if (step instanceof TaskletStep) {
Tasklet tasklet = ((TaskletStep)step).getTasklet();
if (tasklet instanceof StoppableTasklet) {
((StoppableTasklet)tasklet).stop();
}//end if
}//end if
}
catch (NoSuchStepException e) {
logger.warn("Step not found",e);
}
}//end if
}//end for
}//end if
}
catch (NoSuchJobException e) {
logger.warn("Cannot find Job object",e);
}

return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.springframework.batch.core.step.tasklet;

/**
*
* Stoppable tasklet
*
* @author Will Schipp
*
*/
public interface StoppableTasklet extends Tasklet {

/**
* method to signal a long running/looping tasklet to stop
*
*/
void stop();

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@
* still running when tasklet exits (abnormally).
*
* @author Robert Kasanicky
* @author Will Schipp
*/
public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean {
public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean {

protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);

Expand All @@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement

private boolean interruptOnCancel = false;

private boolean stopped = false;

/**
* Execute system command and map its exit code to {@link ExitStatus} using
* {@link SystemProcessExitCodeMapper}.
Expand All @@ -99,7 +102,7 @@ public Integer call() throws Exception {
taskExecutor.execute(systemCommandTask);

while (true) {
Thread.sleep(checkInterval);
Thread.sleep(checkInterval);//moved to the end of the logic
if (systemCommandTask.isDone()) {
contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
return RepeatStatus.FINISHED;
Expand All @@ -112,6 +115,14 @@ else if (execution.isTerminateOnly()) {
systemCommandTask.cancel(interruptOnCancel);
throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
}
else if (stopped) {
stopped = false;//reset
//invoke cancel
systemCommandTask.cancel(interruptOnCancel);
contribution.setExitStatus(ExitStatus.STOPPED);
return RepeatStatus.FINISHED;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one. At the very least, we would need to add in the documentation that the underlying command will not be interrupted unless interuptOnCancel has been set to true. Even with that though, we can't guarantee the state of things once that has occurred.


}

}
Expand Down Expand Up @@ -208,4 +219,9 @@ public void setInterruptOnCancel(boolean interruptOnCancel) {
this.interruptOnCancel = interruptOnCancel;
}

@Override
public void stop() {
stopped = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* @author Ben Hale
* @author Robert Kasanicky
* @author Michael Minella
* @author Will Schipp
*/
@SuppressWarnings("serial")
public class TaskletStep extends AbstractStep {
Expand Down Expand Up @@ -306,6 +307,14 @@ protected void open(ExecutionContext ctx) throws Exception {
stream.open(ctx);
}

/**
* retrieve the tasklet - helper method for JobOperator
* @return
*/
public Tasklet getTasklet() {
return tasklet;
}

/**
* A callback for the transactional work inside a chunk. Also detects
* failures in the transaction commit and rollback, only panicking if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
*/
package org.springframework.batch.core.launch.support;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,12 +38,17 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.job.JobSupport;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobLauncher;
Expand All @@ -52,6 +59,10 @@
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.support.PropertiesConverter;

/**
Expand Down Expand Up @@ -349,6 +360,67 @@ public void testStop() throws Exception{
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
}

@Test
public void testStopTasklet() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests the rosey scenario. What about when something goes wrong with stopping a Tasklet (exception being thrown in the StoppableTasklet#stop() method at a minimum).

JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
StoppableTasklet tasklet = mock(StoppableTasklet.class);
TaskletStep taskletStep = new TaskletStep();
taskletStep.setTasklet(tasklet);
MockJob job = new MockJob();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same not as above...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allowed test construction of the job to test the retrieval of the tasklet

job.taskletStep = taskletStep;

JobRegistry jobRegistry = mock(JobRegistry.class);
TaskletStep step = mock(TaskletStep.class);

when(step.getTasklet()).thenReturn(tasklet);
when(step.getName()).thenReturn("test_job.step1");
when(jobRegistry.getJob(anyString())).thenReturn(job);
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution);

jobOperator.setJobRegistry(jobRegistry);
jobExplorer.getJobExecution(111L);
jobRepository.update(jobExecution);
jobOperator.stop(111L);
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
}

@Test
public void testStopTaskletException() throws Exception {
JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
StoppableTasklet tasklet = new StoppableTasklet() {

@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
return null;
}

@Override
public void stop() {
throw new IllegalStateException();
}};
TaskletStep taskletStep = new TaskletStep();
taskletStep.setTasklet(tasklet);
MockJob job = new MockJob();
job.taskletStep = taskletStep;

JobRegistry jobRegistry = mock(JobRegistry.class);
TaskletStep step = mock(TaskletStep.class);

when(step.getTasklet()).thenReturn(tasklet);
when(step.getName()).thenReturn("test_job.step1");
when(jobRegistry.getJob(anyString())).thenReturn(job);
when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution);

jobOperator.setJobRegistry(jobRegistry);
jobExplorer.getJobExecution(111L);
jobRepository.update(jobExecution);
jobOperator.stop(111L);
assertEquals(BatchStatus.STOPPING, jobExecution.getStatus());
}

@Test
public void testAbort() throws Exception {
JobInstance jobInstance = new JobInstance(123L, job.getName());
Expand All @@ -370,4 +442,27 @@ public void testAbortNonStopping() throws Exception {
jobRepository.update(jobExecution);
jobOperator.abandon(123L);
}


class MockJob extends AbstractJob {

private TaskletStep taskletStep;

@Override
public Step getStep(String stepName) {
return taskletStep;
}

@Override
public Collection<String> getStepNames() {
return Arrays.asList("test_job.step1");
}

@Override
protected void doExecute(JobExecution execution) throws JobExecutionException {

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,24 @@ public void testWorkingDirectory() throws Exception {
// no error expected now
tasklet.setWorkingDirectory(directory.getCanonicalPath());
}

/*
* test stopping a tasklet
*/
@Test
public void testStopped() throws Exception {
String command = "sleep 5";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple things here:

  1. We need to iron out the changes mentioned above about this tasklet.
  2. We need to be cognisant of Windows users (who don't have a sleep command). There is an open PR to change that based on the OS I still need to address (PR Update SystemCommandTaskletIntegrationTests.java #164)

tasklet.setCommand(command);
tasklet.setTerminationCheckInterval(10);
tasklet.afterPropertiesSet();

StepContribution contribution = stepExecution.createStepContribution();
//send stop
tasklet.stop();
tasklet.execute(contribution, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is stop called before execute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only to support the test in the sequencing


assertEquals(contribution.getExitStatus().getExitCode(),ExitStatus.STOPPED.getExitCode());
}

/**
* Exit code mapper containing mapping logic expected by the tests. 0 means
Expand Down