Skip to content

Commit 42ee309

Browse files
committed
dd collection prefix support for MongoDB job repository
- Add collectionPrefix parameter to EnableMongoJobRepository annotation - Update MongoDB DAO classes to support configurable collection names - Maintain backward compatibility with default "BATCH_" prefix Signed-off-by: Myeongha Shin <sky95012@gmail.com>
1 parent 76e723e commit 42ee309

File tree

11 files changed

+304
-57
lines changed

11 files changed

+304
-57
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* Batch in a declarative way through {@link EnableBatchProcessing}.
4343
*
4444
* @author Mahmoud Ben Hassine
45+
* @author Myeongha Shin
4546
* @since 5.0
4647
* @see EnableBatchProcessing
4748
*/
@@ -185,6 +186,11 @@ private void registerMongoJobRepository(BeanDefinitionRegistry registry,
185186
beanDefinitionBuilder.addPropertyValue("isolationLevelForCreate", isolationLevelForCreate);
186187
}
187188

189+
String collectionPrefix = mongoJobRepositoryAnnotation.collectionPrefix();
190+
if (collectionPrefix != null) {
191+
beanDefinitionBuilder.addPropertyValue("collectionPrefix", collectionPrefix);
192+
}
193+
188194
String jobKeyGeneratorRef = mongoJobRepositoryAnnotation.jobKeyGeneratorRef();
189195
if (registry.containsBeanDefinition(jobKeyGeneratorRef)) {
190196
beanDefinitionBuilder.addPropertyReference("jobKeyGenerator", jobKeyGeneratorRef);

spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,10 @@
7777
*/
7878
String jobKeyGeneratorRef() default "jobKeyGenerator";
7979

80+
/**
81+
* Set the prefix for MongoDB collection names. Defaults to {@literal BATCH_}.
82+
* @return the collection prefix to use
83+
*/
84+
String collectionPrefix() default "BATCH_";
85+
8086
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,42 @@
2424
import org.springframework.data.mongodb.core.MongoOperations;
2525
import org.springframework.data.mongodb.core.query.Query;
2626
import org.springframework.data.mongodb.core.query.Update;
27+
import org.springframework.util.Assert;
2728

2829
import static org.springframework.data.mongodb.core.query.Criteria.where;
2930
import static org.springframework.data.mongodb.core.query.Query.query;
3031

3132
/**
3233
* @author Mahmoud Ben Hassine
34+
* @author Myeongha Shin
3335
* @since 5.2.0
3436
*/
3537
public class MongoExecutionContextDao implements ExecutionContextDao {
3638

37-
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";
39+
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION";
3840

39-
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
41+
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";
4042

4143
private final MongoOperations mongoOperations;
4244

43-
public MongoExecutionContextDao(MongoOperations mongoOperations) {
45+
private final String stepExecutionCollectionName;
46+
47+
private final String jobExecutionCollectionName;
48+
49+
public MongoExecutionContextDao(MongoOperations mongoOperations, String collectionPrefix) {
50+
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
51+
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
4452
this.mongoOperations = mongoOperations;
53+
this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME;
54+
this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
4555
}
4656

4757
@Override
4858
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
4959
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
5060
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne(
5161
query, org.springframework.batch.core.repository.persistence.JobExecution.class,
52-
JOB_EXECUTIONS_COLLECTION_NAME);
62+
jobExecutionCollectionName);
5363
if (execution == null) {
5464
return new ExecutionContext();
5565
}
@@ -61,7 +71,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
6171
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
6272
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne(
6373
query, org.springframework.batch.core.repository.persistence.StepExecution.class,
64-
STEP_EXECUTIONS_COLLECTION_NAME);
74+
stepExecutionCollectionName);
6575
if (execution == null) {
6676
return new ExecutionContext();
6777
}
@@ -77,8 +87,7 @@ public void saveExecutionContext(JobExecution jobExecution) {
7787
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
7888
executionContext.isDirty()));
7989
this.mongoOperations.updateFirst(query, update,
80-
org.springframework.batch.core.repository.persistence.JobExecution.class,
81-
JOB_EXECUTIONS_COLLECTION_NAME);
90+
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionCollectionName);
8291
}
8392

