Skip to content

Commit c490a6a

Browse files
committed
MongoDB collection prefix support for MongoDB job repository
- Add collectionPrefix parameter to EnableMongoJobRepository annotation - Update MongoDB DAO classes to support configurable collection names - Maintain backward compatibility with default "BATCH_" prefix Signed-off-by: Myeongha Shin <[email protected]>
1 parent bf282b4 commit c490a6a

File tree

10 files changed

+313
-64
lines changed

10 files changed

+313
-64
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/BatchRegistrar.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* Batch in a declarative way through {@link EnableBatchProcessing}.
4343
*
4444
* @author Mahmoud Ben Hassine
45+
* @author Myeongha Shin
4546
* @since 5.0
4647
* @see EnableBatchProcessing
4748
*/
@@ -185,6 +186,11 @@ private void registerMongoJobRepository(BeanDefinitionRegistry registry,
185186
beanDefinitionBuilder.addPropertyValue("isolationLevelForCreate", isolationLevelForCreate);
186187
}
187188

189+
String collectionPrefix = mongoJobRepositoryAnnotation.collectionPrefix();
190+
if (collectionPrefix != null) {
191+
beanDefinitionBuilder.addPropertyValue("collectionPrefix", collectionPrefix);
192+
}
193+
188194
String jobKeyGeneratorRef = mongoJobRepositoryAnnotation.jobKeyGeneratorRef();
189195
if (registry.containsBeanDefinition(jobKeyGeneratorRef)) {
190196
beanDefinitionBuilder.addPropertyReference("jobKeyGenerator", jobKeyGeneratorRef);

spring-batch-core/src/main/java/org/springframework/batch/core/configuration/annotation/EnableMongoJobRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,10 @@
9898
*/
9999
String stepExecutionIncrementerRef() default "stepExecutionIncrementer";
100100

101+
/**
102+
* Set the prefix for MongoDB collection names. Defaults to {@literal BATCH_}.
103+
* @return the collection prefix to use
104+
*/
105+
String collectionPrefix() default "BATCH_";
106+
101107
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoExecutionContextDao.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,42 @@
2525
import org.springframework.data.mongodb.core.MongoOperations;
2626
import org.springframework.data.mongodb.core.query.Query;
2727
import org.springframework.data.mongodb.core.query.Update;
28+
import org.springframework.util.Assert;
2829

2930
import static org.springframework.data.mongodb.core.query.Criteria.where;
3031
import static org.springframework.data.mongodb.core.query.Query.query;
3132

3233
/**
3334
* @author Mahmoud Ben Hassine
35+
* @author Myeongha Shin
3436
* @since 5.2.0
3537
*/
3638
public class MongoExecutionContextDao implements ExecutionContextDao {
3739

38-
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";
40+
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION";
3941

40-
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
42+
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";
4143

4244
private final MongoOperations mongoOperations;
4345

44-
public MongoExecutionContextDao(MongoOperations mongoOperations) {
46+
private final String stepExecutionCollectionName;
47+
48+
private final String jobExecutionCollectionName;
49+
50+
public MongoExecutionContextDao(MongoOperations mongoOperations, String collectionPrefix) {
51+
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
52+
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
4553
this.mongoOperations = mongoOperations;
54+
this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME;
55+
this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
4656
}
4757

4858
@Override
4959
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
5060
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
5161
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne(
5262
query, org.springframework.batch.core.repository.persistence.JobExecution.class,
53-
JOB_EXECUTIONS_COLLECTION_NAME);
63+
jobExecutionCollectionName);
5464
if (execution == null) {
5565
return new ExecutionContext();
5666
}
@@ -62,7 +72,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
6272
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
6373
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne(
6474
query, org.springframework.batch.core.repository.persistence.StepExecution.class,
65-
STEP_EXECUTIONS_COLLECTION_NAME);
75+
stepExecutionCollectionName);
6676
if (execution == null) {
6777
return new ExecutionContext();
6878
}
@@ -78,8 +88,7 @@ public void saveExecutionContext(JobExecution jobExecution) {
7888
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
7989
executionContext.isDirty()));
8090
this.mongoOperations.updateFirst(query, update,
81-
org.springframework.batch.core.repository.persistence.JobExecution.class,
82-
JOB_EXECUTIONS_COLLECTION_NAME);
91+
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionCollectionName);
8392
}
8493

8594
@Override
@@ -91,8 +100,7 @@ public void saveExecutionContext(StepExecution stepExecution) {
91100
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
92101
executionContext.isDirty()));
93102
this.mongoOperations.updateFirst(query, update,
94-
org.springframework.batch.core.repository.persistence.StepExecution.class,
95-
STEP_EXECUTIONS_COLLECTION_NAME);
103+
org.springframework.batch.core.repository.persistence.StepExecution.class, stepExecutionCollectionName);
96104

97105
}
98106

