From b51ad98be70d387463d2d08734c53b9aa553e02e Mon Sep 17 00:00:00 2001 From: LEE Juchan Date: Fri, 10 Mar 2023 17:36:54 +0900 Subject: [PATCH] Add cursor-based ItemReader for MongoDB --- .../item/data/MongoCursorItemReader.java | 297 +++++++++++++++++ .../builder/MongoCursorItemReaderBuilder.java | 304 +++++++++++++++++ .../item/data/MongoCursorItemReaderTest.java | 310 ++++++++++++++++++ 3 files changed, 911 insertions(+) create mode 100644 spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoCursorItemReader.java create mode 100644 spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoCursorItemReaderBuilder.java create mode 100644 spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoCursorItemReaderTest.java diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoCursorItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoCursorItemReader.java new file mode 100644 index 0000000000..4c3c8b0dfe --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoCursorItemReader.java @@ -0,0 +1,297 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.data; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import org.bson.Document; +import org.bson.codecs.DecoderContext; + +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.query.BasicQuery; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.util.json.ParameterBindingDocumentCodec; +import org.springframework.data.mongodb.util.json.ParameterBindingJsonReader; +import org.springframework.data.util.CloseableIterator; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; + +/** + * @author LEE Juchan + * @since 5.0 + */ +public class MongoCursorItemReader extends AbstractItemCountingItemStreamItemReader implements InitializingBean { + + private MongoOperations template; + + private Class targetType; + + private String collection; + + private Query query; + + private String queryString; + + private List parameterValues = new ArrayList<>(); + + private String fields; + + private Sort sort; + + private String hint; + + private Integer batchSize; + + private Integer limit; + + private Integer maxTimeMs; + + private CloseableIterator cursor; + + public MongoCursorItemReader() { + super(); + setName(ClassUtils.getShortName(MongoCursorItemReader.class)); + } + + /** + * Used to perform operations against the MongoDB instance. Also handles the mapping + * of documents to objects. + * @param template the MongoOperations instance to use + * @see MongoOperations + */ + public void setTemplate(MongoOperations template) { + this.template = template; + } + + /** + * The targetType of object to be returned for each {@link #read()} call. + * @param targetType the targetType of object to return + */ + public void setTargetType(Class targetType) { + this.targetType = targetType; + } + + /** + * @param collection Mongo collection to be queried. + */ + public void setCollection(String collection) { + this.collection = collection; + } + + /** + * A Mongo Query to be used. + * @param query Mongo Query to be used. + */ + public void setQuery(Query query) { + this.query = query; + } + + /** + * A JSON formatted MongoDB query. Parameterization of the provided query is allowed + * via ?<index> placeholders where the <index> indicates the index of the + * parameterValue to substitute. + * @param queryString JSON formatted Mongo query + */ + public void setQuery(String queryString) { + this.queryString = queryString; + } + + /** + * {@link List} of values to be substituted in for each of the parameters in the + * query. + * @param parameterValues values + */ + public void setParameterValues(List parameterValues) { + Assert.notNull(parameterValues, "Parameter values must not be null"); + this.parameterValues = parameterValues; + } + + /** + * JSON defining the fields to be returned from the matching documents by MongoDB. + * @param fields JSON string that identifies the fields to sort by. + */ + public void setFields(String fields) { + this.fields = fields; + } + + /** + * {@link Map} of property + * names/{@link org.springframework.data.domain.Sort.Direction} values to sort the + * input by. + * @param sorts map of properties and direction to sort each. + */ + public void setSort(Map sorts) { + Assert.notNull(sorts, "Sorts must not be null"); + this.sort = convertToSort(sorts); + } + + /** + * JSON String telling MongoDB what index to use. + * @param hint string indicating what index to use. + */ + public void setHint(String hint) { + this.hint = hint; + } + + /** + * The size of batches to use when iterating over results. + * @param batchSize size the batch size to apply to the cursor + */ + public void setBatchSize(Integer batchSize) { + this.batchSize = batchSize; + } + + /** + * The query limit + * @param limit The limit + */ + public void setLimit(Integer limit) { + this.limit = limit; + } + + /** + * The maximum execution time for the aggregation command + * @param maxTimeMs The max time + */ + public void setMaxTimeMs(Integer maxTimeMs) { + this.maxTimeMs = maxTimeMs; + } + + /** + * Checks mandatory properties + * + * @see InitializingBean#afterPropertiesSet() + */ + @Override + public void afterPropertiesSet() { + Assert.state(template != null, "An implementation of MongoOperations is required."); + Assert.state(targetType != null, "A targetType to convert the input into is required."); + Assert.state(queryString != null || query != null, "A query is required."); + + if (queryString != null) { + Assert.state(sort != null, "A sort is required."); + } + } + + @Override + protected void doOpen() throws Exception { + Query mongoQuery; + if (queryString != null) { + mongoQuery = createQuery(); + } else { + mongoQuery = query; + } + + Stream stream; + if (StringUtils.hasText(collection)) { + stream = template.stream(mongoQuery, targetType, collection); + } else { + stream = template.stream(mongoQuery, targetType); + } + + this.cursor = streamToIterator(stream); + } + + @Override + protected T doRead() throws Exception { + return cursor.hasNext() ? cursor.next() : null; + } + + @Override + protected void doClose() throws Exception { + this.cursor.close(); + } + + private Sort convertToSort(Map sorts) { + List sortValues = new ArrayList<>(sorts.size()); + + for (Map.Entry curSort : sorts.entrySet()) { + sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey())); + } + + return Sort.by(sortValues); + } + + private Query createQuery() { + String populatedQuery = replacePlaceholders(queryString, parameterValues); + + Query mongoQuery; + if (StringUtils.hasText(fields)) { + mongoQuery = new BasicQuery(populatedQuery, fields); + } else { + mongoQuery = new BasicQuery(populatedQuery); + } + + if (sort != null) { + mongoQuery.with(sort); + } + if (StringUtils.hasText(hint)) { + mongoQuery.withHint(hint); + } + if (batchSize != null) { + mongoQuery.cursorBatchSize(batchSize); + } + if (limit != null) { + mongoQuery.limit(limit); + } + if (maxTimeMs != null) { + mongoQuery.maxTime(Duration.of(maxTimeMs, ChronoUnit.MILLIS)); + } else { + mongoQuery.noCursorTimeout(); + } + + return mongoQuery; + } + + private String replacePlaceholders(String input, List values) { + ParameterBindingJsonReader reader = new ParameterBindingJsonReader(input, values.toArray()); + DecoderContext decoderContext = DecoderContext.builder().build(); + Document document = new ParameterBindingDocumentCodec().decode(reader, decoderContext); + return document.toJson(); + } + + private CloseableIterator streamToIterator(Stream stream) { + return new CloseableIterator<>() { + final private Iterator delegate = stream.iterator(); + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public T next() { + return delegate.next(); + } + + @Override + public void close() { + stream.close(); + } + }; + } + +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoCursorItemReaderBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoCursorItemReaderBuilder.java new file mode 100644 index 0000000000..6b39adf000 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/builder/MongoCursorItemReaderBuilder.java @@ -0,0 +1,304 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.item.data.builder; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.data.MongoCursorItemReader; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author LEE Juchan + * @since 5.0 + * @see MongoCursorItemReader + */ +public class MongoCursorItemReaderBuilder { + + private boolean saveState = true; + + private String name; + + private int maxItemCount = Integer.MAX_VALUE; + + private int currentItemCount; + + private MongoOperations template; + + private Class targetType; + + private String collection; + + private Query query; + + private String jsonQuery; + + private List parameterValues = new ArrayList<>(); + + private String fields; + + private Map sorts; + + private String hint; + + private Integer batchSize; + + private Integer limit; + + private Integer maxTimeMs; + + /** + * Configure if the state of the + * {@link org.springframework.batch.item.ItemStreamSupport} should be persisted within + * the {@link org.springframework.batch.item.ExecutionContext} for restart purposes. + * @param saveState defaults to true + * @return The current instance of the builder. + */ + public MongoCursorItemReaderBuilder saveState(boolean saveState) { + this.saveState = saveState; + + return this; + } + + /** + * The name used to calculate the key within the + * {@link org.springframework.batch.item.ExecutionContext}. Required if + * {@link #saveState(boolean)} is set to true. + * @param name name of the reader instance + * @return The current instance of the builder. + * @see org.springframework.batch.item.ItemStreamSupport#setName(String) + */ + public MongoCursorItemReaderBuilder name(String name) { + this.name = name; + + return this; + } + + /** + * Configure the max number of items to be read. + * @param maxItemCount the max items to be read + * @return The current instance of the builder. + * @see org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader#setMaxItemCount(int) + */ + public MongoCursorItemReaderBuilder maxItemCount(int maxItemCount) { + this.maxItemCount = maxItemCount; + + return this; + } + + /** + * Index for the current item. Used on restarts to indicate where to start from. + * @param currentItemCount current index + * @return this instance for method chaining + * @see org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader#setCurrentItemCount(int) + */ + public MongoCursorItemReaderBuilder currentItemCount(int currentItemCount) { + this.currentItemCount = currentItemCount; + + return this; + } + + /** + * Used to perform operations against the MongoDB instance. Also handles the mapping + * of documents to objects. + * @param template the MongoOperations instance to use + * @see MongoOperations + * @return The current instance of the builder + * @see MongoCursorItemReader#setTemplate(MongoOperations) + */ + public MongoCursorItemReaderBuilder template(MongoOperations template) { + this.template = template; + + return this; + } + + /** + * The targetType of object to be returned for each {@link MongoCursorItemReader#read()} call. + * @param targetType the targetType of object to return + * @return The current instance of the builder + * @see MongoCursorItemReader#setTargetType(Class) + */ + public MongoCursorItemReaderBuilder targetType(Class targetType) { + this.targetType = targetType; + + return this; + } + + /** + * Establish an optional collection that can be queried. + * @param collection Mongo collection to be queried. + * @return The current instance of the builder + * @see MongoCursorItemReader#setCollection(String) + */ + public MongoCursorItemReaderBuilder collection(String collection) { + this.collection = collection; + + return this; + } + + /** + * Provide a Spring Data Mongo {@link Query}. This will take precedence over a JSON + * configured query. + * @param query Query to execute + * @return this instance for method chaining + * @see MongoCursorItemReader#setQuery(Query) + */ + public MongoCursorItemReaderBuilder query(Query query) { + this.query = query; + + return this; + } + + /** + * A JSON formatted MongoDB jsonQuery. Parameterization of the provided jsonQuery is + * allowed via ?<index> placeholders where the <index> indicates the index + * of the parameterValue to substitute. + * @param query JSON formatted Mongo jsonQuery + * @return The current instance of the builder + * @see MongoCursorItemReader#setQuery(String) + */ + public MongoCursorItemReaderBuilder jsonQuery(String query) { + this.jsonQuery = query; + + return this; + } + + /** + * Values to be substituted in for each of the parameters in the query. + * @param parameterValues values + * @return The current instance of the builder + * @see MongoCursorItemReader#setParameterValues(List) + */ + public MongoCursorItemReaderBuilder parameterValues(List parameterValues) { + this.parameterValues = parameterValues; + + return this; + } + + /** + * JSON defining the fields to be returned from the matching documents by MongoDB. + * @param fields JSON string that identifies the fields to sort by. + * @return The current instance of the builder + * @see MongoCursorItemReader#setFields(String) + */ + public MongoCursorItemReaderBuilder fields(String fields) { + this.fields = fields; + + return this; + } + + /** + * {@link Map} of property + * names/{@link org.springframework.data.domain.Sort.Direction} values to sort the + * input by. + * @param sorts map of properties and direction to sort each. + * @return The current instance of the builder + * @see MongoCursorItemReader#setSort(Map) + */ + public MongoCursorItemReaderBuilder sorts(Map sorts) { + this.sorts = sorts; + + return this; + } + + /** + * JSON String telling MongoDB what index to use. + * @param hint string indicating what index to use. + * @return The current instance of the builder + * @see MongoCursorItemReader#setHint(String) + */ + public MongoCursorItemReaderBuilder hint(String hint) { + this.hint = hint; + + return this; + } + + /** + * The size of batches to use when iterating over results. + * @param batchSize string indicating what index to use. + * @return The current instance of the builder + * @see MongoCursorItemReader#setHint(String) + */ + public MongoCursorItemReaderBuilder batchSize(Integer batchSize) { + this.batchSize = batchSize; + + return this; + } + + /** + * The query limit + * @param limit The limit + * @return The current instance of the builder + * @see MongoCursorItemReader#setLimit(Integer) + */ + public MongoCursorItemReaderBuilder limit(Integer limit) { + this.limit = limit; + + return this; + } + + /** + * The maximum execution time for the aggregation command + * @param maxTimeMs The max time + * @return The current instance of the builder + * @see MongoCursorItemReader#setMaxTimeMs(Integer) + */ + public MongoCursorItemReaderBuilder maxTimeMs(Integer maxTimeMs) { + this.maxTimeMs = maxTimeMs; + + return this; + } + + public MongoCursorItemReader build() { + Assert.notNull(this.template, "template is required."); + if (this.saveState) { + Assert.hasText(this.name, "A name is required when saveState is set to true"); + } + Assert.notNull(this.targetType, "targetType is required."); + Assert.state(StringUtils.hasText(this.jsonQuery) || this.query != null, "A query is required"); + + if (StringUtils.hasText(this.jsonQuery) || this.query != null) { + Assert.notNull(this.sorts, "sorts map is required."); + } + + MongoCursorItemReader reader = new MongoCursorItemReader<>(); + reader.setSaveState(this.saveState); + reader.setName(this.name); + reader.setCurrentItemCount(this.currentItemCount); + reader.setMaxItemCount(this.maxItemCount); + + reader.setTemplate(this.template); + reader.setTargetType(this.targetType); + reader.setCollection(this.collection); + reader.setQuery(this.query); + reader.setQuery(this.jsonQuery); + reader.setParameterValues(this.parameterValues); + reader.setFields(this.fields); + reader.setSort(this.sorts); + reader.setHint(this.hint); + reader.setBatchSize(this.batchSize); + reader.setLimit(this.limit); + reader.setMaxTimeMs(this.maxTimeMs); + + return reader; + } +} diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoCursorItemReaderTest.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoCursorItemReaderTest.java new file mode 100644 index 0000000000..107d373e8f --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoCursorItemReaderTest.java @@ -0,0 +1,310 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.data; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Query; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +/** + * @author LEE Juchan + * @since 5.0 + */ +@ExtendWith(MockitoExtension.class) +class MongoCursorItemReaderTest { + + private MongoCursorItemReader reader; + + @Mock + private MongoTemplate template; + + private Map sortOptions; + + @BeforeEach + void setUp() { + reader = new MongoCursorItemReader<>(); + + sortOptions = new HashMap<>(); + sortOptions.put("name", Sort.Direction.DESC); + + reader.setTemplate(template); + reader.setTargetType(String.class); + reader.setQuery("{ }"); + reader.setSort(sortOptions); + reader.afterPropertiesSet(); + } + + @Test + void testAfterPropertiesSetForQueryString() { + reader = new MongoCursorItemReader<>(); + Exception exception = assertThrows(IllegalStateException.class, reader::afterPropertiesSet); + assertEquals("An implementation of MongoOperations is required.", exception.getMessage()); + + reader.setTemplate(template); + + exception = assertThrows(IllegalStateException.class, reader::afterPropertiesSet); + assertEquals("A type to convert the input into is required.", exception.getMessage()); + + reader.setTargetType(String.class); + + exception = assertThrows(IllegalStateException.class, reader::afterPropertiesSet); + assertEquals("A query is required.", exception.getMessage()); + + reader.setQuery(""); + + exception = assertThrows(IllegalStateException.class, reader::afterPropertiesSet); + assertEquals("A sort is required.", exception.getMessage()); + + reader.setSort(sortOptions); + reader.afterPropertiesSet(); + } + + @Test + void testAfterPropertiesSetForQueryObject() { + reader = new MongoCursorItemReader<>(); + + reader.setTemplate(template); + reader.setTargetType(String.class); + + Query query = new Query().with(Sort.by(new Sort.Order(Sort.Direction.ASC, "_id"))); + reader.setQuery(query); + + reader.afterPropertiesSet(); + } + + @Test + void testBasicQuery() throws Exception { + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of("hello world")); + + reader.doOpen(); + assertEquals(reader.doRead(), "hello world"); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + } + + @Test + void testQueryWithFields() throws Exception { + reader.setFields("{name : 1, age : 1, _id: 0}"); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals(1, query.getFieldsObject().get("name")); + assertEquals(1, query.getFieldsObject().get("age")); + assertEquals(0, query.getFieldsObject().get("_id")); + } + + @Test + void testQueryWithHint() throws Exception { + reader.setHint("{ $natural : 1}"); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals("{ $natural : 1}", query.getHint()); + } + + @Test + void testQueryWithParameters() throws Exception { + reader.setParameterValues(Collections.singletonList("foo")); + + reader.setQuery("{ name : ?0 }"); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{\"name\": \"foo\"}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + } + + @Test + void testQueryWithBatchSize() throws Exception { + reader.setBatchSize(50); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals(50, query.getMeta().getCursorBatchSize()); + } + + @Test + void testQueryWithLimit() throws Exception { + reader.setLimit(200); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals(200, query.getLimit()); + } + + @Test + void testQueryWithMaxTime() throws Exception { + reader.setMaxTimeMs(3000); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals(3000, query.getMeta().getMaxTimeMsec()); + } + + @Test + void testQueryWithCollection() throws Exception { + reader.setParameterValues(Collections.singletonList("foo")); + + reader.setQuery("{ name : ?0 }"); + reader.setCollection("collection"); + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + ArgumentCaptor collectionContainer = ArgumentCaptor.forClass(String.class); + + when(template.stream(queryContainer.capture(), eq(String.class), collectionContainer.capture())) + .thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query query = queryContainer.getValue(); + assertEquals("{\"name\": \"foo\"}", query.getQueryObject().toJson()); + assertEquals("{\"name\": -1}", query.getSortObject().toJson()); + assertEquals("collection", collectionContainer.getValue()); + } + + @Test + void testQueryObject() throws Exception { + reader = new MongoCursorItemReader<>(); + reader.setTemplate(template); + + Query query = new Query().with(Sort.by(new Sort.Order(Sort.Direction.ASC, "_id"))); + reader.setQuery(query); + reader.setTargetType(String.class); + + reader.afterPropertiesSet(); + + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + when(template.stream(queryContainer.capture(), eq(String.class))).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query actualQuery = queryContainer.getValue(); + assertEquals("{}", actualQuery.getQueryObject().toJson()); + assertEquals("{\"_id\": 1}", actualQuery.getSortObject().toJson()); + } + + @Test + void testQueryObjectWithCollection() throws Exception { + reader = new MongoCursorItemReader<>(); + reader.setTemplate(template); + + Query query = new Query().with(Sort.by(new Sort.Order(Sort.Direction.ASC, "_id"))); + reader.setQuery(query); + reader.setTargetType(String.class); + reader.setCollection("collection"); + + reader.afterPropertiesSet(); + + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + ArgumentCaptor stringContainer = ArgumentCaptor.forClass(String.class); + when(template.stream(queryContainer.capture(), eq(String.class), stringContainer.capture())).thenReturn(Stream.of()); + + reader.doOpen(); + assertNull(reader.doRead()); + + Query actualQuery = queryContainer.getValue(); + assertEquals("{}", actualQuery.getQueryObject().toJson()); + assertEquals("{\"_id\": 1}", actualQuery.getSortObject().toJson()); + assertEquals("collection", stringContainer.getValue()); + } + + @Test + void testSortThrowsExceptionWhenInvokedWithNull() { + // given + reader = new MongoCursorItemReader<>(); + + // when + then + assertThatIllegalArgumentException().isThrownBy(() -> reader.setSort(null)) + .withMessage("Sorts must not be null"); + } + + @Test + void testCursorRead() throws Exception { + ArgumentCaptor queryContainer = ArgumentCaptor.forClass(Query.class); + when(template.stream(queryContainer.capture(), eq(String.class))) + .thenReturn(Stream.of("first", "second", "third")); + + reader.doOpen(); + + assertEquals("first", reader.doRead()); + assertEquals("second", reader.doRead()); + assertEquals("third", reader.doRead()); + assertNull(reader.doRead()); + } + +}