8493
@Override
@@ -90,8 +99,7 @@ public void saveExecutionContext(StepExecution stepExecution) {
9099
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
91100
executionContext.isDirty()));
92101
this.mongoOperations.updateFirst(query, update,
93-
org.springframework.batch.core.repository.persistence.StepExecution.class,
94-
STEP_EXECUTIONS_COLLECTION_NAME);
102+
org.springframework.batch.core.repository.persistence.StepExecution.class, stepExecutionCollectionName);
95103

96104
}
97105

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,30 @@
3434

3535
/**
3636
* @author Mahmoud Ben Hassine
37+
* @author Myeongha Shin
3738
* @since 5.2.0
3839
*/
3940
public class MongoJobExecutionDao implements JobExecutionDao {
4041

41-
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
42+
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";
4243

43-
private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";
44+
private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "JOB_EXECUTION_SEQ";
4445

4546
private final MongoOperations mongoOperations;
4647

48+
private final String jobExecutionsCollectionName;
49+
4750
private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
4851

4952
private DataFieldMaxValueIncrementer jobExecutionIncrementer;
5053

5154
private MongoJobInstanceDao jobInstanceDao;
5255

53-
public MongoJobExecutionDao(MongoOperations mongoOperations) {
56+
public MongoJobExecutionDao(MongoOperations mongoOperations, String collectionPrefix) {
5457
this.mongoOperations = mongoOperations;
55-
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
58+
this.jobExecutionsCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
59+
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations,
60+
collectionPrefix + JOB_EXECUTIONS_SEQUENCE_NAME);
5661
}
5762

5863
public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
@@ -69,7 +74,7 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo
6974

7075
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter
7176
.fromJobExecution(jobExecution);
72-
this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME);
77+
this.mongoOperations.insert(jobExecutionToSave, jobExecutionsCollectionName);
7378

7479
return jobExecution;
7580
}
@@ -79,15 +84,15 @@ public void updateJobExecution(JobExecution jobExecution) {
7984
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
8085
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
8186
.fromJobExecution(jobExecution);
82-
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
87+
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, jobExecutionsCollectionName);
8388
}
8489

8590
@Override
8691
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
8792
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
8893
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
8994
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
90-
JOB_EXECUTIONS_COLLECTION_NAME);
95+
jobExecutionsCollectionName);
9196
return jobExecutions.stream()
9297
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
9398
.toList();
@@ -99,8 +104,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
99104
Sort.Order sortOrder = Sort.Order.desc("jobExecutionId");
100105
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
101106
query.with(Sort.by(sortOrder)),
102-
org.springframework.batch.core.repository.persistence.JobExecution.class,
103-
JOB_EXECUTIONS_COLLECTION_NAME);
107+
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionsCollectionName);
104108
return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null;
105109
}
106110

