Skip to content

Commit 441d96f

Browse files
vbabaninrozza
andauthored
Clear CommandCursorResult.results after next()/tryNext(). (#1780)
JAVA-5940 --------- Co-authored-by: Ross Lawley <ross@mongodb.com>
1 parent bcf15c0 commit 441d96f

17 files changed

+112
-54
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
6161
import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
6262
import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
63+
import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults;
6364
import static java.util.Collections.emptyList;
6465

6566
class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
@@ -117,6 +118,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
117118
}
118119

119120
if (serverCursorIsNull || !batchResults.isEmpty()) {
121+
commandCursorResult = withEmptyResults(commandCursorResult);
120122
funcCallback.onResult(batchResults, null);
121123
} else {
122124
getMore(localServerCursor, funcCallback);
@@ -206,6 +208,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
206208
resourceManager.setServerCursor(nextServerCursor);
207209
List<T> nextBatch = commandCursorResult.getResults();
208210
if (nextServerCursor == null || !nextBatch.isEmpty()) {
211+
commandCursorResult = withEmptyResults(commandCursorResult);
209212
callback.onResult(nextBatch, null);
210213
return;
211214
}

driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
6060
boolean hasNext();
6161

6262
/**
63-
* Returns the next batch of results. A tailable cursor will block until another batch exists.
63+
* Returns the next batch of results as a mutable list. Modifications to the list will not affect the cursor state.
64+
* A tailable cursor will block until another batch exists.
6465
*
6566
* @return the next batch of results
6667
* @throws java.util.NoSuchElementException if no next batch exists
@@ -89,7 +90,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
8990
int getBatchSize();
9091

9192
/**
92-
* A special {@code next()} case that returns the next batch if available or null.
93+
* A special {@code next()} case that returns the next batch as a mutable list if available or null.
94+
* Modifications to the list will not affect the cursor state.
9395
*
9496
* <p>Tailable cursors are an example where this is useful. A call to {@code tryNext()} may return null, but in the future calling
9597
* {@code tryNext()} would return a new batch if a document had been added to the capped collection.</p>

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ private List<T> doNext() {
144144

145145
List<T> retVal = nextBatch;
146146
nextBatch = null;
147+
commandCursorResult = CommandCursorResult.withEmptyResults(commandCursorResult);
147148
return retVal;
148149
}
149150

driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.bson.BsonDocument;
2424
import org.bson.BsonTimestamp;
2525

26+
import java.util.Collections;
2627
import java.util.List;
2728

2829
import static com.mongodb.assertions.Assertions.isTrue;
@@ -60,6 +61,31 @@ public CommandCursorResult(
6061
this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null);
6162
}
6263

64+
private CommandCursorResult(
65+
final ServerAddress serverAddress,
66+
final List<T> results,
67+
final MongoNamespace namespace,
68+
final long cursorId,
69+
@Nullable final BsonTimestamp operationTime,
70+
@Nullable final BsonDocument postBatchResumeToken) {
71+
this.serverAddress = serverAddress;
72+
this.results = results;
73+
this.namespace = namespace;
74+
this.cursorId = cursorId;
75+
this.operationTime = operationTime;
76+
this.postBatchResumeToken = postBatchResumeToken;
77+
}
78+
79+
public static <T> CommandCursorResult<T> withEmptyResults(final CommandCursorResult<T> commandCursorResult) {
80+
return new CommandCursorResult<>(
81+
commandCursorResult.getServerAddress(),
82+
Collections.emptyList(),
83+
commandCursorResult.getNamespace(),
84+
commandCursorResult.getCursorId(),
85+
commandCursorResult.getOperationTime(),
86+
commandCursorResult.getPostBatchResumeToken());
87+
}
88+
6389
/**
6490
* Gets the namespace.
6591
*

driver-sync/src/main/com/mongodb/client/internal/MongoBatchCursorAdapter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public ServerAddress getServerAddress() {
9797

9898
private T getNextInBatch() {
9999
T nextInBatch = curBatch.get(curPos);
100+
curBatch.set(curPos, null);
100101
if (curPos < curBatch.size() - 1) {
101102
curPos++;
102103
} else {

driver-sync/src/main/com/mongodb/client/internal/MongoChangeStreamCursorImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public ServerAddress getServerAddress() {
112112

113113
private T getNextInBatch() {
114114
RawBsonDocument nextInBatch = curBatch.get(curPos);
115+
curBatch.set(curPos, null);
115116
resumeToken = nextInBatch.getDocument("_id");
116117
if (curPos < curBatch.size() - 1) {
117118
curPos++;

driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSFindIterableSpecification.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,13 @@ class GridFSFindIterableSpecification extends Specification {
130130
, null),
131131
]
132132
def cursor = {
133+
def batchToReturn = cannedResults.collect();
133134
Stub(BatchCursor) {
134135
def count = 0
135136
def results
136137
def getResult = {
137138
count++
138-
results = count == 1 ? cannedResults : null
139+
results = count == 1 ? batchToReturn : null
139140
results
140141
}
141142
next() >> {

driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,13 @@ class AggregateIterableSpecification extends Specification {
395395
given:
396396
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
397397
def cursor = {
398+
def batchToReturn = cannedResults.collect()
398399
Stub(BatchCursor) {
399400
def count = 0
400401
def results
401402
def getResult = {
402403
count++
403-
results = count == 1 ? cannedResults : null
404+
results = count == 1 ? batchToReturn : null
404405
results
405406
}
406407
next() >> {
@@ -591,12 +592,13 @@ class AggregateIterableSpecification extends Specification {
591592
given:
592593
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
593594
def cursor = {
595+
def batchToReturn = cannedResults.collect()
594596
Stub(BatchCursor) {
595597
def count = 0
596598
def results
597599
def getResult = {
598600
count++
599-
results = count == 1 ? cannedResults : null
601+
results = count == 1 ? batchToReturn : null
600602
results
601603
}
602604
next() >> {

driver-sync/src/test/unit/com/mongodb/client/internal/ChangeStreamIterableSpecification.groovy

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ class ChangeStreamIterableSpecification extends Specification {
160160
def cannedResults = ['{_id: {_data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect {
161161
RawBsonDocument.parse(it)
162162
}
163-
def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults),
164-
cursor(cannedResults)])
163+
def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()),
164+
cursor(cannedResults.collect()), cursor(cannedResults.collect())])
165165
def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [],
166166
Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS)
167167

@@ -208,8 +208,9 @@ class ChangeStreamIterableSpecification extends Specification {
208208
given:
209209
def count = 0
210210
def cannedResults = ['{_id: { _data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect { RawBsonDocument.parse(it) }
211-
def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults),
212-
cursor(cannedResults)])
211+
def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()),
212+
cursor(cannedResults.collect()), cursor(cannedResults.collect()),
213+
cursor(cannedResults.collect())])
213214
def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [],
214215
Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS).withDocumentClass(RawBsonDocument)
215216

driver-sync/src/test/unit/com/mongodb/client/internal/DistinctIterableSpecification.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,13 @@ class DistinctIterableSpecification extends Specification {
128128
given:
129129
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
130130
def cursor = {
131+
def batchToReturn = cannedResults.collect()
131132
Stub(BatchCursor) {
132133
def count = 0
133134
def results
134135
def getResult = {
135136
count++
136-
results = count == 1 ? cannedResults : null
137+
results = count == 1 ? batchToReturn : null
137138
results
138139
}
139140
next() >> {

0 commit comments

Comments
 (0)