@@ -119,7 +127,7 @@ public void deleteExecutionContext(JobExecution jobExecution) {
119127
org.springframework.batch.core.repository.persistence.ExecutionContext executionContext = new org.springframework.batch.core.repository.persistence.ExecutionContext(
120128
Collections.emptyMap(), false);
121129
Update executionContextRemovalUpdate = new Update().set("executionContext", executionContext);
122-
this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
130+
this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, this.jobExecutionCollectionName);
123131
}
124132

125133
@Override
@@ -128,7 +136,7 @@ public void deleteExecutionContext(StepExecution stepExecution) {
128136
org.springframework.batch.core.repository.persistence.ExecutionContext executionContext = new org.springframework.batch.core.repository.persistence.ExecutionContext(
129137
Collections.emptyMap(), false);
130138
Update executionContextRemovalUpdate = new Update().set("executionContext", executionContext);
131-
this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, STEP_EXECUTIONS_COLLECTION_NAME);
139+
this.mongoOperations.updateFirst(query, executionContextRemovalUpdate, this.stepExecutionCollectionName);
132140
}
133141

134142
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobExecutionDao.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,25 +44,30 @@
4444
/**
4545
* @author Mahmoud Ben Hassine
4646
* @author Yanming Zhou
47+
* @author Myeongha Shin
4748
* @since 5.2.0
4849
*/
4950
public class MongoJobExecutionDao implements JobExecutionDao {
5051

51-
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
52+
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";
5253

53-
private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";
54+
private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "JOB_EXECUTION_SEQ";
5455

5556
private final MongoOperations mongoOperations;
5657

58+
private final String jobExecutionsCollectionName;
59+
5760
private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
5861

5962
private DataFieldMaxValueIncrementer jobExecutionIncrementer;
6063

6164
private MongoJobInstanceDao jobInstanceDao;
6265

63-
public MongoJobExecutionDao(MongoOperations mongoOperations) {
66+
public MongoJobExecutionDao(MongoOperations mongoOperations, String collectionPrefix) {
6467
this.mongoOperations = mongoOperations;
65-
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
68+
this.jobExecutionsCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
69+
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations,
70+
collectionPrefix + JOB_EXECUTIONS_SEQUENCE_NAME);
6671
}
6772

6873
public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
@@ -79,7 +84,7 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo
7984

8085
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter
8186
.fromJobExecution(jobExecution);
82-
this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME);
87+
this.mongoOperations.insert(jobExecutionToSave, jobExecutionsCollectionName);
8388

8489
return jobExecution;
8590
}
@@ -89,7 +94,7 @@ public void updateJobExecution(JobExecution jobExecution) {
8994
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
9095
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
9196
.fromJobExecution(jobExecution);
92-
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
97+
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, jobExecutionsCollectionName);
9398
}
9499

95100
@Override
@@ -98,7 +103,7 @@ public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
98103
.with(Sort.by(Sort.Direction.DESC, "jobExecutionId"));
99104
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
100105
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
101-
JOB_EXECUTIONS_COLLECTION_NAME);
106+
jobExecutionsCollectionName);
102107
return jobExecutions.stream().map(jobExecution -> convert(jobExecution, jobInstance)).toList();
103108
}
104109

@@ -108,8 +113,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
108113
Sort.Order sortOrder = Sort.Order.desc("jobExecutionId");
109114
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
110115
query.with(Sort.by(sortOrder)),
111-
org.springframework.batch.core.repository.persistence.JobExecution.class,
112-
JOB_EXECUTIONS_COLLECTION_NAME);
116+
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionsCollectionName);
113117
return jobExecution != null ? convert(jobExecution, jobInstance) : null;
114118
}
115119

@@ -122,7 +126,7 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
122126
where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING"));
123127
this.mongoOperations
124128
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
125-
JOB_EXECUTIONS_COLLECTION_NAME)
129+
jobExecutionsCollectionName)
126130
.stream()
127131
.map(jobExecution -> convert(jobExecution, jobInstance))
128132
.forEach(runningJobExecutions::add);
@@ -135,7 +139,7 @@ public JobExecution getJobExecution(long executionId) {
135139
Query jobExecutionQuery = query(where("jobExecutionId").is(executionId));
136140
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
137141
jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
138-
JOB_EXECUTIONS_COLLECTION_NAME);
142+
jobExecutionsCollectionName);
139143
if (jobExecution == null) {
140144
return null;
141145
}
@@ -158,15 +162,15 @@ public void synchronizeStatus(JobExecution jobExecution) {
158162
@Override
159163
public void deleteJobExecution(JobExecution jobExecution) {
160164
this.mongoOperations.remove(query(where("jobExecutionId").is(jobExecution.getId())),
161-
JOB_EXECUTIONS_COLLECTION_NAME);
165+
this.jobExecutionsCollectionName);
162166

163167
}
164168