@@ -113,7 +117,7 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
113117
where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING"));
114118
this.mongoOperations
115119
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
116-
JOB_EXECUTIONS_COLLECTION_NAME)
120+
jobExecutionsCollectionName)
117121
.stream()
118122
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
119123
.forEach(runningJobExecutions::add);
@@ -126,7 +130,7 @@ public JobExecution getJobExecution(long executionId) {
126130
Query jobExecutionQuery = query(where("jobExecutionId").is(executionId));
127131
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
128132
jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
129-
JOB_EXECUTIONS_COLLECTION_NAME);
133+
jobExecutionsCollectionName);
130134
if (jobExecution == null) {
131135
return null;
132136
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,31 @@
3636

3737
/**
3838
* @author Mahmoud Ben Hassine
39+
* @author Myeongha Shin
3940
* @since 5.2.0
4041
*/
4142
public class MongoJobInstanceDao implements JobInstanceDao {
4243

43-
private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";
44+
private static final String COLLECTION_NAME = "JOB_INSTANCE";
4445

45-
private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";
46+
private static final String SEQUENCE_NAME = "JOB_INSTANCE_SEQ";
4647

4748
private final MongoOperations mongoOperations;
4849

50+
private final String collectionName;
51+
4952
private DataFieldMaxValueIncrementer jobInstanceIncrementer;
5053

5154
private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator();
5255

5356
private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter();
5457

55-
public MongoJobInstanceDao(MongoOperations mongoOperations) {
58+
public MongoJobInstanceDao(MongoOperations mongoOperations, String collectionPrefix) {
5659
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
60+
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
5761
this.mongoOperations = mongoOperations;
58-
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
62+
this.collectionName = collectionPrefix + COLLECTION_NAME;
63+
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, collectionPrefix + SEQUENCE_NAME);
5964
}
6065

6166
public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {
@@ -79,7 +84,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
7984
jobInstanceToSave.setJobKey(key);
8085
long instanceId = jobInstanceIncrementer.nextLongValue();
8186
jobInstanceToSave.setJobInstanceId(instanceId);
82-
this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME);
87+
this.mongoOperations.insert(jobInstanceToSave, this.collectionName);
8388

8489
JobInstance jobInstance = new JobInstance(instanceId, jobName);
8590
jobInstance.incrementVersion(); // TODO is this needed?
@@ -90,16 +95,16 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
9095
public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
9196
String key = this.jobKeyGenerator.generateKey(jobParameters);
9297
Query query = query(where("jobName").is(jobName).and("jobKey").is(key));
93-
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
94-
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
98+
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
99+
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
95100
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
96101
}
97102

98103
@Override
99104
public JobInstance getJobInstance(long instanceId) {
100105
Query query = query(where("jobInstanceId").is(instanceId));
101-
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
102-
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
106+
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
107+
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
103108
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
104109
}
105110

@@ -114,7 +119,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
114119
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
115120
List<org.springframework.batch.core.repository.persistence.JobInstance> jobInstances = this.mongoOperations
116121
.find(query.with(Sort.by(sortOrder)),
117-
org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
122+
org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
118123
.stream()
119124
.toList();
120125
return jobInstances.subList(start, jobInstances.size())
@@ -134,7 +139,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
134139
public List<JobInstance> getJobInstances(String jobName) {
135140
Query query = query(where("jobName").is(jobName));
136141
return this.mongoOperations
137-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
142+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
138143
.stream()
139144
.map(this.jobInstanceConverter::toJobInstance)
140145
.toList();
@@ -144,7 +149,7 @@ public List<JobInstance> getJobInstances(String jobName) {
144149
public List<Long> getJobInstanceIds(String jobName) {
145150
Query query = query(where("jobName").is(jobName));
146151
return this.mongoOperations
147-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
152+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
148153
.stream()
149154
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobInstanceId)
150155
.toList();
@@ -153,7 +158,7 @@ public List<Long> getJobInstanceIds(String jobName) {
153158
public List<JobInstance> findJobInstancesByName(String jobName) {
154159
Query query = query(where("jobName").is(jobName));
155160
return this.mongoOperations
156-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
161+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
157162
.stream()
158163
.map(this.jobInstanceConverter::toJobInstance)
159164
.toList();
@@ -165,14 +170,14 @@ public JobInstance getLastJobInstance(String jobName) {
165170
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
166171
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
167172
query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobInstance.class,
168-
COLLECTION_NAME);
173+
this.collectionName);
169174
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
170175
}
171176

172177
@Override
173178
public List<String> getJobNames() {
174179
return this.mongoOperations
175-
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
180+
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
176181
.stream()
177182
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobName)
178183
.toList();
@@ -195,7 +200,7 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
195200
throw new NoSuchJobException("Job not found " + jobName);
196201
}
197202
Query query = query(where("jobName").is(jobName));
198-
return this.mongoOperations.count(query, COLLECTION_NAME);
203+
return this.mongoOperations.count(query, this.collectionName);
199204
}
200205

201206
}

0 commit comments

Comments
 (0)