diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 942721a27ad..792c10b4bb2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -60,6 +60,7 @@ import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument; import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult; import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException; +import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults; import static java.util.Collections.emptyList; class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor { @@ -117,6 +118,7 @@ public void next(final SingleResultCallback> callback) { } if (serverCursorIsNull || !batchResults.isEmpty()) { + commandCursorResult = withEmptyResults(commandCursorResult); funcCallback.onResult(batchResults, null); } else { getMore(localServerCursor, funcCallback); @@ -206,6 +208,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se resourceManager.setServerCursor(nextServerCursor); List nextBatch = commandCursorResult.getResults(); if (nextServerCursor == null || !nextBatch.isEmpty()) { + commandCursorResult = withEmptyResults(commandCursorResult); callback.onResult(nextBatch, null); return; } diff --git a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java index 1463798ef64..1f280e040fd 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java @@ -60,7 +60,8 @@ public interface BatchCursor extends Iterator>, Closeable { boolean hasNext(); /** - * Returns the next batch of results. A tailable cursor will block until another batch exists. + * Returns the next batch of results as a mutable list. Modifications to the list will not affect the cursor state. + * A tailable cursor will block until another batch exists. * * @return the next batch of results * @throws java.util.NoSuchElementException if no next batch exists @@ -89,7 +90,8 @@ public interface BatchCursor extends Iterator>, Closeable { int getBatchSize(); /** - * A special {@code next()} case that returns the next batch if available or null. + * A special {@code next()} case that returns the next batch as a mutable list if available or null. + * Modifications to the list will not affect the cursor state. * *

Tailable cursors are an example where this is useful. A call to {@code tryNext()} may return null, but in the future calling * {@code tryNext()} would return a new batch if a document had been added to the capped collection.

diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index d201976e5ed..24ecc99b9f1 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -144,6 +144,7 @@ private List doNext() { List retVal = nextBatch; nextBatch = null; + commandCursorResult = CommandCursorResult.withEmptyResults(commandCursorResult); return retVal; } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java index 7bfbfb33cbe..813d8c145cd 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java @@ -23,6 +23,7 @@ import org.bson.BsonDocument; import org.bson.BsonTimestamp; +import java.util.Collections; import java.util.List; import static com.mongodb.assertions.Assertions.isTrue; @@ -60,6 +61,31 @@ public CommandCursorResult( this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null); } + private CommandCursorResult( + final ServerAddress serverAddress, + final List results, + final MongoNamespace namespace, + final long cursorId, + @Nullable final BsonTimestamp operationTime, + @Nullable final BsonDocument postBatchResumeToken) { + this.serverAddress = serverAddress; + this.results = results; + this.namespace = namespace; + this.cursorId = cursorId; + this.operationTime = operationTime; + this.postBatchResumeToken = postBatchResumeToken; + } + + public static CommandCursorResult withEmptyResults(final CommandCursorResult commandCursorResult) { + return new CommandCursorResult<>( + commandCursorResult.getServerAddress(), + Collections.emptyList(), + commandCursorResult.getNamespace(), + commandCursorResult.getCursorId(), + commandCursorResult.getOperationTime(), + commandCursorResult.getPostBatchResumeToken()); + } + /** * Gets the namespace. * diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoBatchCursorAdapter.java b/driver-sync/src/main/com/mongodb/client/internal/MongoBatchCursorAdapter.java index 527e5bd75e6..3de806671be 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoBatchCursorAdapter.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoBatchCursorAdapter.java @@ -97,6 +97,7 @@ public ServerAddress getServerAddress() { private T getNextInBatch() { T nextInBatch = curBatch.get(curPos); + curBatch.set(curPos, null); if (curPos < curBatch.size() - 1) { curPos++; } else { diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoChangeStreamCursorImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoChangeStreamCursorImpl.java index 895d34ea12b..fa6ea549643 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoChangeStreamCursorImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoChangeStreamCursorImpl.java @@ -112,6 +112,7 @@ public ServerAddress getServerAddress() { private T getNextInBatch() { RawBsonDocument nextInBatch = curBatch.get(curPos); + curBatch.set(curPos, null); resumeToken = nextInBatch.getDocument("_id"); if (curPos < curBatch.size() - 1) { curPos++; diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSFindIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSFindIterableSpecification.groovy index 632e59a16d0..40cd03bc7e9 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSFindIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSFindIterableSpecification.groovy @@ -130,12 +130,13 @@ class GridFSFindIterableSpecification extends Specification { , null), ] def cursor = { + def batchToReturn = cannedResults.collect(); Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy index 733ee4c57df..467e9614424 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy @@ -395,12 +395,13 @@ class AggregateIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { @@ -591,12 +592,13 @@ class AggregateIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/ChangeStreamIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/ChangeStreamIterableSpecification.groovy index b66373b221f..fdf31a76b56 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/ChangeStreamIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/ChangeStreamIterableSpecification.groovy @@ -160,8 +160,8 @@ class ChangeStreamIterableSpecification extends Specification { def cannedResults = ['{_id: {_data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect { RawBsonDocument.parse(it) } - def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults), - cursor(cannedResults)]) + def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()), + cursor(cannedResults.collect()), cursor(cannedResults.collect())]) def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [], Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS) @@ -208,8 +208,9 @@ class ChangeStreamIterableSpecification extends Specification { given: def count = 0 def cannedResults = ['{_id: { _data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect { RawBsonDocument.parse(it) } - def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults), - cursor(cannedResults)]) + def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()), + cursor(cannedResults.collect()), cursor(cannedResults.collect()), + cursor(cannedResults.collect())]) def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [], Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS).withDocumentClass(RawBsonDocument) diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/DistinctIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/DistinctIterableSpecification.groovy index 3baac05653a..82c4bf9a037 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/DistinctIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/DistinctIterableSpecification.groovy @@ -128,12 +128,13 @@ class DistinctIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/FindIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/FindIterableSpecification.groovy index e2f7cae2d62..78ab9a3601b 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/FindIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/FindIterableSpecification.groovy @@ -221,11 +221,12 @@ class FindIterableSpecification extends Specification { def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { Stub(BatchCursor) { + def batchToReturn = cannedResults.collect() def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/ListCollectionsIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/ListCollectionsIterableSpecification.groovy index 559935c05ee..12556430167 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/ListCollectionsIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/ListCollectionsIterableSpecification.groovy @@ -127,12 +127,13 @@ class ListCollectionsIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/ListDatabasesIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/ListDatabasesIterableSpecification.groovy index 8df91709486..627cc13ef3c 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/ListDatabasesIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/ListDatabasesIterableSpecification.groovy @@ -83,12 +83,13 @@ class ListDatabasesIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/ListIndexesIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/ListIndexesIterableSpecification.groovy index d11c59d46d2..f7bad5189dd 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/ListIndexesIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/ListIndexesIterableSpecification.groovy @@ -105,11 +105,12 @@ class ListIndexesIterableSpecification extends Specification { def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { Stub(BatchCursor) { + def batchToReturn = cannedResults.collect() def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MapReduceIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MapReduceIterableSpecification.groovy index b6cb01d31cb..b2b7faa6b2a 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/MapReduceIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MapReduceIterableSpecification.groovy @@ -255,12 +255,13 @@ class MapReduceIterableSpecification extends Specification { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)] def cursor = { + def batchToReturn = cannedResults.collect() Stub(BatchCursor) { def count = 0 def results def getResult = { count++ - results = count == 1 ? cannedResults : null + results = count == 1 ? batchToReturn : null results } next() >> { diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoBatchCursorAdapterSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoBatchCursorAdapterSpecification.groovy index a9ac0b0dce7..2b91f584027 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoBatchCursorAdapterSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoBatchCursorAdapterSpecification.groovy @@ -80,42 +80,46 @@ class MongoBatchCursorAdapterSpecification extends Specification { def 'should get next from batch cursor'() { given: - def firstBatch = [new Document('x', 1), new Document('x', 1)] - def secondBatch = [new Document('x', 2)] + def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [new Document('x', 2)] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() def batchCursor = Stub(BatchCursor) batchCursor.hasNext() >>> [true, true, true, true, false] - batchCursor.next() >>> [firstBatch, secondBatch] + batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor] def cursor = new MongoBatchCursorAdapter(batchCursor) expect: cursor.hasNext() - cursor.next() == firstBatch[0] + cursor.next() == expectedFirstBatch[0] cursor.hasNext() - cursor.next() == firstBatch[1] + cursor.next() == expectedFirstBatch[1] cursor.hasNext() - cursor.next() == secondBatch[0] + cursor.next() == expectedSecondBatch[0] !cursor.hasNext() } def 'should try next from batch cursor'() { given: - def firstBatch = [new Document('x', 1), new Document('x', 1)] - def secondBatch = [new Document('x', 2)] + def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [new Document('x', 2)] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() def batchCursor = Stub(BatchCursor) - batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null] + batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null] def cursor = new MongoBatchCursorAdapter(batchCursor) expect: - cursor.tryNext() == firstBatch[0] - cursor.tryNext() == firstBatch[1] + cursor.tryNext() == expectedFirstBatch[0] + cursor.tryNext() == expectedFirstBatch[1] cursor.tryNext() == null - cursor.tryNext() == secondBatch[0] + cursor.tryNext() == expectedSecondBatch[0] cursor.tryNext() == null } diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoChangeStreamCursorSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoChangeStreamCursorSpecification.groovy index 8354899ecfa..045a8c6daf5 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoChangeStreamCursorSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoChangeStreamCursorSpecification.groovy @@ -91,63 +91,70 @@ class MongoChangeStreamCursorSpecification extends Specification { def 'should get next from batch cursor'() { given: - def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), + + def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] - def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() def batchCursor = Stub(AggregateResponseBatchCursor) def codec = new RawBsonDocumentCodec() def resumeToken = Mock(BsonDocument) batchCursor.hasNext() >>> [true, true, true, true, false] - batchCursor.next() >>> [firstBatch, secondBatch] + batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor] def cursor = new MongoChangeStreamCursorImpl(batchCursor, codec, resumeToken) expect: cursor.hasNext() - cursor.next() == firstBatch[0] + cursor.next() == expectedFirstBatch[0] cursor.hasNext() - cursor.next() == firstBatch[1] + cursor.next() == expectedFirstBatch[1] cursor.hasNext() - cursor.next() == secondBatch[0] + cursor.next() == expectedSecondBatch[0] !cursor.hasNext() } def 'should try next from batch cursor'() { given: - def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), - RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] - def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), + RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() def batchCursor = Stub(AggregateResponseBatchCursor) def codec = new RawBsonDocumentCodec() def resumeToken = Mock(BsonDocument) - batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null] + batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null] def cursor = new MongoChangeStreamCursorImpl(batchCursor, codec, resumeToken) expect: - cursor.tryNext() == firstBatch[0] - cursor.tryNext() == firstBatch[1] + cursor.tryNext() == expectedFirstBatch[0] + cursor.tryNext() == expectedFirstBatch[1] cursor.tryNext() == null - cursor.tryNext() == secondBatch[0] + cursor.tryNext() == expectedSecondBatch[0] cursor.tryNext() == null } def 'should get cached resume token after next'() { given: - def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), - RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] - List secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), + RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() def batchCursor = Stub(AggregateResponseBatchCursor) def codec = new RawBsonDocumentCodec() def resumeToken = new BsonDocument('_data', new BsonInt32(1)) batchCursor.hasNext() >>> [true, true, true, false] - batchCursor.next() >>> [firstBatch, secondBatch] + batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor] batchCursor.getPostBatchResumeToken() >>> [new BsonDocument('_data', new BsonInt32(2)), new BsonDocument('_data', new BsonInt32(2)), new BsonDocument('_data', new BsonInt32(3)), @@ -157,26 +164,29 @@ class MongoChangeStreamCursorSpecification extends Specification { expect: cursor.getResumeToken() == resumeToken - cursor.next() == firstBatch.head() + cursor.next() == expectedFirstBatch.head() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(1)) - cursor.next() == firstBatch.last() + cursor.next() == expectedFirstBatch.last() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2)) - cursor.next() == secondBatch.head() + cursor.next() == expectedSecondBatch.head() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3)) } def 'should get cached resume token after tryNext'() { given: - def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), - RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] - def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'), + RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')] + def expectedFirstBatch = firstBatchFromBatchCursor.collect() + def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')] + def expectedSecondBatch = secondBatchFromBatchCursor.collect() + def batchCursor = Stub(AggregateResponseBatchCursor) def codec = new RawBsonDocumentCodec() def resumeToken = new BsonDocument('_data', new BsonInt32(1)) batchCursor.hasNext() >>> [true, true, true, false] - batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null] + batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null] batchCursor.getPostBatchResumeToken() >>> [new BsonDocument('_data', new BsonInt32(2)), new BsonDocument('_data', new BsonInt32(2)), new BsonDocument('_data', new BsonInt32(2)), @@ -189,13 +199,13 @@ class MongoChangeStreamCursorSpecification extends Specification { expect: cursor.getResumeToken() == resumeToken - cursor.tryNext() == firstBatch.head() + cursor.tryNext() == expectedFirstBatch.head() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(1)) - cursor.tryNext() == firstBatch.last() + cursor.tryNext() == expectedFirstBatch.last() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2)) cursor.tryNext() == null cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2)) - cursor.tryNext() == secondBatch.head() + cursor.tryNext() == expectedSecondBatch.head() cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3)) cursor.tryNext() == null cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3))