Skip to content

Commit b103900

Browse files
juchaneifmbenhassine
authored andcommitted
Add cursor-based ItemReader for MongoDB
Issue #4323
1 parent 6a93f6e commit b103900

File tree

3 files changed

+911
-0
lines changed

3 files changed

+911
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.data;
17+
18+
import java.time.Duration;
19+
import java.time.temporal.ChronoUnit;
20+
import java.util.ArrayList;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.stream.Stream;
25+
26+
import org.bson.Document;
27+
import org.bson.codecs.DecoderContext;
28+
29+
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
30+
import org.springframework.beans.factory.InitializingBean;
31+
import org.springframework.data.domain.Sort;
32+
import org.springframework.data.mongodb.core.MongoOperations;
33+
import org.springframework.data.mongodb.core.query.BasicQuery;
34+
import org.springframework.data.mongodb.core.query.Query;
35+
import org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec;
36+
import org.springframework.data.mongodb.util.json.ParameterBindingJsonReader;
37+
import org.springframework.data.util.CloseableIterator;
38+
import org.springframework.util.Assert;
39+
import org.springframework.util.ClassUtils;
40+
import org.springframework.util.StringUtils;
41+
42+
/**
43+
* @author LEE Juchan
44+
* @since 5.0
45+
*/
46+
public class MongoCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
47+
48+
private MongoOperations template;
49+
50+
private Class<? extends T> targetType;
51+
52+
private String collection;
53+
54+
private Query query;
55+
56+
private String queryString;
57+
58+
private List<Object> parameterValues = new ArrayList<>();
59+
60+
private String fields;
61+
62+
private Sort sort;
63+
64+
private String hint;
65+
66+
private Integer batchSize;
67+
68+
private Integer limit;
69+
70+
private Integer maxTimeMs;
71+
72+
private CloseableIterator<? extends T> cursor;
73+
74+
public MongoCursorItemReader() {
75+
super();
76+
setName(ClassUtils.getShortName(MongoCursorItemReader.class));
77+
}
78+
79+
/**
80+
* Used to perform operations against the MongoDB instance. Also handles the mapping
81+
* of documents to objects.
82+
* @param template the MongoOperations instance to use
83+
* @see MongoOperations
84+
*/
85+
public void setTemplate(MongoOperations template) {
86+
this.template = template;
87+
}
88+
89+
/**
90+
* The targetType of object to be returned for each {@link #read()} call.
91+
* @param targetType the targetType of object to return
92+
*/
93+
public void setTargetType(Class<? extends T> targetType) {
94+
this.targetType = targetType;
95+
}
96+
97+
/**
98+
* @param collection Mongo collection to be queried.
99+
*/
100+
public void setCollection(String collection) {
101+
this.collection = collection;
102+
}
103+
104+
/**
105+
* A Mongo Query to be used.
106+
* @param query Mongo Query to be used.
107+
*/
108+
public void setQuery(Query query) {
109+
this.query = query;
110+
}
111+
112+
/**
113+
* A JSON formatted MongoDB query. Parameterization of the provided query is allowed
114+
* via ?&lt;index&gt; placeholders where the &lt;index&gt; indicates the index of the
115+
* parameterValue to substitute.
116+
* @param queryString JSON formatted Mongo query
117+
*/
118+
public void setQuery(String queryString) {
119+
this.queryString = queryString;
120+
}
121+
122+
/**
123+
* {@link List} of values to be substituted in for each of the parameters in the
124+
* query.
125+
* @param parameterValues values
126+
*/
127+
public void setParameterValues(List<Object> parameterValues) {
128+
Assert.notNull(parameterValues, "Parameter values must not be null");
129+
this.parameterValues = parameterValues;
130+
}
131+
132+
/**
133+
* JSON defining the fields to be returned from the matching documents by MongoDB.
134+
* @param fields JSON string that identifies the fields to sort by.
135+
*/
136+
public void setFields(String fields) {
137+
this.fields = fields;
138+
}
139+
140+
/**
141+
* {@link Map} of property
142+
* names/{@link org.springframework.data.domain.Sort.Direction} values to sort the
143+
* input by.
144+
* @param sorts map of properties and direction to sort each.
145+
*/
146+
public void setSort(Map<String, Sort.Direction> sorts) {
147+
Assert.notNull(sorts, "Sorts must not be null");
148+
this.sort = convertToSort(sorts);
149+
}
150+
151+
/**
152+
* JSON String telling MongoDB what index to use.
153+
* @param hint string indicating what index to use.
154+
*/
155+
public void setHint(String hint) {
156+
this.hint = hint;
157+
}
158+
159+
/**
160+
* The size of batches to use when iterating over results.
161+
* @param batchSize size the batch size to apply to the cursor
162+
*/
163+
public void setBatchSize(Integer batchSize) {
164+
this.batchSize = batchSize;
165+
}
166+
167+
/**
168+
* The query limit
169+
* @param limit The limit
170+
*/
171+
public void setLimit(Integer limit) {
172+
this.limit = limit;
173+
}
174+
175+
/**
176+
* The maximum execution time for the aggregation command
177+
* @param maxTimeMs The max time
178+
*/
179+
public void setMaxTimeMs(Integer maxTimeMs) {
180+
this.maxTimeMs = maxTimeMs;
181+
}
182+
183+
/**
184+
* Checks mandatory properties
185+
*
186+
* @see InitializingBean#afterPropertiesSet()
187+
*/
188+
@Override
189+
public void afterPropertiesSet() {
190+
Assert.state(template != null, "An implementation of MongoOperations is required.");
191+
Assert.state(targetType != null, "A targetType to convert the input into is required.");
192+
Assert.state(queryString != null || query != null, "A query is required.");
193+
194+
if (queryString != null) {
195+
Assert.state(sort != null, "A sort is required.");
196+
}
197+
}
198+
199+
@Override
200+
protected void doOpen() throws Exception {
201+
Query mongoQuery;
202+
if (queryString != null) {
203+
mongoQuery = createQuery();
204+
} else {
205+
mongoQuery = query;
206+
}
207+
208+
Stream<? extends T> stream;
209+
if (StringUtils.hasText(collection)) {
210+
stream = template.stream(mongoQuery, targetType, collection);
211+
} else {
212+
stream = template.stream(mongoQuery, targetType);
213+
}
214+
215+
this.cursor = streamToIterator(stream);
216+
}
217+
218+
@Override
219+
protected T doRead() throws Exception {
220+
return cursor.hasNext() ? cursor.next() : null;
221+
}
222+
223+
@Override
224+
protected void doClose() throws Exception {
225+
this.cursor.close();
226+
}
227+
228+
private Sort convertToSort(Map<String, Sort.Direction> sorts) {
229+
List<Sort.Order> sortValues = new ArrayList<>(sorts.size());
230+
231+
for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
232+
sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
233+
}
234+
235+
return Sort.by(sortValues);
236+
}
237+
238+
private Query createQuery() {
239+
String populatedQuery = replacePlaceholders(queryString, parameterValues);
240+
241+
Query mongoQuery;
242+
if (StringUtils.hasText(fields)) {
243+
mongoQuery = new BasicQuery(populatedQuery, fields);
244+
} else {
245+
mongoQuery = new BasicQuery(populatedQuery);
246+
}
247+
248+
if (sort != null) {
249+
mongoQuery.with(sort);
250+
}
251+
if (StringUtils.hasText(hint)) {
252+
mongoQuery.withHint(hint);
253+
}
254+
if (batchSize != null) {
255+
mongoQuery.cursorBatchSize(batchSize);
256+
}
257+
if (limit != null) {
258+
mongoQuery.limit(limit);
259+
}
260+
if (maxTimeMs != null) {
261+
mongoQuery.maxTime(Duration.of(maxTimeMs, ChronoUnit.MILLIS));
262+
} else {
263+
mongoQuery.noCursorTimeout();
264+
}
265+
266+
return mongoQuery;
267+
}
268+
269+
private String replacePlaceholders(String input, List<Object> values) {
270+
ParameterBindingJsonReader reader = new ParameterBindingJsonReader(input, values.toArray());
271+
DecoderContext decoderContext = DecoderContext.builder().build();
272+
Document document = new ParameterBindingDocumentCodec().decode(reader, decoderContext);
273+
return document.toJson();
274+
}
275+
276+
private CloseableIterator<? extends T> streamToIterator(Stream<? extends T> stream) {
277+
return new CloseableIterator<>() {
278+
final private Iterator<? extends T> delegate = stream.iterator();
279+
280+
@Override
281+
public boolean hasNext() {
282+
return delegate.hasNext();
283+
}
284+
285+
@Override
286+
public T next() {
287+
return delegate.next();
288+
}
289+
290+
@Override
291+
public void close() {
292+
stream.close();
293+
}
294+
};
295+
}
296+
297+
}

0 commit comments

Comments
 (0)