diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java index 1985e41a5ce..eb0f8e6817d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java @@ -163,7 +163,9 @@ static Throwable chooseRetryableWriteException( return mostRecentAttemptException.getCause(); } return mostRecentAttemptException; - } else if (mostRecentAttemptException instanceof ResourceSupplierInternalException) { + } else if (mostRecentAttemptException instanceof ResourceSupplierInternalException + || (mostRecentAttemptException instanceof MongoException + && ((MongoException) mostRecentAttemptException).hasErrorLabel(NO_WRITES_PERFORMED_ERROR_LABEL))) { return previouslyChosenException; } else { return mostRecentAttemptException; @@ -571,6 +573,7 @@ private static boolean isRetryWritesEnabled(@Nullable final BsonDocument command } static final String RETRYABLE_WRITE_ERROR_LABEL = "RetryableWriteError"; + private static final String NO_WRITES_PERFORMED_ERROR_LABEL = "NoWritesPerformed"; private static boolean decideRetryableAndAddRetryableWriteErrorLabel(final Throwable t, @Nullable final Integer maxWireVersion) { if (!(t instanceof MongoException)) { diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy index 685e7db1963..543a4a00e46 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy @@ -846,14 +846,14 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat given: getCollectionHelper().insertDocuments(getTestInserts()) def operation = new MixedBulkWriteOperation(getNamespace(), - [new InsertRequest(new BsonDocument('_id', new BsonInt32(7))), - new InsertRequest(new BsonDocument('_id', new BsonInt32(1))) // duplicate key - ], false, ACKNOWLEDGED, true) + [new DeleteRequest(new BsonDocument('_id', new BsonInt32(2))), // existing key + new InsertRequest(new BsonDocument('_id', new BsonInt32(1))) // existing (duplicate) key + ], true, ACKNOWLEDGED, true) def failPoint = BsonDocument.parse('''{ "configureFailPoint": "failCommand", "mode": {"times": 2 }, - "data": { "failCommands": ["insert"], + "data": { "failCommands": ["delete"], "writeConcernError": {"code": 91, "errmsg": "Replication is being shut down"}}}''') configureFailPoint(failPoint) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 904137a06dc..5b38c8fbc47 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -98,6 +98,15 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } + /** + * Prose test #3. + */ + @Test + public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed( + mongoClientSettings -> new SyncMongoClient(MongoClients.create(mongoClientSettings))); + } + private boolean canRunTests() { Document storageEngine = (Document) getServerStatus().get("storageEngine"); diff --git a/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java b/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java index bc88b8eab33..52e8fe9ff58 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java +++ b/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java @@ -29,17 +29,17 @@ public final class FailPoint implements AutoCloseable { private final BsonDocument failPointDocument; private final MongoClient client; - private final boolean close; - private FailPoint(final BsonDocument failPointDocument, final MongoClient client, final boolean close) { + private FailPoint(final BsonDocument failPointDocument, final MongoClient client) { this.failPointDocument = failPointDocument.toBsonDocument(); this.client = client; - this.close = close; } /** * @param configureFailPointDoc A document representing {@code configureFailPoint} command to be issued as is via * {@link com.mongodb.client.MongoDatabase#runCommand(Bson)}. + * @param serverAddress One may use {@link Fixture#getPrimary()} to get the address of a primary server + * if that is what is needed. */ public static FailPoint enable(final BsonDocument configureFailPointDoc, final ServerAddress serverAddress) { MongoClientSettings clientSettings = getMongoClientSettingsBuilder() @@ -48,18 +48,11 @@ public static FailPoint enable(final BsonDocument configureFailPointDoc, final S .hosts(Collections.singletonList(serverAddress))) .build(); MongoClient client = MongoClients.create(clientSettings); - return enable(configureFailPointDoc, client, true); + return enable(configureFailPointDoc, client); } - /** - * @see #enable(BsonDocument, ServerAddress) - */ - public static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client) { - return enable(configureFailPointDoc, client, false); - } - - private static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client, final boolean close) { - FailPoint result = new FailPoint(configureFailPointDoc, client, close); + private static FailPoint enable(final BsonDocument configureFailPointDoc, final MongoClient client) { + FailPoint result = new FailPoint(configureFailPointDoc, client); client.getDatabase("admin").runCommand(configureFailPointDoc); return result; } @@ -71,9 +64,7 @@ public void close() { .append("configureFailPoint", failPointDocument.getString("configureFailPoint")) .append("mode", new BsonString("off"))); } finally { - if (close) { - client.close(); - } + client.close(); } } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/Fixture.java b/driver-sync/src/test/functional/com/mongodb/client/Fixture.java index 468dddb617b..3b0d45dca88 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/Fixture.java +++ b/driver-sync/src/test/functional/com/mongodb/client/Fixture.java @@ -19,7 +19,6 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.ServerAddress; -import com.mongodb.client.internal.MongoClientImpl; import com.mongodb.connection.ServerDescription; import java.util.List; @@ -109,12 +108,16 @@ public static MongoClientSettings.Builder getMongoClientSettings(final Connectio return builder; } + /** + * Beware of a potential race condition hiding here: the primary you discover may differ from the one used by the {@code client} + * when performing some operations, as the primary may change. + */ public static ServerAddress getPrimary() throws InterruptedException { - getMongoClient(); - List serverDescriptions = getPrimaries(((MongoClientImpl) mongoClient).getCluster().getDescription()); + MongoClient client = getMongoClient(); + List serverDescriptions = getPrimaries(client.getClusterDescription()); while (serverDescriptions.isEmpty()) { Thread.sleep(100); - serverDescriptions = getPrimaries(((MongoClientImpl) mongoClient).getCluster().getDescription()); + serverDescriptions = getPrimaries(client.getClusterDescription()); } return serverDescriptions.get(0).getAddress(); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 4944a9c3856..0d7e9cd2019 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -20,6 +20,10 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; import com.mongodb.MongoException; +import com.mongodb.ServerAddress; +import com.mongodb.assertions.Assertions; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandSucceededEvent; import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionCheckedOutEvent; import com.mongodb.event.ConnectionPoolClearedEvent; @@ -34,12 +38,16 @@ import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getServerStatus; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; @@ -55,6 +63,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeTrue; @@ -138,7 +147,6 @@ public static void poolClearedExceptionMustBeRetryable( * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server * (while it waits for a response, calling `ServerMonitor.connect` has no effect). * Thus, we want to use small heartbeat delay to reduce delays in the test. */ - .minHeartbeatFrequency(50, TimeUnit.MILLISECONDS) .heartbeatFrequency(50, TimeUnit.MILLISECONDS)) .retryReads(true) .retryWrites(true) @@ -158,7 +166,7 @@ public static void poolClearedExceptionMustBeRetryable( .append("blockTimeMS", new BsonInt32(1000))); int timeoutSeconds = 5; try (MongoClient client = clientCreator.apply(clientSettings); - FailPoint ignored = FailPoint.enable(configureFailPoint, client)) { + FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) { MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) .getCollection("poolClearedExceptionMustBeRetryable"); collection.drop(); @@ -179,6 +187,77 @@ public static void poolClearedExceptionMustBeRetryable( } } + /** + * Prose test #3. + */ + @Test + public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create); + } + + @SuppressWarnings("try") + public static void originalErrorMustBePropagatedIfNoWritesPerformed( + final Function clientCreator) throws InterruptedException { + assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); + ServerAddress primaryServerAddress = Fixture.getPrimary(); + CompletableFuture futureFailPointFromListener = new CompletableFuture<>(); + CommandListener commandListener = new CommandListener() { + private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); + + @Override + public void commandSucceeded(final CommandSucceededEvent event) { + if (event.getCommandName().equals("insert") + && event.getResponse().getDocument("writeConcernError", new BsonDocument()) + .getInt32("code", new BsonInt32(-1)).intValue() == 91 + && configureFailPoint.compareAndSet(true, false)) { + Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable( + new BsonDocument() + .append("configureFailPoint", new BsonString("failCommand")) + .append("mode", new BsonDocument() + .append("times", new BsonInt32(1))) + .append("data", new BsonDocument() + .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) + .append("errorCode", new BsonInt32(10107)) + .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed") + .map(BsonString::new).collect(Collectors.toList())))), + primaryServerAddress + ))); + } + } + }; + BsonDocument failPointDocument = new BsonDocument() + .append("configureFailPoint", new BsonString("failCommand")) + .append("mode", new BsonDocument() + .append("times", new BsonInt32(1))) + .append("data", new BsonDocument() + .append("writeConcernError", new BsonDocument() + .append("code", new BsonInt32(91)) + .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError") + .map(BsonString::new).collect(Collectors.toList())))) + .append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))); + try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .applyToServerSettings(builder -> + // see `poolClearedExceptionMustBeRetryable` for the explanation + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) + .build()); + FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) { + MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) + .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed"); + collection.drop(); + try { + collection.insertOne(new Document()); + } catch (MongoException e) { + assertEquals(e.getCode(), 91); + return; + } + fail("must not reach"); + } finally { + futureFailPointFromListener.thenAccept(FailPoint::close); + } + } + private boolean canRunTests() { Document storageEngine = (Document) getServerStatus().get("storageEngine");