Skip to content

Commit 01b0324

Browse files
committed
Apply optimistic locking for MongoDB repository
Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 08c4cb1 commit 01b0324

File tree

6 files changed

+70
-6
lines changed

6 files changed

+70
-6
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.batch.core.repository.dao.JobExecutionDao;
2525
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2626
import org.springframework.batch.core.repository.persistence.converter.JobInstanceConverter;
27+
import org.springframework.dao.OptimisticLockingFailureException;
2728
import org.springframework.data.domain.Sort;
2829
import org.springframework.data.mongodb.core.MongoOperations;
2930
import org.springframework.data.mongodb.core.query.Query;
@@ -34,6 +35,7 @@
3435

3536
/**
3637
* @author Mahmoud Ben Hassine
38+
* @author Yanming Zhou
3739
* @since 5.2.0
3840
*/
3941
public class MongoJobExecutionDao implements JobExecutionDao {
@@ -73,10 +75,16 @@ public void saveJobExecution(JobExecution jobExecution) {
7375

7476
@Override
7577
public void updateJobExecution(JobExecution jobExecution) {
76-
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
78+
Query query = query(
79+
where("jobExecutionId").is(jobExecution.getId()).and("version").is(jobExecution.getVersion()));
7780
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
7881
.fromJobExecution(jobExecution);
79-
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
82+
jobExecutionToUpdate.incrementVersion();
83+
if (this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME) == null) {
84+
throw new OptimisticLockingFailureException("Attempt to update step execution id=" + jobExecution.getId()
85+
+ " with wrong version (" + jobExecution.getVersion() + ")");
86+
}
87+
jobExecution.incrementVersion();
8088
}
8189

8290
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.batch.core.repository.dao.StepExecutionDao;
2828
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2929
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
30+
import org.springframework.dao.OptimisticLockingFailureException;
3031
import org.springframework.data.mongodb.core.MongoOperations;
3132
import org.springframework.data.mongodb.core.query.Query;
3233
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
@@ -36,6 +37,7 @@
3637

3738
/**
3839
* @author Mahmoud Ben Hassine
40+
* @author Yanming Zhou
3941
* @since 5.2.0
4042
*/
4143
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -82,10 +84,17 @@ public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
8284

8385
@Override
8486
public void updateStepExecution(StepExecution stepExecution) {
85-
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
87+
Query query = query(
88+
where("stepExecutionId").is(stepExecution.getId()).and("version").is(stepExecution.getVersion()));
8689
org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter
8790
.fromStepExecution(stepExecution);
88-
this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME);
91+
stepExecutionToUpdate.incrementVersion();
92+
if (this.mongoOperations.findAndReplace(query, stepExecutionToUpdate,
93+
STEP_EXECUTIONS_COLLECTION_NAME) == null) {
94+
throw new OptimisticLockingFailureException("Attempt to update step execution id=" + stepExecution.getId()
95+
+ " with wrong version (" + stepExecution.getVersion() + ")");
96+
}
97+
stepExecution.incrementVersion();
8998
}
9099

