Skip to content

Commit b094db6

Browse files
author
robokaso
committed
IN PROGRESS - BATCH-929: Deferrable Constraints cause unrecoverable errors
allow commit failure to cause execution context rollback instead of step failure with UNKNOWN status
1 parent edff656 commit b094db6

File tree

5 files changed

+147
-20
lines changed

5 files changed

+147
-20
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/JobRepository.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@
3939
public interface JobRepository {
4040

4141
/**
42-
* Check if an instance of this job already exists with the parameters provided.
42+
* Check if an instance of this job already exists with the parameters
43+
* provided.
4344
*
4445
* @param jobName the name of the job
4546
* @param jobParameters the parameters to match
46-
* @return true if a {@link JobInstance} already exists for this job name and job parameters
47+
* @return true if a {@link JobInstance} already exists for this job name
48+
* and job parameters
4749
*/
4850
boolean isJobInstanceExists(String jobName, JobParameters jobParameters);
4951

@@ -68,8 +70,8 @@ public interface JobRepository {
6870
* found and was already completed successfully.
6971
*
7072
*/
71-
JobExecution createJobExecution(String jobName, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
72-
JobRestartException, JobInstanceAlreadyCompleteException;
73+
JobExecution createJobExecution(String jobName, JobParameters jobParameters)
74+
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;
7375

7476
/**
7577
* Update the {@link JobExecution}.
@@ -112,6 +114,22 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters) thr
112114
*/
113115
void updateExecutionContext(StepExecution stepExecution);
114116

117+
/**
118+
* Load the {@link ExecutionContext} of the given {@link StepExecution}.
119+
*
120+
* @param stepExecution the {@link StepExecution} containing the
121+
* {@link ExecutionContext}.
122+
*/
123+
ExecutionContext getExecutionContext(StepExecution stepExecution);
124+
125+
/**
126+
* Load the {@link ExecutionContext} of the given {@link JobExecution}.
127+
*
128+
* @param jobExecution the {@link JobExecution} containing the
129+
* {@link ExecutionContext}.
130+
*/
131+
ExecutionContext getExecutionContext(JobExecution jobExecution);
132+
115133
/**
116134
* @param stepName the name of the step execution that might have run.
117135
* @return the last execution of step for the given job instance.
@@ -125,7 +143,7 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters) thr
125143
int getStepExecutionCount(JobInstance jobInstance, String stepName);
126144

127145
/**
128-
* @param jobName the name of the job that might have run
146+
* @param jobName the name of the job that might have run
129147
* @param jobParameters parameters identifying the {@link JobInstance}
130148
* @return the last execution of job if exists, null otherwise
131149
*/

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,12 @@ public JobExecution getLastJobExecution(String jobName, JobParameters jobParamet
336336

337337
}
338338

339+
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
340+
return ecDao.getExecutionContext(stepExecution);
341+
}
342+
343+
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
344+
return ecDao.getExecutionContext(jobExecution);
345+
}
346+
339347
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.commons.logging.Log;
2121
import org.apache.commons.logging.LogFactory;
2222
import org.springframework.batch.core.BatchStatus;
23+
import org.springframework.batch.core.JobExecution;
2324
import org.springframework.batch.core.JobInterruptedException;
2425
import org.springframework.batch.core.StepContribution;
2526
import org.springframework.batch.core.StepExecution;
@@ -39,6 +40,7 @@
3940
import org.springframework.batch.repeat.RepeatOperations;
4041
import org.springframework.batch.repeat.RepeatStatus;
4142
import org.springframework.batch.repeat.support.RepeatTemplate;
43+
import org.springframework.batch.support.Classifier;
4244
import org.springframework.transaction.PlatformTransactionManager;
4345
import org.springframework.transaction.TransactionStatus;
4446
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
@@ -88,6 +90,14 @@ public boolean rollbackOn(Throwable ex) {
8890

8991
private Semaphore semaphore = new Semaphore(1);
9092

93+
private Classifier<Exception, Boolean> nonFatalCommitExceptions = new Classifier<Exception, Boolean>() {
94+
95+
public Boolean classify(Exception classifiable) {
96+
return false;
97+
}
98+
99+
};
100+
91101
/**
92102
* Default constructor.
93103
*/
@@ -102,6 +112,14 @@ public TaskletStep(String name) {
102112
super(name);
103113
}
104114

115+
/**
116+
* @param nonFatalCommitExceptions classifies whether commit exception is
117+
* fatal or not.
118+
*/
119+
public void setNonFatalCommitExceptions(Classifier<Exception, Boolean> nonFatalCommitExceptions) {
120+
this.nonFatalCommitExceptions = nonFatalCommitExceptions;
121+
}
122+
105123
/**
106124
* Public setter for the {@link PlatformTransactionManager}.
107125
*
@@ -278,10 +296,18 @@ public RepeatStatus doInStepContext(RepeatContext repeatContext, StepContext ste
278296
transactionManager.commit(transaction);
279297
}
280298
catch (Exception e) {
281-
fatalException.setException(e);
282-
stepExecution.setStatus(BatchStatus.UNKNOWN);
283-
logger.error("Fatal error detected during commit.");
284-
throw new FatalException("Fatal error detected during commit", e);
299+
if (nonFatalCommitExceptions.classify(e)) {
300+
stepExecution.setExecutionContext(getJobRepository().getExecutionContext(stepExecution));
301+
JobExecution jobExecution = stepExecution.getJobExecution();
302+
jobExecution.setExecutionContext(getJobRepository().getExecutionContext(jobExecution));
303+
throw new CommitException("non-fatal commit failure", e);
304+
}
305+
else {
306+
fatalException.setException(e);
307+
stepExecution.setStatus(BatchStatus.UNKNOWN);
308+
logger.error("Fatal error detected during commit.");
309+
throw new FatalException("Fatal error detected during commit", e);
310+
}
285311
}
286312

287313
try {
@@ -300,7 +326,10 @@ public RepeatStatus doInStepContext(RepeatContext repeatContext, StepContext ste
300326
throw e;
301327
}
302328
catch (Exception e) {
303-
processRollback(stepExecution, fatalException, transaction);
329+
// if commit failed, calling rollback on tx manager would cause exception
330+
if (!(e instanceof CommitException)) {
331+
processRollback(stepExecution, fatalException, transaction);
332+
}
304333
throw e;
305334
}
306335
finally {
@@ -376,6 +405,15 @@ public Exception getException() {
376405

377406
}
378407

408+
/**
409+
* Signals non-fatal commit failure.
410+
*/
411+
private static class CommitException extends RuntimeException {
412+
public CommitException(String msg, Throwable cause) {
413+
super(msg, cause);
414+
}
415+
}
416+
379417
protected void close(ExecutionContext ctx) throws Exception {
380418
stream.close(ctx);
381419
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/JobRepositorySupport.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.springframework.batch.core.JobParameters;
2121
import org.springframework.batch.core.StepExecution;
2222
import org.springframework.batch.core.repository.JobRepository;
23+
import org.springframework.batch.item.ExecutionContext;
2324

2425
/**
2526
* @author Dave Syer
@@ -82,4 +83,12 @@ public JobExecution getLastJobExecution(String jobName, JobParameters jobParamet
8283
return null;
8384
}
8485

86+
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
87+
return null;
88+
}
89+
90+
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
91+
return null;
92+
}
93+
8594
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.springframework.batch.core.ExitStatus;
1212
import org.springframework.batch.core.JobExecution;
1313
import org.springframework.batch.core.JobInstance;
14-
import org.springframework.batch.core.JobInterruptedException;
1514
import org.springframework.batch.core.JobParameters;
1615
import org.springframework.batch.core.StepContribution;
1716
import org.springframework.batch.core.StepExecution;
@@ -28,6 +27,7 @@
2827
import org.springframework.batch.item.ItemStreamException;
2928
import org.springframework.batch.item.ItemStreamSupport;
3029
import org.springframework.batch.repeat.RepeatStatus;
30+
import org.springframework.batch.support.Classifier;
3131
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
3232
import org.springframework.core.AttributeAccessor;
3333
import org.springframework.transaction.TransactionException;
@@ -41,15 +41,13 @@
4141
*/
4242
public class TaskletStepExceptionTests {
4343

44-
TaskletStep taskletStep;
44+
private TaskletStep taskletStep;
4545

46-
StepExecution stepExecution;
46+
private StepExecution stepExecution;
4747

48-
UpdateCountingJobRepository jobRepository;
48+
private UpdateCountingJobRepository jobRepository;
4949

50-
static RuntimeException taskletException = new RuntimeException();
51-
52-
static JobInterruptedException interruptedException = new JobInterruptedException("");
50+
private static RuntimeException taskletException = new RuntimeException();
5351

5452
@Before
5553
public void init() {
@@ -134,7 +132,7 @@ public RepeatStatus execute(StepContribution contribution, AttributeAccessor att
134132
}
135133

136134
@Test
137-
/**
135+
/*
138136
* Exception in afterStep is ignored (only logged).
139137
*/
140138
public void testAfterStepFAilure() throws Exception {
@@ -196,12 +194,61 @@ public RepeatStatus execute(StepContribution contribution, AttributeAccessor att
196194
assertEquals(exception, e.getCause());
197195
}
198196

197+
/**
198+
* If commit exception isn't fatal step shouldn't complete with UNKNOWN
199+
* status and execution context should be rolled back.
200+
*/
201+
@Test
202+
public void testSkippableCommitError() throws Exception {
203+
204+
class TestItemStream extends ItemStreamSupport {
205+
private boolean called = false;
206+
207+
@Override
208+
public void update(ExecutionContext executionContext) throws ItemStreamException {
209+
executionContext.put("key", "value");
210+
called = true;
211+
}
212+
213+
}
214+
final TestItemStream stream = new TestItemStream();
215+
taskletStep.registerStream(stream);
216+
217+
final RuntimeException commitException = new RuntimeException();
218+
taskletStep.setNonFatalCommitExceptions(new Classifier<Exception, Boolean>() {
219+
220+
public Boolean classify(Exception classifiable) {
221+
return true;
222+
}
223+
});
224+
taskletStep.setTransactionManager(new ResourcelessTransactionManager() {
225+
@Override
226+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
227+
throw commitException;
228+
}
229+
});
230+
231+
taskletStep.setTasklet(new Tasklet() {
232+
233+
public RepeatStatus execute(StepContribution contribution, AttributeAccessor attributes) throws Exception {
234+
return RepeatStatus.FINISHED;
235+
}
236+
237+
});
238+
239+
taskletStep.execute(stepExecution);
240+
assertEquals("step won't refuse to restart", FAILED, stepExecution.getStatus());
241+
assertTrue("execution context modified", stream.called);
242+
assertTrue("execution context rolled back", stepExecution.getExecutionContext().isEmpty());
243+
}
244+
199245
@Test
200246
public void testUpdateError() throws Exception {
201247

202248
final RuntimeException exception = new RuntimeException();
203249
taskletStep.setJobRepository(new UpdateCountingJobRepository() {
204250
boolean firstCall = true;
251+
205252
@Override
206253
public void update(StepExecution arg0) {
207254
if (firstCall) {
@@ -211,7 +258,7 @@ public void update(StepExecution arg0) {
211258
throw exception;
212259
}
213260
});
214-
261+
215262
taskletStep.execute(stepExecution);
216263
assertEquals(UNKNOWN, stepExecution.getStatus());
217264
assertTrue(stepExecution.getFailureExceptions().contains(taskletException));
@@ -272,9 +319,16 @@ public int getUpdateCount() {
272319
}
273320

274321
public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
275-
// TODO Auto-generated method stub
276322
return null;
277323
}
324+
325+
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
326+
return new ExecutionContext();
327+
}
328+
329+
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
330+
return new ExecutionContext();
331+
}
278332
}
279333

280334
}

0 commit comments

Comments
 (0)