165169
@Override
166170
public void deleteJobExecutionParameters(JobExecution jobExecution) {
167171
Query query = new Query(where("jobExecutionId").is(jobExecution.getId()));
168172
Update jobParametersRemovalUpdate = new Update().set("jobParameters", Collections.emptyList());
169-
this.mongoOperations.updateFirst(query, jobParametersRemovalUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
173+
this.mongoOperations.updateFirst(query, jobParametersRemovalUpdate, this.jobExecutionsCollectionName);
170174
}
171175

172176
private JobExecution convert(org.springframework.batch.core.repository.persistence.JobExecution jobExecution,

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoJobInstanceDao.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,31 @@
3838
/**
3939
* @author Mahmoud Ben Hassine
4040
* @author Yanming Zhou
41+
* @author Myeongha Shin
4142
* @since 5.2.0
4243
*/
4344
public class MongoJobInstanceDao implements JobInstanceDao {
4445

45-
private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";
46+
private static final String COLLECTION_NAME = "JOB_INSTANCE";
4647

47-
private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";
48+
private static final String SEQUENCE_NAME = "JOB_INSTANCE_SEQ";
4849

4950
private final MongoOperations mongoOperations;
5051

52+
private final String collectionName;
53+
5154
private DataFieldMaxValueIncrementer jobInstanceIncrementer;
5255

5356
private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator();
5457

5558
private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter();
5659

57-
public MongoJobInstanceDao(MongoOperations mongoOperations) {
60+
public MongoJobInstanceDao(MongoOperations mongoOperations, String collectionPrefix) {
5861
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
62+
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
5963
this.mongoOperations = mongoOperations;
60-
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
64+
this.collectionName = collectionPrefix + COLLECTION_NAME;
65+
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, collectionPrefix + SEQUENCE_NAME);
6166
}
6267

6368
public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {
@@ -81,7 +86,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
8186
jobInstanceToSave.setJobKey(key);
8287
long instanceId = jobInstanceIncrementer.nextLongValue();
8388
jobInstanceToSave.setJobInstanceId(instanceId);
84-
this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME);
89+
this.mongoOperations.insert(jobInstanceToSave, this.collectionName);
8590

8691
JobInstance jobInstance = new JobInstance(instanceId, jobName);
8792
jobInstance.incrementVersion(); // TODO is this needed?
@@ -92,16 +97,16 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
9297
public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
9398
String key = this.jobKeyGenerator.generateKey(jobParameters);
9499
Query query = query(where("jobName").is(jobName).and("jobKey").is(key));
95-
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
96-
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
100+
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
101+
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
97102
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
98103
}
99104

100105
@Override
101106
public JobInstance getJobInstance(long instanceId) {
102107
Query query = query(where("jobInstanceId").is(instanceId));
103-
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
104-
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
108+
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
109+
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
105110
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
106111
}
107112

@@ -116,7 +121,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
116121
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
117122
List<org.springframework.batch.core.repository.persistence.JobInstance> jobInstances = this.mongoOperations
118123
.find(query.with(Sort.by(sortOrder)),
119-
org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
124+
org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
120125
.stream()
121126
.toList();
122127
if (jobInstances.size() <= start) {
@@ -139,7 +144,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
139144
public List<JobInstance> getJobInstances(String jobName) {
140145
Query query = query(where("jobName").is(jobName));
141146
return this.mongoOperations
142-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
147+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
143148
.stream()
144149
.map(this.jobInstanceConverter::toJobInstance)
145150
.toList();
@@ -149,7 +154,7 @@ public List<JobInstance> getJobInstances(String jobName) {
149154
public List<Long> getJobInstanceIds(String jobName) {
150155
Query query = query(where("jobName").is(jobName));
151156
return this.mongoOperations
152-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
157+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
153158
.stream()
154159
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobInstanceId)
155160
.toList();
@@ -158,7 +163,7 @@ public List<Long> getJobInstanceIds(String jobName) {
158163
public List<JobInstance> findJobInstancesByName(String jobName) {
159164
Query query = query(where("jobName").is(jobName));
160165
return this.mongoOperations
161-
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
166+
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
162167
.stream()
163168
.map(this.jobInstanceConverter::toJobInstance)
164169
.toList();
@@ -170,14 +175,14 @@ public JobInstance getLastJobInstance(String jobName) {
170175
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
171176
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
172177
query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobInstance.class,
173-
COLLECTION_NAME);
178+
this.collectionName);
174179
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
175180
}
176181

177182
@Override
178183
public List<String> getJobNames() {
179184
return this.mongoOperations
180-
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
185+
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
181186
.stream()
182187
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobName)
183188
.toList();
@@ -200,12 +205,12 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
200205
throw new NoSuchJobException("Job not found " + jobName);
201206
}
202207
Query query = query(where("jobName").is(jobName));
203-
return this.mongoOperations.count(query, COLLECTION_NAME);
208+
return this.mongoOperations.count(query, this.collectionName);
204209
}
205210

206211
@Override
207212
public void deleteJobInstance(JobInstance jobInstance) {
208-
this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), COLLECTION_NAME);
213+
this.mongoOperations.remove(query(where("jobInstanceId").is(jobInstance.getId())), this.collectionName);
209214
}
210215

211216
}

0 commit comments

Comments
 (0)