91100
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
/**
2727
* @author Mahmoud Ben Hassine
28+
* @author Yanming Zhou
2829
* @since 5.2.0
2930
*/
3031
public class JobExecution {
@@ -53,6 +54,8 @@ public class JobExecution {
5354

5455
private ExecutionContext executionContext;
5556

57+
private Integer version;
58+
5659
public JobExecution() {
5760
}
5861

@@ -148,13 +151,30 @@ public void setExecutionContext(ExecutionContext executionContext) {
148151
this.executionContext = executionContext;
149152
}
150153

154+
public Integer getVersion() {
155+
return version;
156+
}
157+
158+
public void setVersion(Integer version) {
159+
this.version = version;
160+
}
161+
162+
public void incrementVersion() {
163+
if (version == null) {
164+
version = 0;
165+
}
166+
else {
167+
version = version + 1;
168+
}
169+
}
170+
151171
@Override
152172
public String toString() {
153173
return "JobExecution{" + "id='" + id + '\'' + ", jobExecutionId=" + jobExecutionId + ", jobInstanceId="
154174
+ jobInstanceId + ", jobParameters=" + jobParameters + ", stepExecutions=" + stepExecutions
155175
+ ", status=" + status + ", startTime=" + startTime + ", createTime=" + createTime + ", endTime="
156176
+ endTime + ", lastUpdated=" + lastUpdated + ", exitStatus=" + exitStatus + ", executionContext="
157-
+ executionContext + '}';
177+
+ executionContext + ", version=" + version + '}';
158178
}
159179

160180
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/**
2323
* @author Mahmoud Ben Hassine
24+
* @author Yanming Zhou
2425
* @since 5.2.0
2526
*/
2627
public class StepExecution {
@@ -65,6 +66,8 @@ public class StepExecution {
6566

6667
private boolean terminateOnly;
6768

69+
private Integer version;
70+
6871
public StepExecution() {
6972
}
7073

@@ -224,6 +227,23 @@ public void setTerminateOnly(boolean terminateOnly) {
224227
this.terminateOnly = terminateOnly;
225228
}
226229

230+
public Integer getVersion() {
231+
return version;
232+
}
233+
234+
public void setVersion(Integer version) {
235+
this.version = version;
236+
}
237+
238+
public void incrementVersion() {
239+
if (version == null) {
240+
version = 0;
241+
}
242+
else {
243+
version = version + 1;
244+
}
245+
}
246+
227247
@Override
228248
public String toString() {
229249
return "StepExecution{" + "id='" + id + '\'' + ", stepExecutionId=" + stepExecutionId + ", jobExecutionId='"
@@ -232,7 +252,8 @@ public String toString() {
232252
+ ", readSkipCount=" + readSkipCount + ", processSkipCount=" + processSkipCount + ", writeSkipCount="
233253
+ writeSkipCount + ", filterCount=" + filterCount + ", startTime=" + startTime + ", createTime="
234254
+ createTime + ", endTime=" + endTime + ", lastUpdated=" + lastUpdated + ", executionContext="
235-
+ executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + '}';
255+
+ executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + ", version="
256+
+ version + '}';
236257
}
237258

238259
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
/**
2929
* @author Mahmoud Ben Hassine
30+
* @author Yanming Zhou
3031
* @since 5.2.0
3132
*/
3233
public class JobExecutionConverter {
@@ -54,6 +55,7 @@ public org.springframework.batch.core.JobExecution toJobExecution(JobExecution s
5455
source.getExitStatus().exitDescription()));
5556
jobExecution.setExecutionContext(
5657
new org.springframework.batch.item.ExecutionContext(source.getExecutionContext().map()));
58+
jobExecution.setVersion(source.getVersion());
5759
return jobExecution;
5860
}
5961

@@ -77,6 +79,7 @@ public JobExecution fromJobExecution(org.springframework.batch.core.JobExecution
7779
new ExitStatus(source.getExitStatus().getExitCode(), source.getExitStatus().getExitDescription()));
7880
org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext();
7981
jobExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty()));
82+
jobExecution.setVersion(source.getVersion());
8083
return jobExecution;
8184
}
8285

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
/**
2424
* @author Mahmoud Ben Hassine
25+
* @author Yanming Zhou
2526
* @since 5.2.0
2627
*/
2728
public class StepExecutionConverter {
@@ -50,6 +51,7 @@ public org.springframework.batch.core.StepExecution toStepExecution(StepExecutio
5051
if (source.isTerminateOnly()) {
5152
stepExecution.setTerminateOnly();
5253
}
54+
stepExecution.setVersion(source.getVersion());
5355
return stepExecution;
5456
}
5557

@@ -77,6 +79,7 @@ public StepExecution fromStepExecution(org.springframework.batch.core.StepExecut
7779
org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext();
7880
stepExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty()));
7981
stepExecution.setTerminateOnly(source.isTerminateOnly());
82+
stepExecution.setVersion(source.getVersion());
8083
return stepExecution;
8184
}
8285

0 commit comments

Comments
 (0)