Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults;
import static java.util.Collections.emptyList;

class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
Expand Down Expand Up @@ -117,6 +118,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
}

if (serverCursorIsNull || !batchResults.isEmpty()) {
commandCursorResult = withEmptyResults(commandCursorResult);
funcCallback.onResult(batchResults, null);
} else {
getMore(localServerCursor, funcCallback);
Expand Down Expand Up @@ -206,6 +208,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
resourceManager.setServerCursor(nextServerCursor);
List<T> nextBatch = commandCursorResult.getResults();
if (nextServerCursor == null || !nextBatch.isEmpty()) {
commandCursorResult = withEmptyResults(commandCursorResult);
callback.onResult(nextBatch, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
boolean hasNext();

/**
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Returns the next batch of results as a mutable list. Modifications to the list will not affect the cursor state.
* A tailable cursor will block until another batch exists.
*
* @return the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
Expand Down Expand Up @@ -89,7 +90,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
int getBatchSize();

/**
* A special {@code next()} case that returns the next batch if available or null.
* A special {@code next()} case that returns the next batch as a mutable list if available or null.
* Modifications to the list will not affect the cursor state.
*
* <p>Tailable cursors are an example where this is useful. A call to {@code tryNext()} may return null, but in the future calling
* {@code tryNext()} would return a new batch if a document had been added to the capped collection.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ private List<T> doNext() {

List<T> retVal = nextBatch;
nextBatch = null;
commandCursorResult = CommandCursorResult.withEmptyResults(commandCursorResult);
return retVal;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

import java.util.Collections;
import java.util.List;

import static com.mongodb.assertions.Assertions.isTrue;
Expand Down Expand Up @@ -60,6 +61,31 @@ public CommandCursorResult(
this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null);
}

private CommandCursorResult(
final ServerAddress serverAddress,
final List<T> results,
final MongoNamespace namespace,
final long cursorId,
@Nullable final BsonTimestamp operationTime,
@Nullable final BsonDocument postBatchResumeToken) {
this.serverAddress = serverAddress;
this.results = results;
this.namespace = namespace;
this.cursorId = cursorId;
this.operationTime = operationTime;
this.postBatchResumeToken = postBatchResumeToken;
}

public static <T> CommandCursorResult<T> withEmptyResults(final CommandCursorResult<T> commandCursorResult) {
return new CommandCursorResult<>(
commandCursorResult.getServerAddress(),
Collections.emptyList(),
commandCursorResult.getNamespace(),
commandCursorResult.getCursorId(),
commandCursorResult.getOperationTime(),
commandCursorResult.getPostBatchResumeToken());
}

/**
* Gets the namespace.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public ServerAddress getServerAddress() {

private T getNextInBatch() {
T nextInBatch = curBatch.get(curPos);
curBatch.set(curPos, null);
if (curPos < curBatch.size() - 1) {
curPos++;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public ServerAddress getServerAddress() {

private T getNextInBatch() {
RawBsonDocument nextInBatch = curBatch.get(curPos);
curBatch.set(curPos, null);
resumeToken = nextInBatch.getDocument("_id");
if (curPos < curBatch.size() - 1) {
curPos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,46 @@ class MongoBatchCursorAdapterSpecification extends Specification {

def 'should get next from batch cursor'() {
given:
def firstBatch = [new Document('x', 1), new Document('x', 1)]
def secondBatch = [new Document('x', 2)]
def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [new Document('x', 2)]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(BatchCursor)

batchCursor.hasNext() >>> [true, true, true, true, false]
batchCursor.next() >>> [firstBatch, secondBatch]
batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor]

def cursor = new MongoBatchCursorAdapter(batchCursor)

expect:
cursor.hasNext()
cursor.next() == firstBatch[0]
cursor.next() == expectedFirstBatch[0]
cursor.hasNext()
cursor.next() == firstBatch[1]
cursor.next() == expectedFirstBatch[1]
cursor.hasNext()
cursor.next() == secondBatch[0]
cursor.next() == expectedSecondBatch[0]
!cursor.hasNext()
}

def 'should try next from batch cursor'() {
given:
def firstBatch = [new Document('x', 1), new Document('x', 1)]
def secondBatch = [new Document('x', 2)]
def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [new Document('x', 2)]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(BatchCursor)

batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null]
batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null]

def cursor = new MongoBatchCursorAdapter(batchCursor)

expect:
cursor.tryNext() == firstBatch[0]
cursor.tryNext() == firstBatch[1]
cursor.tryNext() == expectedFirstBatch[0]
cursor.tryNext() == expectedFirstBatch[1]
cursor.tryNext() == null
cursor.tryNext() == secondBatch[0]
cursor.tryNext() == expectedSecondBatch[0]
cursor.tryNext() == null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,63 +91,70 @@ class MongoChangeStreamCursorSpecification extends Specification {

def 'should get next from batch cursor'() {
given:
def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),

def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(AggregateResponseBatchCursor)
def codec = new RawBsonDocumentCodec()
def resumeToken = Mock(BsonDocument)

batchCursor.hasNext() >>> [true, true, true, true, false]
batchCursor.next() >>> [firstBatch, secondBatch]
batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor]

def cursor = new MongoChangeStreamCursorImpl(batchCursor, codec, resumeToken)

expect:
cursor.hasNext()
cursor.next() == firstBatch[0]
cursor.next() == expectedFirstBatch[0]
cursor.hasNext()
cursor.next() == firstBatch[1]
cursor.next() == expectedFirstBatch[1]
cursor.hasNext()
cursor.next() == secondBatch[0]
cursor.next() == expectedSecondBatch[0]
!cursor.hasNext()
}

def 'should try next from batch cursor'() {
given:
def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(AggregateResponseBatchCursor)
def codec = new RawBsonDocumentCodec()
def resumeToken = Mock(BsonDocument)

batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null]
batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null]

def cursor = new MongoChangeStreamCursorImpl(batchCursor, codec, resumeToken)

expect:
cursor.tryNext() == firstBatch[0]
cursor.tryNext() == firstBatch[1]
cursor.tryNext() == expectedFirstBatch[0]
cursor.tryNext() == expectedFirstBatch[1]
cursor.tryNext() == null
cursor.tryNext() == secondBatch[0]
cursor.tryNext() == expectedSecondBatch[0]
cursor.tryNext() == null
}

def 'should get cached resume token after next'() {
given:
def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
List<BsonDocument> secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(AggregateResponseBatchCursor)
def codec = new RawBsonDocumentCodec()
def resumeToken = new BsonDocument('_data', new BsonInt32(1))

batchCursor.hasNext() >>> [true, true, true, false]
batchCursor.next() >>> [firstBatch, secondBatch]
batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor]
batchCursor.getPostBatchResumeToken() >>> [new BsonDocument('_data', new BsonInt32(2)),
new BsonDocument('_data', new BsonInt32(2)),
new BsonDocument('_data', new BsonInt32(3)),
Expand All @@ -157,26 +164,29 @@ class MongoChangeStreamCursorSpecification extends Specification {

expect:
cursor.getResumeToken() == resumeToken
cursor.next() == firstBatch.head()
cursor.next() == expectedFirstBatch.head()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(1))
cursor.next() == firstBatch.last()
cursor.next() == expectedFirstBatch.last()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2))
cursor.next() == secondBatch.head()
cursor.next() == expectedSecondBatch.head()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3))
}

def 'should get cached resume token after tryNext'() {
given:
def firstBatch = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def secondBatch = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def firstBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 1 }, x: 1 }'),
RawBsonDocument.parse('{ _id: { _data: 2 }, x: 1 }')]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [RawBsonDocument.parse('{ _id: { _data: 3 }, x: 2 }')]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()


def batchCursor = Stub(AggregateResponseBatchCursor)
def codec = new RawBsonDocumentCodec()
def resumeToken = new BsonDocument('_data', new BsonInt32(1))

batchCursor.hasNext() >>> [true, true, true, false]
batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null]
batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null]
batchCursor.getPostBatchResumeToken() >>> [new BsonDocument('_data', new BsonInt32(2)),
new BsonDocument('_data', new BsonInt32(2)),
new BsonDocument('_data', new BsonInt32(2)),
Expand All @@ -189,13 +199,13 @@ class MongoChangeStreamCursorSpecification extends Specification {

expect:
cursor.getResumeToken() == resumeToken
cursor.tryNext() == firstBatch.head()
cursor.tryNext() == expectedFirstBatch.head()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(1))
cursor.tryNext() == firstBatch.last()
cursor.tryNext() == expectedFirstBatch.last()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2))
cursor.tryNext() == null
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(2))
cursor.tryNext() == secondBatch.head()
cursor.tryNext() == expectedSecondBatch.head()
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3))
cursor.tryNext() == null
cursor.getResumeToken() == new BsonDocument('_data', new BsonInt32(3))
Expand Down