diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
index 170b33a3398..28d5adbdfc7 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
@@ -24,6 +24,7 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.connection.ClusterDescription;
+import com.mongodb.reactivestreams.client.internal.BatchCursor;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -41,6 +42,7 @@ public class SyncMongoClient implements MongoClient {
private static long sleepAfterCursorCloseMS;
private static long sleepAfterSessionCloseMS;
+ private static boolean waitForBatchCursorCreation;
/**
* Unfortunately this is the only way to wait for a query to be initiated, since Reactive Streams is asynchronous
@@ -88,6 +90,27 @@ public static void enableSleepAfterSessionClose(final long sleepMS) {
sleepAfterSessionCloseMS = sleepMS;
}
+ /**
+ * Enables behavior for waiting until a reactive {@link BatchCursor} is created.
+ *
+ * When enabled, {@link SyncMongoCursor} allows intercepting the result of the cursor creation process.
+ * If the creation fails, the resulting exception will be propagated; if successful, the
+ * process will proceed to issue getMore commands.
+ *
+ * NOTE: Do not enable when multiple cursors are being iterated concurrently.
+ */
+ public static void enableWaitForBatchCursorCreation() {
+ waitForBatchCursorCreation = true;
+ }
+
+ public static boolean isWaitForBatchCursorCreationEnabled() {
+ return waitForBatchCursorCreation;
+ }
+
+ public static void disableWaitForBatchCursorCreation() {
+ waitForBatchCursorCreation = false;
+ }
+
public static void disableSleep() {
sleepAfterCursorOpenMS = 0;
sleepAfterCursorCloseMS = 0;
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java
index c21cbc0e9f0..63485fba132 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java
@@ -21,26 +21,36 @@
import com.mongodb.ServerCursor;
import com.mongodb.client.MongoCursor;
import com.mongodb.lang.Nullable;
+import com.mongodb.reactivestreams.client.internal.BatchCursor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Operators;
+import reactor.util.context.Context;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static com.mongodb.ClusterFixture.TIMEOUT;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen;
+import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.isWaitForBatchCursorCreationEnabled;
class SyncMongoCursor implements MongoCursor {
private static final Object COMPLETED = new Object();
private final BlockingDeque results = new LinkedBlockingDeque<>();
+ private final CompletableFuture batchCursorCompletableFuture = new CompletableFuture<>();
private final Integer batchSize;
private int countToBatchSize;
private Subscription subscription;
@@ -51,6 +61,15 @@ class SyncMongoCursor implements MongoCursor {
SyncMongoCursor(final Publisher publisher, @Nullable final Integer batchSize) {
this.batchSize = batchSize;
CountDownLatch latch = new CountDownLatch(1);
+
+ if (isWaitForBatchCursorCreationEnabled()) {
+ // This hook allows us to intercept the `onNext` and `onError` signals for any operation to determine
+ // whether the {@link BatchCursor} was created successfully or if an error occurred during its creation process.
+ // The result is propagated to a {@link CompletableFuture}, which we use to block until it is completed.
+ Hooks.onEachOperator(Operators.lift((sc, sub) ->
+ new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture)));
+ }
+
//noinspection ReactiveStreamsSubscriberImplementation
Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber() {
@Override
@@ -83,9 +102,19 @@ public void onComplete() {
if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Timeout waiting for subscription");
}
+ if (isWaitForBatchCursorCreationEnabled()) {
+ batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
+ Hooks.resetOnEachOperator();
+ }
sleep(getSleepAfterCursorOpen());
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e);
+ } catch (ExecutionException | TimeoutException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ }
+ throw new RuntimeException(e);
}
}
@@ -181,4 +210,49 @@ private RuntimeException translateError(final Throwable throwable) {
}
return new RuntimeException(throwable);
}
+
+
+ private static final class BatchCursorInterceptSubscriber implements CoreSubscriber {
+
+ private final CoreSubscriber sub;
+ private final CompletableFuture batchCursorCompletableFuture;
+
+ BatchCursorInterceptSubscriber(final CoreSubscriber sub,
+ final CompletableFuture batchCursorCompletableFuture) {
+ this.sub = sub;
+ this.batchCursorCompletableFuture = batchCursorCompletableFuture;
+ }
+
+ @Override
+ public Context currentContext() {
+ return sub.currentContext();
+ }
+
+ @Override
+ public void onSubscribe(final Subscription s) {
+ sub.onSubscribe(s);
+ }
+
+ @Override
+ public void onNext(final Object o) {
+ if (o instanceof BatchCursor) {
+ // Interception of a cursor means that it has been created at this point.
+ batchCursorCompletableFuture.complete(o);
+ }
+ sub.onNext(o);
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ if (!batchCursorCompletableFuture.isDone()) { // Cursor has not been created yet but an error occurred.
+ batchCursorCompletableFuture.completeExceptionally(t);
+ }
+ sub.onError(t);
+ }
+
+ @Override
+ public void onComplete() {
+ sub.onComplete();
+ }
+ }
}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java
index fc7b196e1c8..f1b3c435b4b 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java
@@ -29,17 +29,16 @@
import java.util.List;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableSleep;
+import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableWaitForBatchCursorCreation;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorOpen;
+import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableWaitForBatchCursorCreation;
import static org.junit.Assume.assumeFalse;
public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
private static final List ERROR_REQUIRED_FROM_CHANGE_STREAM_INITIALIZATION_TESTS =
Arrays.asList(
- "Test with document comment - pre 4.4",
- "Change Stream should error when an invalid aggregation stage is passed in",
- "The watch helper must not throw a custom exception when executed against a single server topology, "
- + "but instead depend on a server error"
+ "Test with document comment - pre 4.4"
);
private static final List EVENT_SENSITIVE_TESTS =
@@ -48,6 +47,14 @@ public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
"Test that comment is not set on getMore - pre 4.4"
);
+ private static final List REQUIRES_BATCH_CURSOR_CREATION_WAITING =
+ Arrays.asList(
+ "Change Stream should error when an invalid aggregation stage is passed in",
+ "The watch helper must not throw a custom exception when executed against a single server topology, "
+ + "but instead depend on a server error"
+ );
+
+
public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescription,
@SuppressWarnings("unused") final String testDescription,
final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities,
@@ -58,12 +65,17 @@ public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescriptio
assumeFalse(EVENT_SENSITIVE_TESTS.contains(testDescription));
enableSleepAfterCursorOpen(256);
+
+ if (REQUIRES_BATCH_CURSOR_CREATION_WAITING.contains(testDescription)) {
+ enableWaitForBatchCursorCreation();
+ }
}
@After
public void cleanUp() {
super.cleanUp();
disableSleep();
+ disableWaitForBatchCursorCreation();
}
@Parameterized.Parameters(name = "{0}: {1}")