From 318b72c44cc909bb34b20a54e37b7183c9ac5189 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 13 Aug 2024 20:16:02 -0700 Subject: [PATCH 1/8] Fix exception propagation in Async API methods - Resolve an issue where exceptions thrown during thenRun, thenSupply, and related operations in the asynchronous API were not properly propagated to the completion callback. This issue was addressed by replacing `unsafeFinish` with `finish`, ensuring that exceptions are caught and correctly passed to the completion callback when executed on different threads. - Update existing Async API tests to ensure they simulate separate async thread execution. JAVA-5562 --- .../mongodb/internal/async/AsyncFunction.java | 26 +++ .../mongodb/internal/async/AsyncRunnable.java | 11 +- .../mongodb/internal/async/AsyncSupplier.java | 24 ++- ...t.java => AsyncFunctionsAbstractTest.java} | 181 +++++++++--------- ...tract.java => AsyncFunctionsTestBase.java} | 91 +++++++-- .../async/SameThreadAsyncFunctionsTest.java | 92 +++++++++ .../SeparateThreadAsyncFunctionsTest.java | 123 ++++++++++++ 7 files changed, 426 insertions(+), 122 deletions(-) rename driver-core/src/test/unit/com/mongodb/internal/async/{AsyncFunctionsTest.java => AsyncFunctionsAbstractTest.java} (88%) rename driver-core/src/test/unit/com/mongodb/internal/async/{AsyncFunctionsTestAbstract.java => AsyncFunctionsTestBase.java} (80%) create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index 5be92558ee0..00fb7097d67 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -18,6 +18,8 @@ import com.mongodb.lang.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; + /** * See {@link AsyncRunnable} *

@@ -33,4 +35,28 @@ public interface AsyncFunction { * @param callback the callback */ void unsafeFinish(T value, SingleResultCallback callback); + + /** + * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. + * + * @param callback the callback provided by the method the chain is used in. + */ + default void finish(final T value, final SingleResultCallback callback) { + final AtomicBoolean callbackInvoked = new AtomicBoolean(false); + try { + this.unsafeFinish(value, (v, e) -> { + if (!callbackInvoked.compareAndSet(false, true)) { + throw new AssertionError("Callback has been already completed. It could happen " + + "if code throws an exception after invoking an async method."); + } + callback.onResult(v, e); + }); + } catch (Throwable t) { + if (!callbackInvoked.compareAndSet(false, true)) { + throw t; + } else { + callback.completeExceptionally(t); + } + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index a81b2fdd12c..a1072d1bfea 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -169,9 +169,14 @@ default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultC */ default AsyncRunnable thenRun(final AsyncRunnable runnable) { return (c) -> { + /* This call of 'unsafeFinish' initiates the async chain and should be triggered from the thread that called 'finish()'. + However, the callback provided to 'unsafeFinish' may execute on a different thread, thus 'finish()' instead of 'unsafeFinish()' + must be used within callbacks. */ this.unsafeFinish((r, e) -> { if (e == null) { - runnable.unsafeFinish(c); + /* If 'runnable' is executed on a different thread from the one that executed the initial 'finish()', + then invoking 'finish()' within 'runnable' will catch and propagate any exceptions to 'c' (the callback). */ + runnable.finish(c); } else { c.completeExceptionally(e); } @@ -236,7 +241,7 @@ default AsyncRunnable thenRunIf(final Supplier condition, final AsyncRu return; } if (matched) { - runnable.unsafeFinish(callback); + runnable.finish(callback); } else { callback.complete(callback); } @@ -253,7 +258,7 @@ default AsyncSupplier thenSupply(final AsyncSupplier supplier) { return (c) -> { this.unsafeFinish((r, e) -> { if (e == null) { - supplier.unsafeFinish(c); + supplier.finish(c); } else { c.completeExceptionally(e); } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index b7d24dd3df5..3b61dcd876e 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -18,6 +18,7 @@ import com.mongodb.lang.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -54,18 +55,25 @@ default void unsafeFinish(@Nullable final Void value, final SingleResultCallback } /** - * Must be invoked at end of async chain. + * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. + * + * @see #thenApply(AsyncFunction) + * @see #thenConsume(AsyncConsumer) + * @see #onErrorIf(Predicate, AsyncFunction) * @param callback the callback provided by the method the chain is used in */ default void finish(final SingleResultCallback callback) { - final boolean[] callbackInvoked = {false}; + final AtomicBoolean callbackInvoked = new AtomicBoolean(false); try { this.unsafeFinish((v, e) -> { - callbackInvoked[0] = true; + if (!callbackInvoked.compareAndSet(false, true)) { + throw new AssertionError("Callback has been already completed. It could happen " + + "if code throws an exception after invoking an async method."); + } callback.onResult(v, e); }); } catch (Throwable t) { - if (callbackInvoked[0]) { + if (!callbackInvoked.compareAndSet(false, true)) { throw t; } else { callback.completeExceptionally(t); @@ -80,9 +88,9 @@ default void finish(final SingleResultCallback callback) { */ default AsyncSupplier thenApply(final AsyncFunction function) { return (c) -> { - this.unsafeFinish((v, e) -> { + this.finish((v, e) -> { if (e == null) { - function.unsafeFinish(v, c); + function.finish(v, c); } else { c.completeExceptionally(e); } @@ -99,7 +107,7 @@ default AsyncRunnable thenConsume(final AsyncConsumer consumer) { return (c) -> { this.unsafeFinish((v, e) -> { if (e == null) { - consumer.unsafeFinish(v, c); + consumer.finish(v, c); } else { c.completeExceptionally(e); } @@ -131,7 +139,7 @@ default AsyncSupplier onErrorIf( return; } if (errorMatched) { - errorFunction.unsafeFinish(e, callback); + errorFunction.finish(e, callback); } else { callback.completeExceptionally(e); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java similarity index 88% rename from driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java rename to driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 20553fe881a..7892cbdf988 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -19,16 +19,37 @@ import com.mongodb.internal.TimeoutSettings; import org.junit.jupiter.api.Test; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; -import static org.junit.jupiter.api.Assertions.assertThrows; -final class AsyncFunctionsTest extends AsyncFunctionsTestAbstract { +abstract class AsyncFunctionsAbstractTest extends AsyncFunctionsTestBase { private static final TimeoutContext TIMEOUT_CONTEXT = new TimeoutContext(new TimeoutSettings(0, 0, 0, 0L, 0)); + +// @Test +// void test(){ +// +// SingleResultCallback userCallback = (result, t) -> { +// if (t != null) { +// System.out.println("Error:" + t.getMessage()); +// } +// System.out.println("Result:" + result); +// }; +// +// beginAsync().thenRun(c -> { +// // Async imitation +// new Thread() { +// public void run() { +// c.complete(c); +// } +// }.start(); +// }).thenRun(callback -> { +// // Assume this is plain code +// throw new RuntimeException("Error adada"); +// }).finish(userCallback); +// } + @Test void test1Method() { // the number of expected variations is often: 1 + N methods invoked @@ -760,93 +781,74 @@ void testVariables() { }); } - @Test - void testInvalid() { - setIsTestingAbruptCompletion(false); - setAsyncStep(true); - assertThrows(IllegalStateException.class, () -> { - beginAsync().thenRun(c -> { - async(3, c); - throw new IllegalStateException("must not cause second callback invocation"); - }).finish((v, e) -> {}); - }); - assertThrows(IllegalStateException.class, () -> { - beginAsync().thenRun(c -> { - async(3, c); - }).finish((v, e) -> { - throw new IllegalStateException("must not cause second callback invocation"); - }); - }); - } - @Test void testDerivation() { - // Demonstrates the progression from nested async to the API. - - // Stand-ins for sync-async methods; these "happily" do not throw - // exceptions, to avoid complicating this demo async code. - Consumer happySync = (i) -> { - getNextOption(1); - listenerAdd("affected-success-" + i); - }; - BiConsumer> happyAsync = (i, c) -> { - happySync.accept(i); - c.complete(c); - }; - - // Standard nested async, no error handling: - assertBehavesSameVariations(1, - () -> { - happySync.accept(1); - happySync.accept(2); - }, - (callback) -> { - happyAsync.accept(1, (v, e) -> { - happyAsync.accept(2, callback); - }); - }); - - // When both methods are naively extracted, they are out of order: - assertBehavesSameVariations(1, - () -> { - happySync.accept(1); - happySync.accept(2); - }, - (callback) -> { - SingleResultCallback second = (v, e) -> { - happyAsync.accept(2, callback); - }; - SingleResultCallback first = (v, e) -> { - happyAsync.accept(1, second); - }; - first.onResult(null, null); - }); - - // We create an "AsyncRunnable" that takes a callback, which - // decouples any async methods from each other, allowing them - // to be declared in a sync-like order, and without nesting: - assertBehavesSameVariations(1, - () -> { - happySync.accept(1); - happySync.accept(2); - }, - (callback) -> { - AsyncRunnable first = (SingleResultCallback c) -> { - happyAsync.accept(1, c); - }; - AsyncRunnable second = (SingleResultCallback c) -> { - happyAsync.accept(2, c); - }; - // This is a simplified variant of the "then" methods; - // it has no error handling. It takes methods A and B, - // and returns C, which is B(A()). - AsyncRunnable combined = (c) -> { - first.unsafeFinish((r, e) -> { - second.unsafeFinish(c); - }); - }; - combined.unsafeFinish(callback); - }); +// // Demonstrates the progression from nested async to the API. +// +// // Stand-ins for sync-async methods; these "happily" do not throw +// // exceptions, to avoid complicating this demo async code. +// Consumer happySync = (i) -> { +// getNextOption(1); +// listenerAdd("affected-success-" + i); +// }; +// BiConsumer> happyAsync = (i, c) -> { +// happySync.accept(i); +// c.complete(c); +// }; +// +// // Standard nested async, no error handling: +// assertBehavesSameVariations(1, +// () -> { +// happySync.accept(1); +// happySync.accept(2); +// }, +// (callback) -> { +// happyAsync.accept(1, (v, e) -> { +// happyAsync.accept(2, callback); +// }); +// }); +// +// // When both methods are naively extracted, they are out of order: +// assertBehavesSameVariations(1, +// () -> { +// happySync.accept(1); +// happySync.accept(2); +// }, +// (callback) -> { +// SingleResultCallback second = (v, e) -> { +// happyAsync.accept(2, callback); +// }; +// SingleResultCallback first = (v, e) -> { +// happyAsync.accept(1, second); +// }; +// first.onResult(null, null); +// }); +// +// // We create an "AsyncRunnable" that takes a callback, which +// // decouples any async methods from each other, allowing them +// // to be declared in a sync-like order, and without nesting: +// assertBehavesSameVariations(1, +// () -> { +// happySync.accept(1); +// happySync.accept(2); +// }, +// (callback) -> { +// AsyncRunnable first = (SingleResultCallback c) -> { +// happyAsync.accept(1, c); +// }; +// AsyncRunnable second = (SingleResultCallback c) -> { +// happyAsync.accept(2, c); +// }; +// // This is a simplified variant of the "then" methods; +// // it has no error handling. It takes methods A and B, +// // and returns C, which is B(A()). +// AsyncRunnable combined = (c) -> { +// first.unsafeFinish((r, e) -> { +// second.unsafeFinish(c); +// }); +// }; +// combined.unsafeFinish(callback); +// }); // This combining method is added as a default method on AsyncRunnable, // and a "finish" method wraps the resulting methods. This also adds @@ -866,5 +868,4 @@ void testDerivation() { }).finish(callback); }); } - } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java similarity index 80% rename from driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java rename to driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java index 7cc8b456f1c..1229dbcfcad 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestAbstract.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java @@ -17,11 +17,17 @@ package com.mongodb.internal.async; import com.mongodb.client.TestListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.opentest4j.AssertionFailedError; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -31,11 +37,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class AsyncFunctionsTestAbstract { +public abstract class AsyncFunctionsTestBase { private final TestListener listener = new TestListener(); private final InvocationTracker invocationTracker = new InvocationTracker(); private boolean isTestingAbruptCompletion = false; + private ExecutorService asyncExecutor; void setIsTestingAbruptCompletion(final boolean b) { isTestingAbruptCompletion = b; @@ -53,6 +60,23 @@ public void listenerAdd(final String s) { listener.add(s); } + /** + * Create an executor service for async operations before each test. + * + * @return the executor service. + */ + public abstract ExecutorService createAsyncExecutor(); + + @BeforeEach + public void setUp() { + asyncExecutor = createAsyncExecutor(); + } + + @AfterEach + public void shutDown() { + asyncExecutor.shutdownNow(); + } + void plain(final int i) { int cur = invocationTracker.getNextOption(2); if (cur == 0) { @@ -98,32 +122,47 @@ Integer syncReturns(final int i) { return affectedReturns(i); } + + public void submit(final Runnable task) { + asyncExecutor.execute(task); + } void async(final int i, final SingleResultCallback callback) { assertTrue(invocationTracker.isAsyncStep); if (isTestingAbruptCompletion) { + /* We should not test for abrupt completion in a separate thread. Once a callback is registered for an async operation, + the Async Framework does not handle exceptions thrown outside of callbacks by the executing thread. Such exception management + should be the responsibility of the thread conducting the asynchronous operations. */ affected(i); - callback.complete(callback); - - } else { - try { - affected(i); + submit(() -> { callback.complete(callback); - } catch (Throwable t) { - callback.onResult(null, t); - } + }); + } else { + submit(() -> { + try { + affected(i); + callback.complete(callback); + } catch (Throwable t) { + callback.onResult(null, t); + } + }); } } void asyncReturns(final int i, final SingleResultCallback callback) { assertTrue(invocationTracker.isAsyncStep); if (isTestingAbruptCompletion) { - callback.complete(affectedReturns(i)); + int result = affectedReturns(i); + submit(() -> { + callback.complete(result); + }); } else { - try { - callback.complete(affectedReturns(i)); - } catch (Throwable t) { - callback.onResult(null, t); - } + submit(() -> { + try { + callback.complete(affectedReturns(i)); + } catch (Throwable t) { + callback.onResult(null, t); + } + }); } } @@ -200,24 +239,26 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee AtomicReference actualValue = new AtomicReference<>(); AtomicReference actualException = new AtomicReference<>(); - AtomicBoolean wasCalled = new AtomicBoolean(false); + CompletableFuture wasCalledFuture = new CompletableFuture<>(); try { async.accept((v, e) -> { actualValue.set(v); actualException.set(e); - if (wasCalled.get()) { + if (wasCalledFuture.isDone()) { fail(); } - wasCalled.set(true); + wasCalledFuture.complete(null); }); } catch (Throwable e) { fail("async threw instead of using callback"); } + await(wasCalledFuture, "Callback should have been called"); + // The following code can be used to debug variations: // System.out.println("===VARIATION START"); // System.out.println("sync: " + expectedEvents); -// System.out.println("callback called?: " + wasCalled.get()); +// System.out.println("callback called?: " + wasCalledFuture.isDone()); // System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get()); // System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get()); // System.out.println("exception mode: " + (isTestingAbruptCompletion @@ -229,7 +270,7 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee throw (AssertionFailedError) actualException.get(); } - assertTrue(wasCalled.get(), "callback should have been called"); + assertTrue(wasCalledFuture.isDone(), "callback should have been called"); assertEquals(expectedEvents, listener.getEventStrings(), "steps should have matched"); assertEquals(expectedValue, actualValue.get()); assertEquals(expectedException == null, actualException.get() == null, @@ -242,6 +283,14 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee listener.clear(); } + protected T await(final CompletableFuture voidCompletableFuture, final String errorMessage) { + try { + return voidCompletableFuture.get(1, TimeUnit.MINUTES); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new AssertionError(errorMessage); + } + } + /** * Tracks invocations: allows testing of all variations of a method calls */ diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java new file mode 100644 index 00000000000..85bd0831749 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@DisplayName("The same thread async functions") +public class SameThreadAsyncFunctionsTest extends AsyncFunctionsAbstractTest { + @Override + public ExecutorService createAsyncExecutor() { + return new AbstractExecutorService() { + @Override + public void execute(@NotNull final Runnable command) { + command.run(); + } + + @Override + public void shutdown() { + } + + @NotNull + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(final long timeout, @NotNull final TimeUnit unit) { + return true; + } + }; + } + + @Test + void testInvalid() { + setIsTestingAbruptCompletion(false); + setAsyncStep(true); + IllegalStateException illegalStateException = new IllegalStateException("must not cause second callback invocation"); + + assertThrows(IllegalStateException.class, () -> { + beginAsync().thenRun(c -> { + async(3, c); + throw illegalStateException; + }).finish((v, e) -> { + assertNotEquals(e, illegalStateException); + }); + }); + assertThrows(IllegalStateException.class, () -> { + beginAsync().thenRun(c -> { + async(3, c); + }).finish((v, e) -> { + throw illegalStateException; + }); + }); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java new file mode 100644 index 00000000000..6ee4aef5899 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisplayName("Separate thread async functions") +public class SeparateThreadAsyncFunctionsTest extends AsyncFunctionsAbstractTest { + + private UncaughtExceptionHandler uncaughtExceptionHandler; + + @Override + public ExecutorService createAsyncExecutor() { + uncaughtExceptionHandler = new UncaughtExceptionHandler(); + return Executors.newFixedThreadPool(1, r -> { + Thread thread = new Thread(r); + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); + return thread; + }); + } + + /** + * This test covers the scenario where a callback is erroneously invoked after a callback had been completed. + * Such behavior is considered a bug and is not expected. An AssertionError should be thrown if an asynchronous invocation + * attempts to use a callback that has already been marked as completed. + */ + @Test + void shouldPropagateAssertionErrorIfCallbackHasBeenCompletedAfterAsyncInvocation() { + //given + setIsTestingAbruptCompletion(false); + setAsyncStep(true); + IllegalStateException illegalStateException = new IllegalStateException("must not cause second callback invocation"); + + AtomicBoolean callbackInvoked = new AtomicBoolean(false); + CompletableFuture finalCallbackWasInvoked = new CompletableFuture<>(); + + //when + beginAsync().thenRun(c -> { + async(3, c); + throw illegalStateException; + }).thenRun(c -> { + assertInvokedOnce(callbackInvoked); + c.complete(c); + }) + .finish((v, e) -> { + assertEquals(illegalStateException, e); + finalCallbackWasInvoked.complete(null); + } + ); + + //then + Throwable exception = uncaughtExceptionHandler.getException(); + assertNotNull(exception); + assertEquals(AssertionError.class, exception.getClass()); + assertEquals("Callback has been already completed. It could happen " + + "if code throws an exception after invoking an async method.", exception.getMessage()); + } + + @Test + void shouldPropagateUnexpectedExceptionFromFinishCallback() { + //given + setIsTestingAbruptCompletion(false); + setAsyncStep(true); + IllegalStateException illegalStateException = new IllegalStateException("must not cause second callback invocation"); + CompletableFuture finalCallbackWasInvoked = new CompletableFuture<>(); + + //when + beginAsync().thenRun(c -> { + async(3, c); + }).finish((v, e) -> { + finalCallbackWasInvoked.complete(null); + throw illegalStateException; + }); + + //then + Throwable exception = uncaughtExceptionHandler.getException(); + assertNotNull(exception); + assertEquals(illegalStateException, exception); + } + + private static void assertInvokedOnce(final AtomicBoolean callbackInvoked1) { + assertTrue(callbackInvoked1.compareAndSet(false, true)); + } + + private final class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + + private final CompletableFuture completable = new CompletableFuture<>(); + + @Override + public void uncaughtException(final Thread t, final Throwable e) { + completable.complete(e); + } + + public Throwable getException() { + return await(completable, "No exception was thrown"); + } + } +} From 1ff1e6cd76e54c8e28ff2195f4170d4ee799a0f3 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 13 Aug 2024 20:25:05 -0700 Subject: [PATCH 2/8] Remove redundant code. JAVA-5562 --- .../mongodb/internal/async/AsyncFunction.java | 2 +- .../mongodb/internal/async/AsyncRunnable.java | 3 - .../mongodb/internal/async/AsyncSupplier.java | 2 +- .../async/AsyncFunctionsAbstractTest.java | 157 ++++++++---------- 4 files changed, 70 insertions(+), 94 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index 00fb7097d67..b1702597627 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -37,7 +37,7 @@ public interface AsyncFunction { void unsafeFinish(T value, SingleResultCallback callback); /** - * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. + * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. * * @param callback the callback provided by the method the chain is used in. */ diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index a1072d1bfea..d4ead3c5b96 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -169,9 +169,6 @@ default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultC */ default AsyncRunnable thenRun(final AsyncRunnable runnable) { return (c) -> { - /* This call of 'unsafeFinish' initiates the async chain and should be triggered from the thread that called 'finish()'. - However, the callback provided to 'unsafeFinish' may execute on a different thread, thus 'finish()' instead of 'unsafeFinish()' - must be used within callbacks. */ this.unsafeFinish((r, e) -> { if (e == null) { /* If 'runnable' is executed on a different thread from the one that executed the initial 'finish()', diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 3b61dcd876e..048d489e46a 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -55,7 +55,7 @@ default void unsafeFinish(@Nullable final Void value, final SingleResultCallback } /** - * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. + * Must be invoked at end of async chain or when executing a callback handler supplied by the caller. * * @see #thenApply(AsyncFunction) * @see #thenConsume(AsyncConsumer) diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 7892cbdf988..611b90fc675 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -19,6 +19,8 @@ import com.mongodb.internal.TimeoutSettings; import org.junit.jupiter.api.Test; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -27,29 +29,6 @@ abstract class AsyncFunctionsAbstractTest extends AsyncFunctionsTestBase { private static final TimeoutContext TIMEOUT_CONTEXT = new TimeoutContext(new TimeoutSettings(0, 0, 0, 0L, 0)); -// @Test -// void test(){ -// -// SingleResultCallback userCallback = (result, t) -> { -// if (t != null) { -// System.out.println("Error:" + t.getMessage()); -// } -// System.out.println("Result:" + result); -// }; -// -// beginAsync().thenRun(c -> { -// // Async imitation -// new Thread() { -// public void run() { -// c.complete(c); -// } -// }.start(); -// }).thenRun(callback -> { -// // Assume this is plain code -// throw new RuntimeException("Error adada"); -// }).finish(userCallback); -// } - @Test void test1Method() { // the number of expected variations is often: 1 + N methods invoked @@ -783,72 +762,72 @@ void testVariables() { @Test void testDerivation() { -// // Demonstrates the progression from nested async to the API. -// -// // Stand-ins for sync-async methods; these "happily" do not throw -// // exceptions, to avoid complicating this demo async code. -// Consumer happySync = (i) -> { -// getNextOption(1); -// listenerAdd("affected-success-" + i); -// }; -// BiConsumer> happyAsync = (i, c) -> { -// happySync.accept(i); -// c.complete(c); -// }; -// -// // Standard nested async, no error handling: -// assertBehavesSameVariations(1, -// () -> { -// happySync.accept(1); -// happySync.accept(2); -// }, -// (callback) -> { -// happyAsync.accept(1, (v, e) -> { -// happyAsync.accept(2, callback); -// }); -// }); -// -// // When both methods are naively extracted, they are out of order: -// assertBehavesSameVariations(1, -// () -> { -// happySync.accept(1); -// happySync.accept(2); -// }, -// (callback) -> { -// SingleResultCallback second = (v, e) -> { -// happyAsync.accept(2, callback); -// }; -// SingleResultCallback first = (v, e) -> { -// happyAsync.accept(1, second); -// }; -// first.onResult(null, null); -// }); -// -// // We create an "AsyncRunnable" that takes a callback, which -// // decouples any async methods from each other, allowing them -// // to be declared in a sync-like order, and without nesting: -// assertBehavesSameVariations(1, -// () -> { -// happySync.accept(1); -// happySync.accept(2); -// }, -// (callback) -> { -// AsyncRunnable first = (SingleResultCallback c) -> { -// happyAsync.accept(1, c); -// }; -// AsyncRunnable second = (SingleResultCallback c) -> { -// happyAsync.accept(2, c); -// }; -// // This is a simplified variant of the "then" methods; -// // it has no error handling. It takes methods A and B, -// // and returns C, which is B(A()). -// AsyncRunnable combined = (c) -> { -// first.unsafeFinish((r, e) -> { -// second.unsafeFinish(c); -// }); -// }; -// combined.unsafeFinish(callback); -// }); + // Demonstrates the progression from nested async to the API. + + // Stand-ins for sync-async methods; these "happily" do not throw + // exceptions, to avoid complicating this demo async code. + Consumer happySync = (i) -> { + getNextOption(1); + listenerAdd("affected-success-" + i); + }; + BiConsumer> happyAsync = (i, c) -> { + happySync.accept(i); + c.complete(c); + }; + + // Standard nested async, no error handling: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + happyAsync.accept(1, (v, e) -> { + happyAsync.accept(2, callback); + }); + }); + + // When both methods are naively extracted, they are out of order: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + SingleResultCallback second = (v, e) -> { + happyAsync.accept(2, callback); + }; + SingleResultCallback first = (v, e) -> { + happyAsync.accept(1, second); + }; + first.onResult(null, null); + }); + + // We create an "AsyncRunnable" that takes a callback, which + // decouples any async methods from each other, allowing them + // to be declared in a sync-like order, and without nesting: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + AsyncRunnable first = (SingleResultCallback c) -> { + happyAsync.accept(1, c); + }; + AsyncRunnable second = (SingleResultCallback c) -> { + happyAsync.accept(2, c); + }; + // This is a simplified variant of the "then" methods; + // it has no error handling. It takes methods A and B, + // and returns C, which is B(A()). + AsyncRunnable combined = (c) -> { + first.unsafeFinish((r, e) -> { + second.unsafeFinish(c); + }); + }; + combined.unsafeFinish(callback); + }); // This combining method is added as a default method on AsyncRunnable, // and a "finish" method wraps the resulting methods. This also adds From d2312bd56f783c5f8f237aa56a012d4065e94b42 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 13 Aug 2024 20:27:55 -0700 Subject: [PATCH 3/8] Make an inner class. JAVA-5562 --- .../async/SameThreadAsyncFunctionsTest.java | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java index 85bd0831749..04b9290af55 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/SameThreadAsyncFunctionsTest.java @@ -34,37 +34,7 @@ public class SameThreadAsyncFunctionsTest extends AsyncFunctionsAbstractTest { @Override public ExecutorService createAsyncExecutor() { - return new AbstractExecutorService() { - @Override - public void execute(@NotNull final Runnable command) { - command.run(); - } - - @Override - public void shutdown() { - } - - @NotNull - @Override - public List shutdownNow() { - return Collections.emptyList(); - } - - @Override - public boolean isShutdown() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isTerminated() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean awaitTermination(final long timeout, @NotNull final TimeUnit unit) { - return true; - } - }; + return new SameThreadExecutorService(); } @Test @@ -89,4 +59,36 @@ void testInvalid() { }); }); } + + private static class SameThreadExecutorService extends AbstractExecutorService { + @Override + public void execute(@NotNull final Runnable command) { + command.run(); + } + + @Override + public void shutdown() { + } + + @NotNull + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(final long timeout, @NotNull final TimeUnit unit) { + return true; + } + } } From 9bcbe2b0245ac241a78513fcd1f39f987ada6221 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 13 Aug 2024 20:29:25 -0700 Subject: [PATCH 4/8] Remove redundant code. JAVA-5562 --- .../internal/async/SeparateThreadAsyncFunctionsTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java index 6ee4aef5899..6b90a0415d0 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java @@ -55,9 +55,7 @@ void shouldPropagateAssertionErrorIfCallbackHasBeenCompletedAfterAsyncInvocation setIsTestingAbruptCompletion(false); setAsyncStep(true); IllegalStateException illegalStateException = new IllegalStateException("must not cause second callback invocation"); - AtomicBoolean callbackInvoked = new AtomicBoolean(false); - CompletableFuture finalCallbackWasInvoked = new CompletableFuture<>(); //when beginAsync().thenRun(c -> { @@ -69,7 +67,6 @@ void shouldPropagateAssertionErrorIfCallbackHasBeenCompletedAfterAsyncInvocation }) .finish((v, e) -> { assertEquals(illegalStateException, e); - finalCallbackWasInvoked.complete(null); } ); @@ -87,13 +84,11 @@ void shouldPropagateUnexpectedExceptionFromFinishCallback() { setIsTestingAbruptCompletion(false); setAsyncStep(true); IllegalStateException illegalStateException = new IllegalStateException("must not cause second callback invocation"); - CompletableFuture finalCallbackWasInvoked = new CompletableFuture<>(); //when beginAsync().thenRun(c -> { async(3, c); }).finish((v, e) -> { - finalCallbackWasInvoked.complete(null); throw illegalStateException; }); From 401a8605f8edbbeed7d0f7dbebc2f41b10cc05c2 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Thu, 15 Aug 2024 16:21:19 -0700 Subject: [PATCH 5/8] Add value and exception to AssertionError. JAVA-5562 --- .../src/main/com/mongodb/internal/async/AsyncFunction.java | 4 ++-- .../src/main/com/mongodb/internal/async/AsyncSupplier.java | 4 ++-- .../internal/async/SeparateThreadAsyncFunctionsTest.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index b1702597627..7203d3a4945 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -46,8 +46,8 @@ default void finish(final T value, final SingleResultCallback callback) { try { this.unsafeFinish(value, (v, e) -> { if (!callbackInvoked.compareAndSet(false, true)) { - throw new AssertionError("Callback has been already completed. It could happen " - + "if code throws an exception after invoking an async method."); + throw new AssertionError(String.format("Callback has been already completed. It could happen " + + "if code throws an exception after invoking an async method. Value: %s", v), e); } callback.onResult(v, e); }); diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 048d489e46a..77c289c8723 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -67,8 +67,8 @@ default void finish(final SingleResultCallback callback) { try { this.unsafeFinish((v, e) -> { if (!callbackInvoked.compareAndSet(false, true)) { - throw new AssertionError("Callback has been already completed. It could happen " - + "if code throws an exception after invoking an async method."); + throw new AssertionError(String.format("Callback has been already completed. It could happen " + + "if code throws an exception after invoking an async method. Value: %s", v), e); } callback.onResult(v, e); }); diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java index 6b90a0415d0..401c4d2c18e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/SeparateThreadAsyncFunctionsTest.java @@ -75,7 +75,7 @@ void shouldPropagateAssertionErrorIfCallbackHasBeenCompletedAfterAsyncInvocation assertNotNull(exception); assertEquals(AssertionError.class, exception.getClass()); assertEquals("Callback has been already completed. It could happen " - + "if code throws an exception after invoking an async method.", exception.getMessage()); + + "if code throws an exception after invoking an async method. Value: null", exception.getMessage()); } @Test From 367124772853ed1b0df60cfe08fa5d512302a8e3 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 16 Aug 2024 16:56:02 -0700 Subject: [PATCH 6/8] Ensure local exception handling in async callback Modify the async callback to catch and handle exceptions locally. Exceptions are now directly processed and passed as an error argument to the callback function, avoiding propagation to the parent callback. JAVA-5562 --- .../connection/InternalStreamConnectionInitializer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index ee509873e40..aa0a61a3d1d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -101,7 +101,13 @@ public void startHandshakeAsync(final InternalConnection internalConnection, fin callback.onResult(null, t instanceof MongoException ? mapHelloException((MongoException) t) : t); } else { setSpeculativeAuthenticateResponse(helloResult); - callback.onResult(createInitializationDescription(helloResult, internalConnection, startTime), null); + try { + InternalConnectionInitializationDescription initializationDescription = + createInitializationDescription(helloResult, internalConnection, startTime); + callback.onResult(initializationDescription, null); + } catch (Throwable e) { + callback.onResult(null, e); + } } }); } From 477308074a718b0cf92bd07e4c61fec31a862328 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 16 Aug 2024 17:13:40 -0700 Subject: [PATCH 7/8] Move `callback.onResult` outside the catch block to ensure it's not invoked twice when an exception occurs. JAVA-5562 --- .../internal/connection/InternalStreamConnection.java | 6 ++++-- .../InternalStreamConnectionInitializer.java | 11 ++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 98e43fe5fbe..de12e5f092f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -610,6 +610,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d return; } assertNotNull(responseBuffers); + T commandResult; try { updateSessionContext(operationContext.getSessionContext(), responseBuffers); boolean commandOk = @@ -624,13 +625,14 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d } commandEventSender.sendSucceededEvent(responseBuffers); - T result1 = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); - callback.onResult(result1, null); + commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); } catch (Throwable localThrowable) { callback.onResult(null, localThrowable); + return; } finally { responseBuffers.close(); } + callback.onResult(commandResult, null); })); } }); diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index aa0a61a3d1d..0aae1440a65 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -101,13 +101,14 @@ public void startHandshakeAsync(final InternalConnection internalConnection, fin callback.onResult(null, t instanceof MongoException ? mapHelloException((MongoException) t) : t); } else { setSpeculativeAuthenticateResponse(helloResult); + InternalConnectionInitializationDescription initializationDescription; try { - InternalConnectionInitializationDescription initializationDescription = - createInitializationDescription(helloResult, internalConnection, startTime); - callback.onResult(initializationDescription, null); - } catch (Throwable e) { - callback.onResult(null, e); + initializationDescription = createInitializationDescription(helloResult, internalConnection, startTime); + } catch (Throwable localException) { + callback.onResult(null, localException); + return; } + callback.onResult(initializationDescription, null); } }); } From b566c00c52fc8b0d9a2546e53f0c2184e9b91f9b Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 16 Aug 2024 17:15:03 -0700 Subject: [PATCH 8/8] rename variable. JAVA-5562 --- .../connection/InternalStreamConnectionInitializer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 0aae1440a65..6fca357b080 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -104,8 +104,8 @@ public void startHandshakeAsync(final InternalConnection internalConnection, fin InternalConnectionInitializationDescription initializationDescription; try { initializationDescription = createInitializationDescription(helloResult, internalConnection, startTime); - } catch (Throwable localException) { - callback.onResult(null, localException); + } catch (Throwable localThrowable) { + callback.onResult(null, localThrowable); return; } callback.onResult(initializationDescription, null);