diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index c5cd3491ec8..bac2a86e61d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -45,6 +45,7 @@ import static com.mongodb.ReadPreference.primary; import static com.mongodb.ReadPreference.primaryPreferred; import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.connection.ClusterConnectionMode.LOAD_BALANCED; import static com.mongodb.connection.ClusterConnectionMode.SINGLE; @@ -112,6 +113,7 @@ public final class CommandMessage extends RequestMessage { this.payloadFieldNameValidator = payloadFieldNameValidator; this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode); this.serverApi = serverApi; + assertTrue(useOpMsg() || responseExpected); } /** @@ -187,7 +189,11 @@ private String getSequenceIdentifier(final ByteBuf byteBuf) { } boolean isResponseExpected() { - return !useOpMsg() || requireOpMsgResponse(); + if (responseExpected) { + return true; + } else { + return payload != null && payload.isOrdered() && payload.hasAnotherSplit(); + } } MongoNamespace getNamespace() { @@ -240,7 +246,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu private int getOpMsgFlagBits() { int flagBits = 0; - if (!requireOpMsgResponse()) { + if (!isResponseExpected()) { flagBits = 1 << 1; } if (exhaustAllowed) { @@ -249,14 +255,6 @@ private int getOpMsgFlagBits() { return flagBits; } - private boolean requireOpMsgResponse() { - if (responseExpected) { - return true; - } else { - return payload != null && payload.hasAnotherSplit(); - } - } - private boolean isDirectConnectionToReplicaSetMember() { return clusterConnectionMode == SINGLE && getSettings().getServerType() != SHARD_ROUTER diff --git a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java index a71f7a940f0..8539a2074ee 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.connection.SplittablePayload.Type.INSERT; @@ -57,6 +58,7 @@ public final class SplittablePayload { private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder(); private final Type payloadType; private final List writeRequestWithIndexes; + private final boolean ordered; private final Map insertedIds = new HashMap<>(); private int position = 0; @@ -91,9 +93,10 @@ public enum Type { * @param payloadType the payload type * @param writeRequestWithIndexes the writeRequests */ - public SplittablePayload(final Type payloadType, final List writeRequestWithIndexes) { + public SplittablePayload(final Type payloadType, final List writeRequestWithIndexes, final boolean ordered) { this.payloadType = notNull("batchType", payloadType); this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes); + this.ordered = ordered; } /** @@ -117,7 +120,7 @@ public String getPayloadName() { } boolean hasPayload() { - return writeRequestWithIndexes.size() > 0; + return !writeRequestWithIndexes.isEmpty(); } public int size() { @@ -137,10 +140,6 @@ public List getPayload() { .collect(Collectors.toList()); } - public List getWriteRequestWithIndexes() { - return writeRequestWithIndexes; - } - /** * @return the current position in the payload */ @@ -160,16 +159,22 @@ public void setPosition(final int position) { * @return true if there are more values after the current position */ public boolean hasAnotherSplit() { + // this method must be not called before this payload having been encoded + assertTrue(position > 0); return writeRequestWithIndexes.size() > position; } + boolean isOrdered() { + return ordered; + } + /** * @return a new SplittablePayload containing only the values after the current position. */ public SplittablePayload getNextSplit() { isTrue("hasAnotherSplit", hasAnotherSplit()); List nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size()); - return new SplittablePayload(payloadType, nextPayLoad); + return new SplittablePayload(payloadType, nextPayLoad, ordered); } /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java b/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java index b5d36934605..1bca4734eff 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java @@ -17,7 +17,6 @@ package com.mongodb.internal.operation; import com.mongodb.MongoBulkWriteException; -import com.mongodb.MongoClientException; import com.mongodb.MongoInternalException; import com.mongodb.MongoNamespace; import com.mongodb.WriteConcern; @@ -65,6 +64,7 @@ import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE; import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; +import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError; @@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace, final List writeRequests, final OperationContext operationContext, @Nullable final BsonValue comment, @Nullable final BsonDocument variables) { - SessionContext sessionContext = operationContext.getSessionContext(); - if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction() - && !writeConcern.isAcknowledged()) { - throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session"); - } - boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext); + boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext()); List writeRequestsWithIndex = new ArrayList<>(); boolean writeRequestsAreRetryable = true; for (int i = 0; i < writeRequests.size(); i++) { @@ -159,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti this.indexMap = indexMap; this.unprocessed = unprocessedItems; - this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems); + this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered); this.operationContext = operationContext; this.comment = comment; this.variables = variables; @@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti if (!payloadItems.isEmpty()) { command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName())); command.put("ordered", new BsonBoolean(ordered)); - if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) { - command.put("writeConcern", writeConcern.asDocument()); - } + commandWriteConcern(writeConcern, sessionContext).ifPresent(value -> + command.put("writeConcern", value.asDocument())); if (bypassDocumentValidation != null) { command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation)); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index 398925511e0..a32ce6d5153 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -16,6 +16,7 @@ package com.mongodb.internal.operation; +import com.mongodb.MongoClientException; import com.mongodb.MongoException; import com.mongodb.MongoNamespace; import com.mongodb.WriteConcern; @@ -191,8 +192,8 @@ public BulkWriteResult execute(final WriteBinding binding) { // attach `maxWireVersion` ASAP because it is used to check whether we can retry retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); SessionContext sessionContext = binding.getOperationContext().getSessionContext(); - WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); - if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) { + WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); + if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) { handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext); } validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern); @@ -201,7 +202,7 @@ public BulkWriteResult execute(final WriteBinding binding) { connectionDescription, ordered, writeConcern, bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext); } - return executeBulkWriteBatch(retryState, binding, connection); + return executeBulkWriteBatch(retryState, writeConcern, binding, connection); }) ); try { @@ -226,8 +227,8 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall // attach `maxWireVersion` ASAP because it is used to check whether we can retry retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); SessionContext sessionContext = binding.getOperationContext().getSessionContext(); - WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); - if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext) + WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); + if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext) && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContext)) { return; } @@ -245,13 +246,17 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba releasingCallback.onResult(null, t); return; } - executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback); + executeBulkWriteBatchAsync(retryState, writeConcern, binding, connection, releasingCallback); }) ).whenComplete(binding::release); retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER))); } - private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) { + private BulkWriteResult executeBulkWriteBatch( + final RetryState retryState, + final WriteConcern effectiveWriteConcern, + final WriteBinding binding, + final Connection connection) { BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) .orElseThrow(Assertions::fail); BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); @@ -261,7 +266,7 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final while (currentBatch.shouldProcessBatch()) { try { - BsonDocument result = executeCommand(operationContext, connection, currentBatch); + BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch); if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, connection.getDescription().getServerAddress(), "errMsg", timeoutContext); @@ -295,7 +300,11 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final } } - private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection, + private void executeBulkWriteBatchAsync( + final RetryState retryState, + final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, + final AsyncConnection connection, final SingleResultCallback callback) { LoopState loopState = new LoopState(); AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> { @@ -309,7 +318,7 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async } OperationContext operationContext = binding.getOperationContext(); TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - executeCommandAsync(operationContext, connection, currentBatch, (result, t) -> { + executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> { if (t == null) { if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, @@ -405,31 +414,47 @@ private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetrySta } @Nullable - private BsonDocument executeCommand(final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) { + private BsonDocument executeCommand( + final WriteConcern effectiveWriteConcern, + final OperationContext operationContext, + final Connection connection, + final BulkWriteBatch batch) { return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()), + operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), batch.getFieldNameValidator()); } - private void executeCommandAsync(final OperationContext operationContext, final AsyncConnection connection, final BulkWriteBatch batch, + private void executeCommandAsync( + final WriteConcern effectiveWriteConcern, + final OperationContext operationContext, + final AsyncConnection connection, + final BulkWriteBatch batch, final SingleResultCallback callback) { connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()), + operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), batch.getFieldNameValidator(), callback); } - private WriteConcern getAppliedWriteConcern(final SessionContext sessionContext) { - if (sessionContext.hasActiveTransaction()) { - return WriteConcern.ACKNOWLEDGED; - } else { - return writeConcern; + private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext) + throws MongoClientException { + boolean activeTransaction = sessionContext.hasActiveTransaction(); + WriteConcern effectiveWriteConcern = activeTransaction + ? WriteConcern.ACKNOWLEDGED + : writeConcernSetting; + if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) { + throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session"); } + return effectiveWriteConcern; } - private boolean shouldAcknowledge(final BulkWriteBatch batch, final SessionContext sessionContext) { - return ordered - ? batch.hasAnotherBatch() || getAppliedWriteConcern(sessionContext).isAcknowledged() - : getAppliedWriteConcern(sessionContext).isAcknowledged(); + static Optional commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) { + return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction() + ? Optional.empty() + : Optional.of(effectiveWriteConcern); + } + + private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) { + return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch()); } private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set errorLabels) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy index 8c10755cca8..e8ed6c152ae 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy @@ -172,7 +172,7 @@ class CommandMessageSpecification extends Specification { new BsonDocument('insert', new BsonString('coll')), new SplittablePayload(INSERT, [new BsonDocument('_id', new BsonInt32(1)), new BsonDocument('_id', new BsonInt32(2))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) } ), + .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true), ], [ LATEST_WIRE_VERSION, @@ -193,7 +193,7 @@ class CommandMessageSpecification extends Specification { new BsonDocument('_id', new BsonInt32(3)).append('c', new BsonBinary(new byte[450])), new BsonDocument('_id', new BsonInt32(4)).append('b', new BsonBinary(new byte[441])), new BsonDocument('_id', new BsonInt32(5)).append('c', new BsonBinary(new byte[451]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) } ) + .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true) def message = new CommandMessage(namespace, insertCommand, fieldNameValidator, ReadPreference.primary(), messageSettings, false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE, null) def output = new BasicOutputBuffer() @@ -280,7 +280,7 @@ class CommandMessageSpecification extends Specification { def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900])), new BsonDocument('b', new BsonBinary(new byte[450])), new BsonDocument('c', new BsonBinary(new byte[450]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) } ) + .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true) def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings, false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE, null) def output = new BasicOutputBuffer() @@ -328,7 +328,7 @@ class CommandMessageSpecification extends Specification { def messageSettings = MessageSettings.builder().maxDocumentSize(900) .maxWireVersion(LATEST_WIRE_VERSION).build() def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }) + .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true) def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings, false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE, null) def output = new BasicOutputBuffer() @@ -348,7 +348,7 @@ class CommandMessageSpecification extends Specification { given: def messageSettings = MessageSettings.builder().serverType(ServerType.SHARD_ROUTER) .maxWireVersion(FOUR_DOT_ZERO_WIRE_VERSION).build() - def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))]) + def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))], true) def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings, false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE, null) def output = new BasicOutputBuffer() diff --git a/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy b/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy index f5eead4cdfc..fc688fec5df 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy +++ b/driver-sync/src/test/functional/com/mongodb/client/MongoClientSessionSpecification.groovy @@ -26,6 +26,7 @@ import com.mongodb.WriteConcern import com.mongodb.client.model.Filters import com.mongodb.event.CommandStartedEvent import com.mongodb.internal.connection.TestCommandListener +import com.mongodb.internal.time.Timeout import org.bson.BsonBinarySubType import org.bson.BsonDocument import org.bson.BsonInt32 @@ -350,9 +351,11 @@ class MongoClientSessionSpecification extends FunctionalSpecification { void waitForInsertAcknowledgement(MongoCollection collection, ObjectId id) { Document document = collection.find(Filters.eq(id)).first() + Timeout timeout = Timeout.expiresIn(5, TimeUnit.SECONDS, Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE) while (document == null) { Thread.sleep(1) document = collection.find(Filters.eq(id)).first() + timeout.onExpired { assert !"Timed out waiting for insert acknowledgement".trim() } } } } diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy index eb7d51622a3..8293b6a1599 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy @@ -115,7 +115,7 @@ class CryptConnectionSpecification extends Specification { def payload = new SplittablePayload(INSERT, [ new BsonDocumentWrapper(new Document('_id', 1).append('ssid', '555-55-5555').append('b', bytes), codec), new BsonDocumentWrapper(new Document('_id', 2).append('ssid', '666-66-6666').append('b', bytes), codec) - ].withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }) + ].withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true) def encryptedCommand = toRaw(new BsonDocument('insert', new BsonString('test')).append('documents', new BsonArray( [ new BsonDocument('_id', new BsonInt32(1)) @@ -172,7 +172,7 @@ class CryptConnectionSpecification extends Specification { new BsonDocumentWrapper(new Document('_id', 1), codec), new BsonDocumentWrapper(new Document('_id', 2), codec), new BsonDocumentWrapper(new Document('_id', 3), codec) - ].withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }) + ].withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true) def encryptedCommand = toRaw(new BsonDocument('insert', new BsonString('test')).append('documents', new BsonArray( [ new BsonDocument('_id', new BsonInt32(1)),