From e23eb351ea2a76ec9657739893ff256d0fb91eca Mon Sep 17 00:00:00 2001 From: willschipp Date: Wed, 22 May 2013 07:34:57 -0400 Subject: [PATCH 1/4] BATCH-2009 - stoppable tasklet --- .../launch/support/SimpleJobOperator.java | 33 ++++++++ .../core/step/item/ChunkOrientedTasklet.java | 22 ++++- .../step/tasklet/CallableTaskletAdapter.java | 5 +- .../tasklet/MethodInvokingTaskletAdapter.java | 6 +- .../core/step/tasklet/StoppableTasklet.java | 20 +++++ .../step/tasklet/SystemCommandTasklet.java | 19 ++++- .../batch/core/step/tasklet/TaskletStep.java | 9 +++ .../support/SimpleJobOperatorTests.java | 80 ++++++++++++++++++- .../step/item/ChunkOrientedTaskletTests.java | 40 ++++++++++ .../step/tasklet/StoppableTaskletTest.java | 26 ++++++ .../SystemCommandTaskletIntegrationTests.java | 18 +++++ 11 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java create mode 100644 spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java index 0735e7f2e7..5098f89a37 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java @@ -16,6 +16,7 @@ package org.springframework.batch.core.launch.support; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -33,6 +34,7 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersIncrementer; import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.configuration.JobRegistry; @@ -40,6 +42,7 @@ import org.springframework.batch.core.converter.DefaultJobParametersConverter; import org.springframework.batch.core.converter.JobParametersConverter; import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.launch.JobExecutionNotRunningException; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobLauncher; @@ -52,6 +55,9 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.batch.core.step.tasklet.StoppableTasklet; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.support.PropertiesConverter; import org.springframework.beans.factory.InitializingBean; import org.springframework.transaction.annotation.Transactional; @@ -71,6 +77,7 @@ * * @author Dave Syer * @author Lucas Ward + * @author Will Schipp * @since 2.0 */ public class SimpleJobOperator implements JobOperator, InitializingBean { @@ -392,6 +399,32 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe } jobExecution.setStatus(BatchStatus.STOPPING); jobRepository.update(jobExecution); + //implementation to support Tasklet.stop() + //TODO manage this is an 'scoped' proxy test + //find the job object + Job job; + try { + job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); + //get the steps for the job + if (job instanceof AbstractJob) { + //retrieve the steps + Collection stepNames = ((AbstractJob)job).getStepNames(); + //go through the step names are retrieve each step + for (String stepName : stepNames) { + Step step = ((AbstractJob)job).getStep(stepName); + //determine type + if (step instanceof TaskletStep) { + //invoke stop --> reflection? + Tasklet tasklet = ((TaskletStep) step).getTasklet(); + if (tasklet instanceof StoppableTasklet) { + ((StoppableTasklet)tasklet).stop(); + }//end if + }//end if + }//end for + }//end if + } catch (NoSuchJobException e) { + logger.error("Couldn't find Job for the execution:" + jobExecution.getJobInstance().getJobName()); + } return true; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java index 1fd2bcf8b7..32fca4dc7e 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java @@ -18,8 +18,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.StoppableTasklet; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; @@ -28,10 +30,11 @@ * handling. * * @author Dave Syer + * @author Will Schipp * * @param input item type */ -public class ChunkOrientedTasklet implements Tasklet { +public class ChunkOrientedTasklet implements StoppableTasklet { private static final String INPUTS_KEY = "INPUTS"; @@ -40,6 +43,11 @@ public class ChunkOrientedTasklet implements Tasklet { private final ChunkProvider chunkProvider; private boolean buffering = true; + + /** + * support for stoppable tasklet + */ + private boolean stopped = false; private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class); @@ -63,6 +71,13 @@ public void setBuffering(boolean buffering) { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + //check for stopped at the beginning of a chunk + if (stopped) { + stopped = false;//reset + contribution.setExitStatus(ExitStatus.STOPPED); + return RepeatStatus.FINISHED; + }//end if + @SuppressWarnings("unchecked") Chunk inputs = (Chunk) chunkContext.getAttribute(INPUTS_KEY); if (inputs == null) { @@ -90,4 +105,9 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon } + @Override + public void stop() { + stopped = true; + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java index 906af77532..6e41e6b261 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java @@ -30,7 +30,7 @@ * @author Dave Syer * */ -public class CallableTaskletAdapter implements Tasklet, InitializingBean { +public class CallableTaskletAdapter implements StoppableTasklet, InitializingBean { private Callable callable; @@ -62,4 +62,7 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon return callable.call(); } + @Override + public void stop() { } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java index c713f76f65..9b5139d800 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java @@ -31,9 +31,10 @@ * @see AbstractMethodInvokingDelegator * * @author Dave Syer + * @author Will Schipp * */ -public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator implements Tasklet { +public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator implements StoppableTasklet { /** * Delegate execution to the target object and translate the return value to @@ -62,4 +63,7 @@ protected ExitStatus mapResult(Object result) { return ExitStatus.COMPLETED; } + @Override + public void stop() { } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java new file mode 100644 index 0000000000..9918aa037f --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java @@ -0,0 +1,20 @@ +package org.springframework.batch.core.step.tasklet; + +/** + * + * JSR-352 compatible tasklet that provides the 'stop' function. + * The Spring Batch Tasklet is analogous to 'batchlet' in the JSR + * terminology + * + * @author Will Schipp + * + */ +public interface StoppableTasklet extends Tasklet { + + /** + * method to signal a long running/looping tasklet to stop + * + */ + void stop(); + +} diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java index 397ef75786..e6b2e59a59 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java @@ -54,8 +54,9 @@ * still running when tasklet exits (abnormally). * * @author Robert Kasanicky + * @author Will Schipp */ -public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean { +public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean { protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); @@ -77,6 +78,8 @@ public class SystemCommandTasklet extends StepExecutionListenerSupport implement private boolean interruptOnCancel = false; + private boolean stopped = false; + /** * Execute system command and map its exit code to {@link ExitStatus} using * {@link SystemProcessExitCodeMapper}. @@ -99,7 +102,7 @@ public Integer call() throws Exception { taskExecutor.execute(systemCommandTask); while (true) { - Thread.sleep(checkInterval); + Thread.sleep(checkInterval);//moved to the end of the logic if (systemCommandTask.isDone()) { contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); return RepeatStatus.FINISHED; @@ -112,6 +115,13 @@ else if (execution.isTerminateOnly()) { systemCommandTask.cancel(interruptOnCancel); throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); } + else if (stopped) { + stopped = false;//reset + systemCommandTask.cancel(interruptOnCancel); + contribution.setExitStatus(ExitStatus.STOPPED); + return RepeatStatus.FINISHED; + } + } } @@ -208,4 +218,9 @@ public void setInterruptOnCancel(boolean interruptOnCancel) { this.interruptOnCancel = interruptOnCancel; } + @Override + public void stop() { + stopped = true; + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java index 8b98f9fe44..70f56edd5f 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java @@ -71,6 +71,7 @@ * @author Ben Hale * @author Robert Kasanicky * @author Michael Minella + * @author Will Schipp */ @SuppressWarnings("serial") public class TaskletStep extends AbstractStep { @@ -306,6 +307,14 @@ protected void open(ExecutionContext ctx) throws Exception { stream.open(ctx); } + /** + * retrieve the tasklet - helper method for JobOperator + * @return + */ + public Tasklet getTasklet() { + return tasklet; + } + /** * A callback for the transactional work inside a chunk. Also detects * failures in the transaction commit and rollback, only panicking if the diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java index 197153ac9d..f10e668256 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java @@ -15,15 +15,16 @@ */ package org.springframework.batch.core.launch.support; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -36,12 +37,17 @@ import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersIncrementer; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.support.MapJobRegistry; import org.springframework.batch.core.converter.DefaultJobParametersConverter; import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.job.JobSupport; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobLauncher; @@ -52,6 +58,10 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.StoppableTasklet; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.batch.support.PropertiesConverter; /** @@ -349,6 +359,32 @@ public void testStop() throws Exception{ assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); } + @Test + public void testStopTasklet() throws Exception { + JobInstance jobInstance = new JobInstance(123L, job.getName()); + JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); + MockStoppableTasklet tasklet = new MockStoppableTasklet(); + TaskletStep taskletStep = new TaskletStep(); + taskletStep.setTasklet(tasklet); + MockJob job = new MockJob(); + job.taskletStep = taskletStep; + + JobRegistry jobRegistry = mock(JobRegistry.class); + TaskletStep step = mock(TaskletStep.class); + + when(step.getTasklet()).thenReturn(tasklet); + when(step.getName()).thenReturn("test_job.step1"); + when(jobRegistry.getJob(anyString())).thenReturn(job); + when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution); + + jobOperator.setJobRegistry(jobRegistry); + jobExplorer.getJobExecution(111L); + jobRepository.update(jobExecution); + jobOperator.stop(111L); + assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); + assertTrue(tasklet.stopped); + } + @Test public void testAbort() throws Exception { JobInstance jobInstance = new JobInstance(123L, job.getName()); @@ -370,4 +406,42 @@ public void testAbortNonStopping() throws Exception { jobRepository.update(jobExecution); jobOperator.abandon(123L); } + + class MockJob extends AbstractJob { + + private TaskletStep taskletStep; + + @Override + public Step getStep(String stepName) { + return taskletStep; + } + + @Override + public Collection getStepNames() { + return Arrays.asList("test_job.step1"); + } + + @Override + protected void doExecute(JobExecution execution) throws JobExecutionException { + + } + + } + + class MockStoppableTasklet implements StoppableTasklet { + + boolean stopped = Boolean.FALSE; + + @Override + public RepeatStatus execute(StepContribution contribution, + ChunkContext chunkContext) throws Exception { + return null; + } + + @Override + public void stop() { + stopped = Boolean.TRUE; + } + + } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java index 0e5c3e6ca1..a17191b498 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java @@ -26,6 +26,7 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.repeat.RepeatStatus; /** * @author Dave Syer @@ -114,5 +115,44 @@ public void process(StepContribution contribution, Chunk chunk) { // The tasklet does not change the exit code assertEquals(expected, contribution.getExitStatus()); } + + @Test + public void testStopped() throws Exception { + ChunkOrientedTasklet handler = new ChunkOrientedTasklet(new MockChunkProvider(), new ChunkProcessor() { + @Override + public void process(StepContribution contribution, Chunk chunk) { + contribution.incrementWriteCount(1); + } + }); + StepContribution contribution = new StepContribution(new StepExecution("foo", new JobExecution(new JobInstance( + 123L, "job"), new JobParameters()))); + ExitStatus expected = ExitStatus.STOPPED; + while (handler.execute(contribution, context).equals(RepeatStatus.CONTINUABLE)) { + handler.stop();//call a stop after the first one + }//end while + // The tasklet does not change the exit code + assertEquals(expected, contribution.getExitStatus()); + } + + class MockChunkProvider implements ChunkProvider{ + + int max = 3; + int counter = 0; + + @Override + public Chunk provide(StepContribution contribution) throws Exception { + while (counter < max) { + counter++; + Chunk chunk = new Chunk(); + chunk.add("foo"); + System.out.println("here's the chunk " + chunk); + return chunk;//send it + } + return null; + } + @Override + public void postProcess(StepContribution contribution,Chunk chunk) { } + + } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java new file mode 100644 index 0000000000..fe6b805f55 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java @@ -0,0 +1,26 @@ +package org.springframework.batch.core.step.tasklet; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import org.junit.Test; + +/** + * basic mock test + * @author Will Schipp + * + */ +public class StoppableTaskletTest { + + @Test + public void testStop() { + StoppableTasklet stoppableTasklet = mock(StoppableTasklet.class); + try { + stoppableTasklet.stop(); + } + catch (Exception e) { + fail(e.getMessage()); + } + } + +} diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java index dcb3e9aa19..442807fa88 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/SystemCommandTaskletIntegrationTests.java @@ -216,6 +216,24 @@ public void testWorkingDirectory() throws Exception { // no error expected now tasklet.setWorkingDirectory(directory.getCanonicalPath()); } + + /* + * test stopping a tasklet + */ + @Test + public void testStopped() throws Exception { + String command = "sleep 5"; + tasklet.setCommand(command); + tasklet.setTerminationCheckInterval(10); + tasklet.afterPropertiesSet(); + + StepContribution contribution = stepExecution.createStepContribution(); + //send stop + tasklet.stop(); + tasklet.execute(contribution, null); + + assertEquals(contribution.getExitStatus().getExitCode(),ExitStatus.STOPPED.getExitCode()); + } /** * Exit code mapper containing mapping logic expected by the tests. 0 means From 0ebda8ff7793fe2a50b4f50c1fb0dcf908301f56 Mon Sep 17 00:00:00 2001 From: willschipp Date: Wed, 22 May 2013 18:23:41 -0400 Subject: [PATCH 2/4] update to step retrieval --- .../launch/support/SimpleJobOperator.java | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java index 5098f89a37..f47d780c88 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java @@ -16,7 +16,6 @@ package org.springframework.batch.core.launch.support; import java.util.ArrayList; -import java.util.Collection; import java.util.Date; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -400,30 +399,26 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe jobExecution.setStatus(BatchStatus.STOPPING); jobRepository.update(jobExecution); //implementation to support Tasklet.stop() - //TODO manage this is an 'scoped' proxy test - //find the job object - Job job; try { - job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); - //get the steps for the job - if (job instanceof AbstractJob) { - //retrieve the steps - Collection stepNames = ((AbstractJob)job).getStepNames(); - //go through the step names are retrieve each step - for (String stepName : stepNames) { - Step step = ((AbstractJob)job).getStep(stepName); - //determine type - if (step instanceof TaskletStep) { - //invoke stop --> reflection? - Tasklet tasklet = ((TaskletStep) step).getTasklet(); - if (tasklet instanceof StoppableTasklet) { - ((StoppableTasklet)tasklet).stop(); + Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); + if (job instanceof AbstractJob) {//can only process as AbstractJob is the only way to get the step object + //get the current stepExecution + for (StepExecution stepExecution : jobExecution.getStepExecutions()) { + if (stepExecution.getStatus().isRunning()) { + //have the step execution that's running -> need to 'stop' it + Step step = ((AbstractJob)job).getStep(stepExecution.getStepName()); + if (step instanceof TaskletStep) { + Tasklet tasklet = ((TaskletStep)step).getTasklet(); + if (tasklet instanceof StoppableTasklet) { + ((StoppableTasklet)tasklet).stop(); + }//end if }//end if }//end if - }//end for + }//end for }//end if - } catch (NoSuchJobException e) { - logger.error("Couldn't find Job for the execution:" + jobExecution.getJobInstance().getJobName()); + } + catch (NoSuchJobException e) { + //TODO - handle by converting } return true; From cfa15bf4d8dd86824d18c29694868391041a24b5 Mon Sep 17 00:00:00 2001 From: willschipp Date: Mon, 3 Jun 2013 21:09:44 -0400 Subject: [PATCH 3/4] fixes as per comments from MMinella --- .../launch/support/SimpleJobOperator.java | 26 +++++++----- .../core/step/item/ChunkOrientedTasklet.java | 22 +--------- .../step/tasklet/CallableTaskletAdapter.java | 5 +-- .../tasklet/MethodInvokingTaskletAdapter.java | 6 +-- .../core/step/tasklet/StoppableTasklet.java | 4 +- .../support/SimpleJobOperatorTests.java | 19 +-------- .../step/item/ChunkOrientedTaskletTests.java | 40 ------------------- .../step/tasklet/StoppableTaskletTest.java | 26 ------------ 8 files changed, 21 insertions(+), 127 deletions(-) delete mode 100644 spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java index f47d780c88..0738305d3e 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java @@ -41,7 +41,6 @@ import org.springframework.batch.core.converter.DefaultJobParametersConverter; import org.springframework.batch.core.converter.JobParametersConverter; import org.springframework.batch.core.explore.JobExplorer; -import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.launch.JobExecutionNotRunningException; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobLauncher; @@ -54,6 +53,8 @@ import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.batch.core.step.NoSuchStepException; +import org.springframework.batch.core.step.StepLocator; import org.springframework.batch.core.step.tasklet.StoppableTasklet; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.core.step.tasklet.TaskletStep; @@ -401,24 +402,29 @@ public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExe //implementation to support Tasklet.stop() try { Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); - if (job instanceof AbstractJob) {//can only process as AbstractJob is the only way to get the step object + if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object //get the current stepExecution for (StepExecution stepExecution : jobExecution.getStepExecutions()) { if (stepExecution.getStatus().isRunning()) { - //have the step execution that's running -> need to 'stop' it - Step step = ((AbstractJob)job).getStep(stepExecution.getStepName()); - if (step instanceof TaskletStep) { - Tasklet tasklet = ((TaskletStep)step).getTasklet(); - if (tasklet instanceof StoppableTasklet) { - ((StoppableTasklet)tasklet).stop(); + try { + //have the step execution that's running -> need to 'stop' it + Step step = ((StepLocator)job).getStep(stepExecution.getStepName()); + if (step instanceof TaskletStep) { + Tasklet tasklet = ((TaskletStep)step).getTasklet(); + if (tasklet instanceof StoppableTasklet) { + ((StoppableTasklet)tasklet).stop(); + }//end if }//end if - }//end if + } + catch (NoSuchStepException e) { + logger.warn("Step not found",e); + } }//end if }//end for }//end if } catch (NoSuchJobException e) { - //TODO - handle by converting + logger.warn("Cannot find Job object",e); } return true; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java index 32fca4dc7e..1fd2bcf8b7 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java @@ -18,10 +18,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; -import org.springframework.batch.core.step.tasklet.StoppableTasklet; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; @@ -30,11 +28,10 @@ * handling. * * @author Dave Syer - * @author Will Schipp * * @param input item type */ -public class ChunkOrientedTasklet implements StoppableTasklet { +public class ChunkOrientedTasklet implements Tasklet { private static final String INPUTS_KEY = "INPUTS"; @@ -43,11 +40,6 @@ public class ChunkOrientedTasklet implements StoppableTasklet { private final ChunkProvider chunkProvider; private boolean buffering = true; - - /** - * support for stoppable tasklet - */ - private boolean stopped = false; private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class); @@ -71,13 +63,6 @@ public void setBuffering(boolean buffering) { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { - //check for stopped at the beginning of a chunk - if (stopped) { - stopped = false;//reset - contribution.setExitStatus(ExitStatus.STOPPED); - return RepeatStatus.FINISHED; - }//end if - @SuppressWarnings("unchecked") Chunk inputs = (Chunk) chunkContext.getAttribute(INPUTS_KEY); if (inputs == null) { @@ -105,9 +90,4 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon } - @Override - public void stop() { - stopped = true; - } - } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java index 6e41e6b261..906af77532 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/CallableTaskletAdapter.java @@ -30,7 +30,7 @@ * @author Dave Syer * */ -public class CallableTaskletAdapter implements StoppableTasklet, InitializingBean { +public class CallableTaskletAdapter implements Tasklet, InitializingBean { private Callable callable; @@ -62,7 +62,4 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon return callable.call(); } - @Override - public void stop() { } - } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java index 9b5139d800..c713f76f65 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/MethodInvokingTaskletAdapter.java @@ -31,10 +31,9 @@ * @see AbstractMethodInvokingDelegator * * @author Dave Syer - * @author Will Schipp * */ -public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator implements StoppableTasklet { +public class MethodInvokingTaskletAdapter extends AbstractMethodInvokingDelegator implements Tasklet { /** * Delegate execution to the target object and translate the return value to @@ -63,7 +62,4 @@ protected ExitStatus mapResult(Object result) { return ExitStatus.COMPLETED; } - @Override - public void stop() { } - } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java index 9918aa037f..280b4ecc4b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/StoppableTasklet.java @@ -2,9 +2,7 @@ /** * - * JSR-352 compatible tasklet that provides the 'stop' function. - * The Spring Batch Tasklet is analogous to 'batchlet' in the JSR - * terminology + * Stoppable tasklet * * @author Will Schipp * diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java index f10e668256..35a1e74a48 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java @@ -363,7 +363,7 @@ public void testStop() throws Exception{ public void testStopTasklet() throws Exception { JobInstance jobInstance = new JobInstance(123L, job.getName()); JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); - MockStoppableTasklet tasklet = new MockStoppableTasklet(); + StoppableTasklet tasklet = mock(StoppableTasklet.class); TaskletStep taskletStep = new TaskletStep(); taskletStep.setTasklet(tasklet); MockJob job = new MockJob(); @@ -382,7 +382,6 @@ public void testStopTasklet() throws Exception { jobRepository.update(jobExecution); jobOperator.stop(111L); assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); - assertTrue(tasklet.stopped); } @Test @@ -428,20 +427,4 @@ protected void doExecute(JobExecution execution) throws JobExecutionException { } - class MockStoppableTasklet implements StoppableTasklet { - - boolean stopped = Boolean.FALSE; - - @Override - public RepeatStatus execute(StepContribution contribution, - ChunkContext chunkContext) throws Exception { - return null; - } - - @Override - public void stop() { - stopped = Boolean.TRUE; - } - - } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java index a17191b498..0e5c3e6ca1 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java @@ -26,7 +26,6 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; -import org.springframework.batch.repeat.RepeatStatus; /** * @author Dave Syer @@ -115,44 +114,5 @@ public void process(StepContribution contribution, Chunk chunk) { // The tasklet does not change the exit code assertEquals(expected, contribution.getExitStatus()); } - - @Test - public void testStopped() throws Exception { - ChunkOrientedTasklet handler = new ChunkOrientedTasklet(new MockChunkProvider(), new ChunkProcessor() { - @Override - public void process(StepContribution contribution, Chunk chunk) { - contribution.incrementWriteCount(1); - } - }); - StepContribution contribution = new StepContribution(new StepExecution("foo", new JobExecution(new JobInstance( - 123L, "job"), new JobParameters()))); - ExitStatus expected = ExitStatus.STOPPED; - while (handler.execute(contribution, context).equals(RepeatStatus.CONTINUABLE)) { - handler.stop();//call a stop after the first one - }//end while - // The tasklet does not change the exit code - assertEquals(expected, contribution.getExitStatus()); - } - - class MockChunkProvider implements ChunkProvider{ - - int max = 3; - int counter = 0; - - @Override - public Chunk provide(StepContribution contribution) throws Exception { - while (counter < max) { - counter++; - Chunk chunk = new Chunk(); - chunk.add("foo"); - System.out.println("here's the chunk " + chunk); - return chunk;//send it - } - return null; - } - @Override - public void postProcess(StepContribution contribution,Chunk chunk) { } - - } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java deleted file mode 100644 index fe6b805f55..0000000000 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/StoppableTaskletTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.springframework.batch.core.step.tasklet; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import org.junit.Test; - -/** - * basic mock test - * @author Will Schipp - * - */ -public class StoppableTaskletTest { - - @Test - public void testStop() { - StoppableTasklet stoppableTasklet = mock(StoppableTasklet.class); - try { - stoppableTasklet.stop(); - } - catch (Exception e) { - fail(e.getMessage()); - } - } - -} From f9c8017483a771320e93d275160a100ff313b9ff Mon Sep 17 00:00:00 2001 From: willschipp Date: Thu, 13 Jun 2013 14:15:02 -0500 Subject: [PATCH 4/4] additional clean up based on feedback comments --- .../step/tasklet/SystemCommandTasklet.java | 1 + .../support/SimpleJobOperatorTests.java | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java index e6b2e59a59..f0f33ff003 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/SystemCommandTasklet.java @@ -117,6 +117,7 @@ else if (execution.isTerminateOnly()) { } else if (stopped) { stopped = false;//reset + //invoke cancel systemCommandTask.cancel(interruptOnCancel); contribution.setExitStatus(ExitStatus.STOPPED); return RepeatStatus.FINISHED; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java index 35a1e74a48..e77abe0498 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -384,6 +385,42 @@ public void testStopTasklet() throws Exception { assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); } + @Test + public void testStopTaskletException() throws Exception { + JobInstance jobInstance = new JobInstance(123L, job.getName()); + JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters); + StoppableTasklet tasklet = new StoppableTasklet() { + + @Override + public RepeatStatus execute(StepContribution contribution, + ChunkContext chunkContext) throws Exception { + return null; + } + + @Override + public void stop() { + throw new IllegalStateException(); + }}; + TaskletStep taskletStep = new TaskletStep(); + taskletStep.setTasklet(tasklet); + MockJob job = new MockJob(); + job.taskletStep = taskletStep; + + JobRegistry jobRegistry = mock(JobRegistry.class); + TaskletStep step = mock(TaskletStep.class); + + when(step.getTasklet()).thenReturn(tasklet); + when(step.getName()).thenReturn("test_job.step1"); + when(jobRegistry.getJob(anyString())).thenReturn(job); + when(jobExplorer.getJobExecution(111L)).thenReturn(jobExecution); + + jobOperator.setJobRegistry(jobRegistry); + jobExplorer.getJobExecution(111L); + jobRepository.update(jobExecution); + jobOperator.stop(111L); + assertEquals(BatchStatus.STOPPING, jobExecution.getStatus()); + } + @Test public void testAbort() throws Exception { JobInstance jobInstance = new JobInstance(123L, job.getName()); @@ -406,6 +443,7 @@ public void testAbortNonStopping() throws Exception { jobOperator.abandon(123L); } + class MockJob extends AbstractJob { private TaskletStep taskletStep;