From 1e29f657d7f5e731b4aa4815d9da14139ab0cf4b Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 18 Mar 2024 13:39:35 -0700 Subject: [PATCH 1/2] Add BatchCursor interceptor in reactive tests JAVA-5356 --- .../client/syncadapter/SyncMongoClient.java | 13 ++++ .../client/syncadapter/SyncMongoCursor.java | 71 +++++++++++++++++++ .../client/unified/ChangeStreamsTest.java | 20 ++++-- 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java index 170b33a3398..6a3d0910cf1 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java @@ -41,6 +41,7 @@ public class SyncMongoClient implements MongoClient { private static long sleepAfterCursorCloseMS; private static long sleepAfterSessionCloseMS; + private static boolean waitForBatchCursorCreation; /** * Unfortunately this is the only way to wait for a query to be initiated, since Reactive Streams is asynchronous @@ -88,6 +89,18 @@ public static void enableSleepAfterSessionClose(final long sleepMS) { sleepAfterSessionCloseMS = sleepMS; } + public static void enableWaitForBatchCursorCreation() { + waitForBatchCursorCreation = true; + } + + public static boolean isWaitForBatchCursorCreationEnabled() { + return waitForBatchCursorCreation; + } + + public static void disableWaitForBatchCursorCreation() { + waitForBatchCursorCreation = false; + } + public static void disableSleep() { sleepAfterCursorOpenMS = 0; sleepAfterCursorCloseMS = 0; diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index c21cbc0e9f0..e3307c48c13 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -21,26 +21,36 @@ import com.mongodb.ServerCursor; import com.mongodb.client.MongoCursor; import com.mongodb.lang.Nullable; +import com.mongodb.reactivestreams.client.internal.BatchCursor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Operators; +import reactor.util.context.Context; import java.util.NoSuchElementException; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen; +import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.isWaitForBatchCursorCreationEnabled; class SyncMongoCursor implements MongoCursor { private static final Object COMPLETED = new Object(); private final BlockingDeque results = new LinkedBlockingDeque<>(); + private final CompletableFuture batchCursorCompletableFuture = new CompletableFuture<>(); private final Integer batchSize; private int countToBatchSize; private Subscription subscription; @@ -51,6 +61,12 @@ class SyncMongoCursor implements MongoCursor { SyncMongoCursor(final Publisher publisher, @Nullable final Integer batchSize) { this.batchSize = batchSize; CountDownLatch latch = new CountDownLatch(1); + + if (isWaitForBatchCursorCreationEnabled()) { + Hooks.onEachOperator(Operators.lift((sc, sub) -> + new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture))); + } + //noinspection ReactiveStreamsSubscriberImplementation Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber() { @Override @@ -83,9 +99,19 @@ public void onComplete() { if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Timeout waiting for subscription"); } + if (isWaitForBatchCursorCreationEnabled()) { + batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + Hooks.resetOnEachOperator(); + } sleep(getSleepAfterCursorOpen()); } catch (InterruptedException e) { throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); + } catch (ExecutionException | TimeoutException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new RuntimeException(e); } } @@ -181,4 +207,49 @@ private RuntimeException translateError(final Throwable throwable) { } return new RuntimeException(throwable); } + + + private static final class BatchCursorInterceptSubscriber implements CoreSubscriber { + + private final CoreSubscriber sub; + private final CompletableFuture batchCursorCompletableFuture; + + BatchCursorInterceptSubscriber(final CoreSubscriber sub, + final CompletableFuture batchCursorCompletableFuture) { + this.sub = sub; + this.batchCursorCompletableFuture = batchCursorCompletableFuture; + } + + @Override + public Context currentContext() { + return sub.currentContext(); + } + + @Override + public void onSubscribe(final Subscription s) { + sub.onSubscribe(s); + } + + @Override + public void onNext(final Object o) { + if (o instanceof BatchCursor) { + // Interception of a cursor means that it has been created at this point. + batchCursorCompletableFuture.complete(o); + } + sub.onNext(o); + } + + @Override + public void onError(final Throwable t) { + if (!batchCursorCompletableFuture.isDone()) { // Cursor has not been created yet but an error occurred. + batchCursorCompletableFuture.completeExceptionally(t); + } + sub.onError(t); + } + + @Override + public void onComplete() { + sub.onComplete(); + } + } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java index fc7b196e1c8..f1b3c435b4b 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java @@ -29,17 +29,16 @@ import java.util.List; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableSleep; +import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableWaitForBatchCursorCreation; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorOpen; +import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableWaitForBatchCursorCreation; import static org.junit.Assume.assumeFalse; public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest { private static final List ERROR_REQUIRED_FROM_CHANGE_STREAM_INITIALIZATION_TESTS = Arrays.asList( - "Test with document comment - pre 4.4", - "Change Stream should error when an invalid aggregation stage is passed in", - "The watch helper must not throw a custom exception when executed against a single server topology, " - + "but instead depend on a server error" + "Test with document comment - pre 4.4" ); private static final List EVENT_SENSITIVE_TESTS = @@ -48,6 +47,14 @@ public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest { "Test that comment is not set on getMore - pre 4.4" ); + private static final List REQUIRES_BATCH_CURSOR_CREATION_WAITING = + Arrays.asList( + "Change Stream should error when an invalid aggregation stage is passed in", + "The watch helper must not throw a custom exception when executed against a single server topology, " + + "but instead depend on a server error" + ); + + public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescription, @SuppressWarnings("unused") final String testDescription, final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, @@ -58,12 +65,17 @@ public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescriptio assumeFalse(EVENT_SENSITIVE_TESTS.contains(testDescription)); enableSleepAfterCursorOpen(256); + + if (REQUIRES_BATCH_CURSOR_CREATION_WAITING.contains(testDescription)) { + enableWaitForBatchCursorCreation(); + } } @After public void cleanUp() { super.cleanUp(); disableSleep(); + disableWaitForBatchCursorCreation(); } @Parameterized.Parameters(name = "{0}: {1}") From 1f22d65c68aea81954ddcd58ba7d75a809bfea29 Mon Sep 17 00:00:00 2001 From: Jeff Yemin Date: Mon, 13 May 2024 21:59:31 -0400 Subject: [PATCH 2/2] Add documentation --- .../client/syncadapter/SyncMongoClient.java | 10 ++++++++++ .../client/syncadapter/SyncMongoCursor.java | 3 +++ 2 files changed, 13 insertions(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java index 6a3d0910cf1..28d5adbdfc7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java @@ -24,6 +24,7 @@ import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; import com.mongodb.connection.ClusterDescription; +import com.mongodb.reactivestreams.client.internal.BatchCursor; import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; @@ -89,6 +90,15 @@ public static void enableSleepAfterSessionClose(final long sleepMS) { sleepAfterSessionCloseMS = sleepMS; } + /** + * Enables behavior for waiting until a reactive {@link BatchCursor} is created. + *

+ * When enabled, {@link SyncMongoCursor} allows intercepting the result of the cursor creation process. + * If the creation fails, the resulting exception will be propagated; if successful, the + * process will proceed to issue getMore commands. + *

+ * NOTE: Do not enable when multiple cursors are being iterated concurrently. + */ public static void enableWaitForBatchCursorCreation() { waitForBatchCursorCreation = true; } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index e3307c48c13..63485fba132 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -63,6 +63,9 @@ class SyncMongoCursor implements MongoCursor { CountDownLatch latch = new CountDownLatch(1); if (isWaitForBatchCursorCreationEnabled()) { + // This hook allows us to intercept the `onNext` and `onError` signals for any operation to determine + // whether the {@link BatchCursor} was created successfully or if an error occurred during its creation process. + // The result is propagated to a {@link CompletableFuture}, which we use to block until it is completed. Hooks.onEachOperator(Operators.lift((sc, sub) -> new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture))); }