diff --git a/driver-core/src/main/com/mongodb/connection/Stream.java b/driver-core/src/main/com/mongodb/connection/Stream.java index 9c8a3a03d20..b129b2b41c8 100644 --- a/driver-core/src/main/com/mongodb/connection/Stream.java +++ b/driver-core/src/main/com/mongodb/connection/Stream.java @@ -17,6 +17,7 @@ package com.mongodb.connection; import com.mongodb.ServerAddress; +import com.mongodb.internal.connection.OperationContext; import org.bson.ByteBuf; import java.io.IOException; @@ -36,14 +37,15 @@ public interface Stream extends BufferProvider{ * * @throws IOException if an I/O error occurs */ - void open() throws IOException; + void open(OperationContext operationContext) throws IOException; /** * Open the stream asynchronously. * - * @param handler the completion handler for opening the stream + * @param operationContext + * @param handler the completion handler for opening the stream */ - void openAsync(AsyncCompletionHandler handler); + void openAsync(OperationContext operationContext, AsyncCompletionHandler handler); /** * Write each buffer in the list to the stream in order, blocking until all are completely written. diff --git a/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java index 90bc987272f..2a98d7c89cc 100644 --- a/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java @@ -21,6 +21,7 @@ import com.mongodb.ServerAddress; import com.mongodb.internal.connection.AsynchronousChannelStream; import com.mongodb.internal.connection.ExtendedAsynchronousByteChannel; +import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.connection.PowerOfTwoBufferPool; import com.mongodb.internal.connection.tlschannel.BufferAllocator; import com.mongodb.internal.connection.tlschannel.ClientTlsChannel; @@ -201,7 +202,7 @@ public boolean supportsAdditionalTimeout() { @SuppressWarnings("deprecation") @Override - public void openAsync(final AsyncCompletionHandler handler) { + public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler handler) { isTrue("unopened", getChannel() == null); try { SocketChannel socketChannel = SocketChannel.open(); diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index bb971603ab5..c09d96d727b 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -28,6 +28,7 @@ import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; import com.mongodb.connection.Stream; +import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.connection.netty.NettyByteBuf; import com.mongodb.lang.Nullable; import io.netty.bootstrap.Bootstrap; @@ -156,15 +157,15 @@ public ByteBuf getBuffer(final int size) { } @Override - public void open() throws IOException { + public void open(final OperationContext operationContext) throws IOException { FutureAsyncCompletionHandler handler = new FutureAsyncCompletionHandler<>(); - openAsync(handler); + openAsync(operationContext, handler); handler.get(); } @SuppressWarnings("deprecation") @Override - public void openAsync(final AsyncCompletionHandler handler) { + public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler handler) { Queue socketAddressQueue; try { @@ -174,10 +175,11 @@ public void openAsync(final AsyncCompletionHandler handler) { return; } - initializeChannel(handler, socketAddressQueue); + initializeChannel(operationContext, handler, socketAddressQueue); } - private void initializeChannel(final AsyncCompletionHandler handler, final Queue socketAddressQueue) { + private void initializeChannel(final OperationContext operationContext, final AsyncCompletionHandler handler, + final Queue socketAddressQueue) { if (socketAddressQueue.isEmpty()) { handler.failed(new MongoSocketException("Exception opening socket", getAddress())); } else { @@ -186,8 +188,8 @@ private void initializeChannel(final AsyncCompletionHandler handler, final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); bootstrap.channel(socketChannelClass); - - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeout(MILLISECONDS)); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, + operationContext.getTimeoutContext().getConnectTimeoutMs()); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); @@ -221,7 +223,7 @@ public void initChannel(final SocketChannel ch) { } }); ChannelFuture channelFuture = bootstrap.connect(nextAddress); - channelFuture.addListener(new OpenChannelFutureListener(socketAddressQueue, channelFuture, handler)); + channelFuture.addListener(new OpenChannelFutureListener(operationContext, socketAddressQueue, channelFuture, handler)); } } @@ -503,9 +505,12 @@ private class OpenChannelFutureListener implements ChannelFutureListener { private final Queue socketAddressQueue; private final ChannelFuture channelFuture; private final AsyncCompletionHandler handler; + private final OperationContext operationContext; - OpenChannelFutureListener(final Queue socketAddressQueue, final ChannelFuture channelFuture, - final AsyncCompletionHandler handler) { + OpenChannelFutureListener(final OperationContext operationContext, + final Queue socketAddressQueue, final ChannelFuture channelFuture, + final AsyncCompletionHandler handler) { + this.operationContext = operationContext; this.socketAddressQueue = socketAddressQueue; this.channelFuture = channelFuture; this.handler = handler; @@ -528,7 +533,7 @@ public void operationComplete(final ChannelFuture future) { } else if (socketAddressQueue.isEmpty()) { handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), future.cause())); } else { - initializeChannel(handler, socketAddressQueue); + initializeChannel(operationContext, handler, socketAddressQueue); } } }); diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 0336f698d8f..4c688e7f08b 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -158,4 +158,13 @@ public Timeout startServerSelectionTimeout() { long ms = getTimeoutSettings().getServerSelectionTimeoutMS(); return StartTime.now().timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); } + + public Timeout startWaitQueueTimeout(final StartTime checkoutStart) { + final long ms = getTimeoutSettings().getMaxWaitTimeMS(); + return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); + } + + public int getConnectTimeoutMs() { + return (int) getTimeoutSettings().getConnectTimeoutMS(); + } } diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java b/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java index 403f03c0b3f..d90a73fb739 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java @@ -37,35 +37,38 @@ public class TimeoutSettings { @Nullable private final Long defaultTimeoutMS; - private final long maxAwaitTimeMS; - - // Deprecated timeouts - private final long readTimeoutMS; + // Deprecated configuration timeout options + private final long readTimeoutMS; // aka socketTimeoutMS + private final long maxWaitTimeMS; // aka waitQueueTimeoutMS + @Nullable + private final Long wTimeoutMS; + // Deprecated options for CRUD methods private final long maxTimeMS; - + private final long maxAwaitTimeMS; private final long maxCommitTimeMS; - @Nullable - private final Long wTimeoutMS; public static final TimeoutSettings DEFAULT = create(MongoClientSettings.builder().build()); public static TimeoutSettings create(final MongoClientSettings settings) { - return new TimeoutSettings(settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS), + return new TimeoutSettings( + settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS), settings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS), settings.getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS), - settings.getTimeout(TimeUnit.MILLISECONDS)); + settings.getTimeout(TimeUnit.MILLISECONDS), + settings.getConnectionPoolSettings().getMaxWaitTime(TimeUnit.MILLISECONDS)); } public TimeoutSettings( - final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS, @Nullable final Long timeoutMS) { - this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null); + final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS, + @Nullable final Long timeoutMS, final long maxWaitTimeMS) { + this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null, maxWaitTimeMS); } TimeoutSettings( @Nullable final Long timeoutMS, @Nullable final Long defaultTimeoutMS, final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS, final long maxAwaitTimeMS, final long maxTimeMS, - final long maxCommitTimeMS, @Nullable final Long wTimeoutMS) { + final long maxCommitTimeMS, @Nullable final Long wTimeoutMS, final long maxWaitTimeMS) { isTrueArgument("timeoutMS must be >= 0", timeoutMS == null || timeoutMS >= 0); this.serverSelectionTimeoutMS = serverSelectionTimeoutMS; this.connectTimeoutMS = connectTimeoutMS; @@ -76,45 +79,46 @@ public TimeoutSettings( this.maxTimeMS = maxTimeMS; this.maxCommitTimeMS = maxCommitTimeMS; this.wTimeoutMS = wTimeoutMS; + this.maxWaitTimeMS = maxWaitTimeMS; } public TimeoutSettings connectionOnly() { - return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null); + return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null, 0); } public TimeoutSettings withTimeoutMS(final long timeoutMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withDefaultTimeoutMS(final long defaultTimeoutMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withMaxTimeMS(final long maxTimeMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withMaxAwaitTimeMS(final long maxAwaitTimeMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withMaxTimeAndMaxAwaitTimeMS(final long maxTimeMS, final long maxAwaitTimeMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withMaxCommitMS(final long maxCommitTimeMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public TimeoutSettings withWTimeoutMS(@Nullable final Long wTimeoutMS) { return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, - maxTimeMS, maxCommitTimeMS, wTimeoutMS); + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } public long getServerSelectionTimeoutMS() { @@ -156,6 +160,10 @@ public Long getWTimeoutMS() { return wTimeoutMS; } + public long getMaxWaitTimeMS() { + return maxWaitTimeMS; + } + @Override public String toString() { return "TimeoutSettings{" diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java index bb0d5953bfb..26b64c6f233 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java @@ -125,9 +125,9 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As } @Override - public void open() throws IOException { + public void open(final OperationContext operationContext) throws IOException { FutureAsyncCompletionHandler handler = new FutureAsyncCompletionHandler<>(); - openAsync(handler); + openAsync(operationContext, handler); handler.getOpen(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index 6a956247ed3..7cd4ed81a6f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -56,7 +56,7 @@ public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final @SuppressWarnings("deprecation") @Override - public void openAsync(final AsyncCompletionHandler handler) { + public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler handler) { isTrue("unopened", getChannel() == null); Queue socketAddressQueue; diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index ebf8b86ce25..4a1164fb006 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -105,26 +105,26 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); boolean selectionFailureLogged = false; - Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); + Timeout serverSelectionTimeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); while (true) { CountDownLatch currentPhaseLatch = phase.get(); ClusterDescription currentDescription = description; - ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, timeout); + ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, serverSelectionTimeout); throwIfIncompatible(currentDescription); if (serverTuple != null) { return serverTuple; } - if (timeout.hasExpired()) { + if (serverSelectionTimeout.hasExpired()) { throw createTimeoutException(serverSelector, currentDescription); } if (!selectionFailureLogged) { - logServerSelectionFailure(serverSelector, currentDescription, timeout); + logServerSelectionFailure(serverSelector, currentDescription, serverSelectionTimeout); selectionFailureLogged = true; } connect(); - Timeout heartbeatLimitedTimeout = timeout.orEarlier(startMinWaitHeartbeatTimeout()); + Timeout heartbeatLimitedTimeout = serverSelectionTimeout.orEarlier(startMinWaitHeartbeatTimeout()); heartbeatLimitedTimeout.awaitOn(currentPhaseLatch, () -> format("waiting for a server that matches %s", serverSelector)); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java index b58e7fdd68c..2129d42b941 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java @@ -18,34 +18,22 @@ import com.mongodb.MongoConnectionPoolClearedException; import com.mongodb.annotations.ThreadSafe; -import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.time.StartTime; -import org.bson.types.ObjectId; import com.mongodb.lang.Nullable; +import org.bson.types.ObjectId; import java.io.Closeable; -import java.util.concurrent.TimeUnit; /** * An instance of an implementation must be created in the {@linkplain #invalidate(Throwable) paused} state. */ @ThreadSafe interface ConnectionPool extends Closeable { - /** - * Is equivalent to {@link #get(OperationContext, long, TimeUnit)} called with {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}. - */ - InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException; - /** * @param operationContext the operation context - * @param timeout This is not a timeout for the whole {@link #get(OperationContext, long, TimeUnit)}, - * see {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}. - *

- * See {@link StartTime#timeoutAfterOrInfiniteIfNegative(long, TimeUnit)}.

* @throws MongoConnectionPoolClearedException If detects that the pool is {@linkplain #invalidate(Throwable) paused}. */ - InternalConnection get(OperationContext operationContext, long timeout, TimeUnit timeUnit) throws MongoConnectionPoolClearedException; + InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException; /** * Completes the {@code callback} with a {@link MongoConnectionPoolClearedException} diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index f3d3afcd1ca..d91efb63139 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -186,18 +186,13 @@ public int getGeneration(@NonNull final ObjectId serviceId) { @Override public InternalConnection get(final OperationContext operationContext) { - return get(operationContext, settings.getMaxWaitTime(MILLISECONDS), MILLISECONDS); - } - - @Override - public InternalConnection get(final OperationContext operationContext, final long timeoutValue, final TimeUnit timeUnit) { StartTime checkoutStart = connectionCheckoutStarted(operationContext); - Timeout timeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(timeoutValue, timeUnit); + Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart); try { stateAndGeneration.throwIfClosedOrPaused(); - PooledConnection connection = getPooledConnection(timeout, checkoutStart); + PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart); if (!connection.opened()) { - connection = openConcurrencyLimiter.openOrGetAvailable(connection, timeout, checkoutStart); + connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart); } connection.checkedOutForOperation(operationContext); connectionCheckedOut(operationContext, connection, checkoutStart); @@ -210,7 +205,7 @@ public InternalConnection get(final OperationContext operationContext, final lon @Override public void getAsync(final OperationContext operationContext, final SingleResultCallback callback) { StartTime checkoutStart = connectionCheckoutStarted(operationContext); - Timeout timeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS); + Timeout maxWaitTimeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS); SingleResultCallback eventSendingCallback = (connection, failure) -> { SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (failure == null) { @@ -227,13 +222,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult eventSendingCallback.onResult(null, e); return; } - asyncWorkManager.enqueue(new Task(timeout, checkoutStart, t -> { + asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, t -> { if (t != null) { eventSendingCallback.onResult(null, t); } else { PooledConnection connection; try { - connection = getPooledConnection(timeout, checkoutStart); + connection = getPooledConnection(maxWaitTimeout, checkoutStart); } catch (Exception e) { eventSendingCallback.onResult(null, e); return; @@ -241,8 +236,8 @@ public void getAsync(final OperationContext operationContext, final SingleResult if (connection.opened()) { eventSendingCallback.onResult(connection, null); } else { - openConcurrencyLimiter.openAsyncWithConcurrencyLimit( - connection, timeout, checkoutStart, eventSendingCallback); + openConcurrencyLimiter.openWithConcurrencyLimitAsync( + operationContext, connection, maxWaitTimeout, checkoutStart, eventSendingCallback); } } })); @@ -332,12 +327,12 @@ public int getGeneration() { return stateAndGeneration.generation(); } - private PooledConnection getPooledConnection(final Timeout timeout, final StartTime startTime) throws MongoTimeoutException { + private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException { try { - UsageTrackingInternalConnection internalConnection = pool.get(timeout.remainingOrNegativeForInfinite(NANOSECONDS), NANOSECONDS); + UsageTrackingInternalConnection internalConnection = pool.get(waitQueueTimeout.remainingOrNegativeForInfinite(NANOSECONDS), NANOSECONDS); while (shouldPrune(internalConnection)) { pool.release(internalConnection, true); - internalConnection = pool.get(timeout.remainingOrNegativeForInfinite(NANOSECONDS), NANOSECONDS); + internalConnection = pool.get(waitQueueTimeout.remainingOrNegativeForInfinite(NANOSECONDS), NANOSECONDS); } return new PooledConnection(internalConnection); } catch (MongoTimeoutException e) { @@ -417,7 +412,8 @@ void doMaintenance() { if (shouldEnsureMinSize()) { pool.ensureMinSize(settings.getMinSize(), newConnection -> { try { - openConcurrencyLimiter.openImmediatelyAndTryHandOverOrRelease(new PooledConnection(newConnection)); + OperationContext operationContext = OperationContext.nonUserOperationContext(null); + openConcurrencyLimiter.openImmediatelyAndTryHandOverOrRelease(operationContext, new PooledConnection(newConnection)); } catch (MongoException | MongoOpenConnectionInternalException e) { RuntimeException actualException = e instanceof MongoOpenConnectionInternalException ? (RuntimeException) e.getCause() @@ -638,12 +634,12 @@ public void checkedOutForOperation(final OperationContext operationContext) { } @Override - public void open() { + public void open(final OperationContext operationContext) { assertFalse(isClosed.get()); StartTime openStart; try { openStart = connectionCreated(connectionPoolListener, wrapped.getDescription().getConnectionId()); - wrapped.open(); + wrapped.open(operationContext); } catch (Exception e) { closeAndHandleOpenFailure(); throw new MongoOpenConnectionInternalException(e); @@ -652,10 +648,10 @@ public void open() { } @Override - public void openAsync(final SingleResultCallback callback) { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { assertFalse(isClosed.get()); StartTime openStart = connectionCreated(connectionPoolListener, wrapped.getDescription().getConnectionId()); - wrapped.openAsync((nullResult, failure) -> { + wrapped.openAsync(operationContext, (nullResult, failure) -> { if (failure != null) { closeAndHandleOpenFailure(); callback.onResult(null, new MongoOpenConnectionInternalException(failure)); @@ -927,25 +923,29 @@ private final class OpenConcurrencyLimiter { desiredConnectionSlots = new LinkedList<>(); } - PooledConnection openOrGetAvailable(final PooledConnection connection, final Timeout timeout, - final StartTime startTime) + PooledConnection openOrGetAvailable(final OperationContext operationContext, final PooledConnection connection, + final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException { PooledConnection result = openWithConcurrencyLimit( - connection, OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, timeout, startTime); + operationContext, connection, OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, + waitQueueTimeout, startTime); return assertNotNull(result); } - void openImmediatelyAndTryHandOverOrRelease(final PooledConnection connection) throws MongoTimeoutException { + void openImmediatelyAndTryHandOverOrRelease(final OperationContext operationContext, + final PooledConnection connection) throws MongoTimeoutException { StartTime startTime = StartTime.now(); Timeout timeout = startTime.asTimeout(); assertNull(openWithConcurrencyLimit( - connection, OpenWithConcurrencyLimitMode.TRY_HAND_OVER_OR_RELEASE, timeout, startTime)); + operationContext, + connection, OpenWithConcurrencyLimitMode.TRY_HAND_OVER_OR_RELEASE, + timeout, startTime)); } /** - * This method can be thought of as operating in two phases. - * In the first phase it tries to synchronously acquire a permit to open the {@code connection} - * or get a different {@linkplain PooledConnection#opened() opened} connection if {@code mode} is + * This method can be thought of as operating in two phases. In the first phase it tries to synchronously + * acquire a permit to open the {@code connection} or get a different + * {@linkplain PooledConnection#opened() opened} connection if {@code mode} is * {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE} and one becomes available while waiting for a permit. * The first phase has one of the following outcomes: *
    @@ -956,7 +956,7 @@ void openImmediatelyAndTryHandOverOrRelease(final PooledConnection connection) t * This outcome is possible only if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}. *
  1. A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported * and an attempt to open the specified {@code connection} is made. This is the second phase in which - * the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}. + * the {@code connection} is {@linkplain InternalConnection#open(OperationContext) opened synchronously}. * The attempt to open the {@code connection} has one of the following outcomes * combined with releasing the acquired permit: *
      @@ -970,21 +970,23 @@ void openImmediatelyAndTryHandOverOrRelease(final PooledConnection connection) t * *
    * - * @param timeout Applies only to the first phase. - * @return An {@linkplain PooledConnection#opened() opened} connection which is - * either the specified {@code connection}, - * or potentially a different one if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}, - * or {@code null} if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE}. + * @param operationContext + * @param waitQueueTimeout Applies only to the first phase. + * @return An {@linkplain PooledConnection#opened() opened} connection which is either the specified + * {@code connection}, or potentially a different one if {@code mode} is + * {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}, or {@code null} if {@code mode} is + * {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE}. * @throws MongoTimeoutException If the first phase timed out. */ @Nullable - private PooledConnection openWithConcurrencyLimit(final PooledConnection connection, - final OpenWithConcurrencyLimitMode mode, final Timeout timeout, final StartTime startTime) + private PooledConnection openWithConcurrencyLimit(final OperationContext operationContext, + final PooledConnection connection, final OpenWithConcurrencyLimitMode mode, + final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException { PooledConnection availableConnection; try {//phase one availableConnection = acquirePermitOrGetAvailableOpenedConnection( - mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, timeout, startTime); + mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime); } catch (Exception e) { connection.closeSilently(); throw e; @@ -994,7 +996,7 @@ private PooledConnection openWithConcurrencyLimit(final PooledConnection connect return availableConnection; } else {//acquired a permit, phase two try { - connection.open(); + connection.open(operationContext); if (mode == OpenWithConcurrencyLimitMode.TRY_HAND_OVER_OR_RELEASE) { tryHandOverOrRelease(connection.wrapped); return null; @@ -1008,24 +1010,25 @@ private PooledConnection openWithConcurrencyLimit(final PooledConnection connect } /** - * This method is similar to {@link #openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout, StartTime)} + * This method is similar to {@link #openWithConcurrencyLimit(OperationContext, PooledConnection, OpenWithConcurrencyLimitMode, Timeout, StartTime)} * with the following differences: *
      *
    • It does not have the {@code mode} parameter and acts as if this parameter were * {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}.
    • *
    • While the first phase is still synchronous, the {@code connection} is - * {@linkplain PooledConnection#openAsync(SingleResultCallback) opened asynchronously} in the second phase.
    • + * {@linkplain InternalConnection#openAsync(OperationContext, SingleResultCallback) opened asynchronously} in the second phase. *
    • Instead of returning a result or throwing an exception via Java {@code return}/{@code throw} statements, * it calls {@code callback.}{@link SingleResultCallback#onResult(Object, Throwable) onResult(result, failure)} * and passes either a {@link PooledConnection} or an {@link Exception}.
    • *
    */ - void openAsyncWithConcurrencyLimit( - final PooledConnection connection, final Timeout timeout, final StartTime startTime, + void openWithConcurrencyLimitAsync( + final OperationContext operationContext, final PooledConnection connection, + final Timeout maxWaitTimeout, final StartTime startTime, final SingleResultCallback callback) { PooledConnection availableConnection; try {//phase one - availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, timeout, startTime); + availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime); } catch (Exception e) { connection.closeSilently(); callback.onResult(null, e); @@ -1035,7 +1038,7 @@ void openAsyncWithConcurrencyLimit( connection.closeSilently(); callback.onResult(availableConnection, null); } else {//acquired a permit, phase two - connection.openAsync((nullResult, failure) -> { + connection.openAsync(operationContext, (nullResult, failure) -> { releasePermit(); if (failure != null) { callback.onResult(null, failure); @@ -1056,7 +1059,7 @@ void openAsyncWithConcurrencyLimit( */ @Nullable private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, - final Timeout timeout, final StartTime startTime) + final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException, MongoInterruptedException { PooledConnection availableConnection = null; boolean expressedDesireToGetAvailableConnection = false; @@ -1086,10 +1089,10 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole // the absence of short-circuiting is of importance & !stateAndGeneration.throwIfClosedOrPaused() & (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) { - if (timeout.hasExpired()) { + if (waitQueueTimeout.hasExpired()) { throw createTimeoutException(startTime); } - timeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition, + waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition, () -> "acquiring permit or getting available opened connection"); } if (availableConnection == null) { @@ -1166,7 +1169,7 @@ void signalClosedOrPaused() { } /** - * @see OpenConcurrencyLimiter#openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout, StartTime) + * @see OpenConcurrencyLimiter#openWithConcurrencyLimit(OperationContext, PooledConnection, OpenWithConcurrencyLimitMode, Timeout, StartTime) */ private enum OpenWithConcurrencyLimitMode { TRY_GET_AVAILABLE, diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 468aea56f98..8843af50f23 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -194,7 +194,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren if (connection == null || connection.isClosed()) { currentCheckCancelled = false; InternalConnection newConnection = internalConnectionFactory.create(serverId); - newConnection.open(); + newConnection.open(OperationContext.nonUserOperationContext(null)); connection = newConnection; averageRoundTripTime.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); return connection.getInitialServerDescription(); @@ -426,7 +426,7 @@ public void run() { private void initialize() { connection = null; connection = internalConnectionFactory.create(serverId); - connection.open(); + connection.open(OperationContext.nonUserOperationContext(null)); averageRoundTripTime.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java index 07dda581f7b..af1b7cb8e5f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java @@ -51,14 +51,15 @@ public interface InternalConnection extends BufferProvider { /** * Opens the connection so its ready for use */ - void open(); + void open(OperationContext operationContext); /** * Opens the connection so its ready for use * - * @param callback the callback to be called once the connection has been opened + * @param operationContext + * @param callback the callback to be called once the connection has been opened */ - void openAsync(SingleResultCallback callback); + void openAsync(OperationContext operationContext, SingleResultCallback callback); /** * Closes the connection. diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 2e1ddfe4420..7c9ac4cff05 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -203,11 +203,11 @@ public int getGeneration() { } @Override - public void open() { + public void open(final OperationContext operationContext) { isTrue("Open already called", stream == null); stream = streamFactory.create(getServerAddressWithResolver()); try { - stream.open(); + stream.open(operationContext); InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this); initAfterHandshakeStart(initializationDescription); @@ -225,11 +225,11 @@ public void open() { } @Override - public void openAsync(final SingleResultCallback callback) { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { isTrue("Open already called", stream == null, callback); try { stream = streamFactory.create(getServerAddressWithResolver()); - stream.openAsync(new AsyncCompletionHandler() { + stream.openAsync(operationContext, new AsyncCompletionHandler() { @Override public void completed(@Nullable final Void aVoid) { connectionInitializer.startHandshakeAsync(InternalStreamConnection.this, diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index e2d8103ae87..9cb73727e6b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -15,9 +15,12 @@ */ package com.mongodb.internal.connection; +import com.mongodb.MongoClientSettings; import com.mongodb.RequestContext; import com.mongodb.ServerApi; +import com.mongodb.internal.IgnorableRequestContext; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; @@ -41,6 +44,24 @@ public OperationContext(final RequestContext requestContext, final SessionContex this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, serverApi); } + + public static OperationContext todoOperationContext() { + // TODO (CSOT) should be removed; used at locations that require an OC, but which do not yet have one available + return nonUserOperationContext(null); + } + + public static OperationContext nonUserOperationContext(final MongoClientSettings settings) { + // TODO (CSOT) below is placeholder, validate correctness (serverApi/timeoutSettings might + // TODO (CSOT) need to be passed in instead) + TimeoutSettings timeoutSettings = TimeoutSettings.create(settings); + ServerApi serverApi = settings.getServerApi(); + return new OperationContext( + IgnorableRequestContext.INSTANCE, + NoOpSessionContext.INSTANCE, + new TimeoutContext(timeoutSettings.connectionOnly()), + serverApi); + } + public OperationContext withSessionContext(final SessionContext sessionContext) { return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverApi); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java index 03580cc7c89..303a070b75b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java @@ -39,14 +39,12 @@ import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.List; -import java.util.concurrent.TimeUnit; import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.connection.SocketStreamHelper.configureSocket; import static com.mongodb.internal.connection.SslHelper.configureSslSocket; import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException; -import static java.util.concurrent.TimeUnit.MILLISECONDS; /** *

    This class is not part of the public API and may be removed or changed at any time

    @@ -73,9 +71,9 @@ public SocketStream(final ServerAddress address, final SocketSettings settings, } @Override - public void open() { + public void open(final OperationContext operationContext) { try { - socket = initializeSocket(); + socket = initializeSocket(operationContext); outputStream = socket.getOutputStream(); inputStream = socket.getInputStream(); } catch (IOException e) { @@ -86,22 +84,22 @@ public void open() { } @SuppressWarnings("deprecation") - protected Socket initializeSocket() throws IOException { + protected Socket initializeSocket(final OperationContext operationContext) throws IOException { ProxySettings proxySettings = settings.getProxySettings(); if (proxySettings.isProxyEnabled()) { if (sslSettings.isEnabled()) { assertTrue(socketFactory instanceof SSLSocketFactory); SSLSocketFactory sslSocketFactory = (SSLSocketFactory) socketFactory; - return initializeSslSocketOverSocksProxy(sslSocketFactory); + return initializeSslSocketOverSocksProxy(operationContext, sslSocketFactory); } - return initializeSocketOverSocksProxy(); + return initializeSocketOverSocksProxy(operationContext); } Iterator inetSocketAddresses = address.getSocketAddresses().iterator(); while (inetSocketAddresses.hasNext()) { Socket socket = socketFactory.createSocket(); try { - SocketStreamHelper.initialize(socket, inetSocketAddresses.next(), settings, sslSettings); + SocketStreamHelper.initialize(operationContext, socket, inetSocketAddresses.next(), settings, sslSettings); return socket; } catch (SocketTimeoutException e) { if (!inetSocketAddresses.hasNext()) { @@ -113,14 +111,15 @@ protected Socket initializeSocket() throws IOException { throw new MongoSocketException("Exception opening socket", getAddress()); } - private SSLSocket initializeSslSocketOverSocksProxy(final SSLSocketFactory sslSocketFactory) throws IOException { + private SSLSocket initializeSslSocketOverSocksProxy(final OperationContext operationContext, + final SSLSocketFactory sslSocketFactory) throws IOException { final String serverHost = address.getHost(); final int serverPort = address.getPort(); SocksSocket socksProxy = new SocksSocket(settings.getProxySettings()); configureSocket(socksProxy, settings); InetSocketAddress inetSocketAddress = toSocketAddress(serverHost, serverPort); - socksProxy.connect(inetSocketAddress, settings.getConnectTimeout(MILLISECONDS)); + socksProxy.connect(inetSocketAddress, operationContext.getTimeoutContext().getConnectTimeoutMs()); SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(socksProxy, serverHost, serverPort, true); //Even though Socks proxy connection is already established, TLS handshake has not been performed yet. @@ -138,7 +137,7 @@ private static InetSocketAddress toSocketAddress(final String serverHost, final return InetSocketAddress.createUnresolved(serverHost, serverPort); } - private Socket initializeSocketOverSocksProxy() throws IOException { + private Socket initializeSocketOverSocksProxy(final OperationContext operationContext) throws IOException { Socket createdSocket = socketFactory.createSocket(); configureSocket(createdSocket, settings); /* @@ -149,7 +148,7 @@ private Socket initializeSocketOverSocksProxy() throws IOException { SocksSocket socksProxy = new SocksSocket(createdSocket, settings.getProxySettings()); socksProxy.connect(toSocketAddress(address.getHost(), address.getPort()), - settings.getConnectTimeout(TimeUnit.MILLISECONDS)); + operationContext.getTimeoutContext().getConnectTimeoutMs()); return socksProxy; } @@ -207,7 +206,7 @@ public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOEx } @Override - public void openAsync(final AsyncCompletionHandler handler) { + public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler handler) { throw new UnsupportedOperationException(getClass() + " does not support asynchronous operations."); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java b/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java index 1b5e789e646..73b3838659f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java @@ -69,11 +69,12 @@ final class SocketStreamHelper { SET_OPTION_METHOD = setOptionMethod; } - static void initialize(final Socket socket, final InetSocketAddress inetSocketAddress, final SocketSettings settings, - final SslSettings sslSettings) throws IOException { + static void initialize(final OperationContext operationContext, final Socket socket, + final InetSocketAddress inetSocketAddress, final SocketSettings settings, + final SslSettings sslSettings) throws IOException { configureSocket(socket, settings); configureSslSocket(socket, sslSettings, inetSocketAddress); - socket.connect(inetSocketAddress, settings.getConnectTimeout(MILLISECONDS)); + socket.connect(inetSocketAddress, operationContext.getTimeoutContext().getConnectTimeoutMs()); } static void configureSocket(final Socket socket, final SocketSettings settings) throws SocketException { diff --git a/driver-core/src/main/com/mongodb/internal/connection/UnixSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/UnixSocketChannelStream.java index f19c85740c7..cc31f676748 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/UnixSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/UnixSocketChannelStream.java @@ -42,7 +42,7 @@ public UnixSocketChannelStream(final UnixServerAddress address, final SocketSett @SuppressWarnings("deprecation") @Override - protected Socket initializeSocket() throws IOException { + protected Socket initializeSocket(final OperationContext operationContext) throws IOException { return UnixSocketChannel.open((UnixSocketAddress) address.getUnixSocketAddress()).socket(); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java b/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java index f794d3b9a27..8ede9d095b9 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java @@ -49,8 +49,8 @@ class UsageTrackingInternalConnection implements InternalConnection { } @Override - public void open() { - wrapped.open(); + public void open(final OperationContext operationContext) { + wrapped.open(operationContext); openedAt = System.currentTimeMillis(); lastUsedAt = openedAt; if (getDescription().getServiceId() != null) { @@ -59,8 +59,8 @@ public void open() { } @Override - public void openAsync(final SingleResultCallback callback) { - wrapped.openAsync((result, t) -> { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { + wrapped.openAsync(operationContext, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 87ceccd028e..0caeaebece0 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -123,7 +123,7 @@ public final class ClusterFixture { public static final long TIMEOUT = 60L; public static final Duration TIMEOUT_DURATION = Duration.ofSeconds(TIMEOUT); - public static final TimeoutSettings TIMEOUT_SETTINGS = new TimeoutSettings(30_000, 10_000, 0, null); + public static final TimeoutSettings TIMEOUT_SETTINGS = new TimeoutSettings(30_000, 10_000, 0, null, SECONDS.toMillis(5)); public static final TimeoutSettings TIMEOUT_SETTINGS_WITH_TIMEOUT = TIMEOUT_SETTINGS.withTimeoutMS(TIMEOUT_DURATION.toMillis()); public static final TimeoutSettings TIMEOUT_SETTINGS_WITH_MAX_TIME = TIMEOUT_SETTINGS.withMaxTimeMS(100); public static final TimeoutSettings TIMEOUT_SETTINGS_WITH_MAX_AWAIT_TIME = TIMEOUT_SETTINGS.withMaxAwaitTimeMS(101); diff --git a/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy b/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy index 6628dfb5625..e5d6181e8c6 100644 --- a/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy @@ -13,6 +13,7 @@ import spock.lang.Specification import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getSslSettings class NettyStreamSpecification extends Specification { @@ -36,7 +37,7 @@ class NettyStreamSpecification extends Specification { def stream = factory.create(serverAddress) when: - stream.open() + stream.open(OPERATION_CONTEXT) then: !stream.isClosed() @@ -61,7 +62,7 @@ class NettyStreamSpecification extends Specification { def stream = factory.create(serverAddress) when: - stream.open() + stream.open(OPERATION_CONTEXT) then: thrown(MongoSocketOpenException) @@ -79,7 +80,7 @@ class NettyStreamSpecification extends Specification { def callback = new CallbackErrorHolder() when: - stream.openAsync(callback) + stream.openAsync(OPERATION_CONTEXT, callback) then: callback.getError().is(exception) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy index cbe38d81cdf..fa32b703560 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy @@ -15,6 +15,7 @@ import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getSslSettings import static java.util.concurrent.TimeUnit.MILLISECONDS @@ -39,7 +40,7 @@ class AsyncSocketChannelStreamSpecification extends Specification { def stream = factory.create(serverAddress) when: - stream.open() + stream.open(OPERATION_CONTEXT) then: !stream.isClosed() @@ -67,7 +68,7 @@ class AsyncSocketChannelStreamSpecification extends Specification { def stream = factory.create(serverAddress) when: - stream.open() + stream.open(OPERATION_CONTEXT) then: thrown(MongoSocketOpenException) @@ -86,7 +87,7 @@ class AsyncSocketChannelStreamSpecification extends Specification { def callback = new CallbackErrorHolder() when: - stream.openAsync(callback) + stream.openAsync(OPERATION_CONTEXT, callback) then: callback.getError().is(exception) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncStreamTimeoutsSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncStreamTimeoutsSpecification.groovy index bfab5039181..8711c54a246 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncStreamTimeoutsSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/AsyncStreamTimeoutsSpecification.groovy @@ -35,6 +35,7 @@ import util.spock.annotations.Slow import java.util.concurrent.TimeUnit +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getClusterConnectionMode import static com.mongodb.ClusterFixture.getCredentialWithCache import static com.mongodb.ClusterFixture.getPrimary @@ -57,7 +58,7 @@ class AsyncStreamTimeoutsSpecification extends OperationFunctionalSpecification .create(new ServerId(new ClusterId(), new ServerAddress(new InetSocketAddress('192.168.255.255', 27017)))) when: - connection.open() + connection.open(OPERATION_CONTEXT) then: thrown(MongoSocketOpenException) @@ -69,7 +70,7 @@ class AsyncStreamTimeoutsSpecification extends OperationFunctionalSpecification def connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE, new AsynchronousSocketChannelStreamFactory(readSocketSettings, getSslSettings()), getCredentialWithCache(), null, null, [], LoggerSettings.builder().build(), null, getServerApi(), null).create(new ServerId(new ClusterId(), getPrimary())) - connection.open() + connection.open(OPERATION_CONTEXT) getCollectionHelper().insertDocuments(new BsonDocument('_id', new BsonInt32(1))) def countCommand = new BsonDocument('count', new BsonString(getCollectionName())) @@ -93,7 +94,7 @@ class AsyncStreamTimeoutsSpecification extends OperationFunctionalSpecification new ServerAddress(new InetSocketAddress('192.168.255.255', 27017)))) when: - connection.open() + connection.open(OPERATION_CONTEXT) then: thrown(MongoSocketOpenException) @@ -105,7 +106,7 @@ class AsyncStreamTimeoutsSpecification extends OperationFunctionalSpecification def connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE, new NettyStreamFactory(readSocketSettings, getSslSettings()), getCredentialWithCache(), null, null, [], LoggerSettings.builder().build(), null, getServerApi(), null).create(new ServerId(new ClusterId(), getPrimary())) - connection.open() + connection.open(OPERATION_CONTEXT) getCollectionHelper().insertDocuments(new BsonDocument('_id', new BsonInt32(1))) def countCommand = new BsonDocument('count', new BsonString(getCollectionName())) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/AwsAuthenticationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/AwsAuthenticationSpecification.groovy index 3c8920f98e7..96af9862074 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/AwsAuthenticationSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/AwsAuthenticationSpecification.groovy @@ -21,6 +21,7 @@ import spock.lang.Specification import java.util.function.Supplier import static com.mongodb.AuthenticationMechanism.MONGODB_AWS +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getClusterConnectionMode import static com.mongodb.ClusterFixture.getConnectionString import static com.mongodb.ClusterFixture.getCredential @@ -160,10 +161,10 @@ class AwsAuthenticationSpecification extends Specification { private static void openConnection(final InternalConnection connection, final boolean async) { if (async) { FutureResultCallback futureResultCallback = new FutureResultCallback() - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get(ClusterFixture.TIMEOUT, SECONDS) } else { - connection.open() + connection.open(OPERATION_CONTEXT) } } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy index 2a1d9ca3059..bc20ab4b116 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy @@ -35,6 +35,7 @@ import spock.lang.Specification import java.util.concurrent.CountDownLatch import static com.mongodb.ClusterFixture.LEGACY_HELLO +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getClusterConnectionMode import static com.mongodb.ClusterFixture.getCredentialWithCache import static com.mongodb.ClusterFixture.getPrimary @@ -51,7 +52,7 @@ class CommandHelperSpecification extends Specification { new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()), getCredentialWithCache(), null, null, [], LoggerSettings.builder().build(), null, getServerApi(), null) .create(new ServerId(new ClusterId(), getPrimary())) - connection.open() + connection.open(OPERATION_CONTEXT) } def cleanup() { diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java index b109f9ce83e..cb5becffd37 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java @@ -26,6 +26,8 @@ import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.connection.ServerId; import com.mongodb.event.ConnectionCreatedEvent; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.inject.EmptyProvider; import com.mongodb.internal.inject.OptionalProvider; @@ -399,7 +401,12 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce provider.ready(); List connections = new ArrayList<>(); for (int i = 0; i < openConnectionsCount; i++) { - connections.add(provider.get(OPERATION_CONTEXT, 0, NANOSECONDS)); + OperationContext operationContextWithZeroMaxWait = new OperationContext( + OPERATION_CONTEXT.getRequestContext(), + OPERATION_CONTEXT.getSessionContext(), + new TimeoutContext(new TimeoutSettings(30_000, 10_000, 0, null, 0)), + OPERATION_CONTEXT.getServerApi()); + connections.add(provider.get(operationContextWithZeroMaxWait)); } acquireOpenPermits(provider, DEFAULT_MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_OPEN, controllableConnFactory, listener); int previousIdx = 0; @@ -417,7 +424,7 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce return connectionId; })); Runnable checkOut = () -> receivedFutures.add(cachedExecutor.submit(() -> { - InternalConnection connection = provider.get(OPERATION_CONTEXT, TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS); + InternalConnection connection = provider.get(OPERATION_CONTEXT); return connection.getDescription().getConnectionId(); })); if (ThreadLocalRandom.current().nextBoolean()) { @@ -531,7 +538,7 @@ private static void useConcurrently(final DefaultConnectionPool pool, final int spontaneouslyInvalidateReady.run(); InternalConnection conn = null; try { - conn = pool.get(OPERATION_CONTEXT, TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS); + conn = pool.get(OPERATION_CONTEXT); } catch (MongoConnectionPoolClearedException e) { // expected because we spontaneously invalidate `pool` } finally { @@ -638,7 +645,7 @@ private static ControllableConnectionFactory newControllableConnectionFactory(fi doAnswer(invocation -> { doOpen.run(); return null; - }).when(connection).open(); + }).when(connection).open(OPERATION_CONTEXT); doAnswer(invocation -> { SingleResultCallback callback = invocation.getArgument(0, SingleResultCallback.class); asyncOpenExecutor.execute(() -> { @@ -646,7 +653,7 @@ private static ControllableConnectionFactory newControllableConnectionFactory(fi callback.onResult(null, null); }); return null; - }).when(connection).openAsync(any()); + }).when(connection).openAsync(OPERATION_CONTEXT, any()); return connection; }; return new ControllableConnectionFactory(connectionFactory, openDurationHandle); diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticationSpecification.groovy index 5206c4983d3..d266f9b1121 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticationSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticationSpecification.groovy @@ -37,6 +37,7 @@ import javax.security.auth.Subject import javax.security.auth.login.LoginContext import static com.mongodb.AuthenticationMechanism.GSSAPI +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getClusterConnectionMode import static com.mongodb.ClusterFixture.getConnectionString import static com.mongodb.ClusterFixture.getCredential @@ -219,10 +220,10 @@ class GSSAPIAuthenticationSpecification extends Specification { private static void openConnection(final InternalConnection connection, final boolean async) { if (async) { FutureResultCallback futureResultCallback = new FutureResultCallback() - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get(ClusterFixture.TIMEOUT, SECONDS) } else { - connection.open() + connection.open(OPERATION_CONTEXT) } } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticatorSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticatorSpecification.groovy index 0339a0da38c..8154c220773 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticatorSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/GSSAPIAuthenticatorSpecification.groovy @@ -30,6 +30,7 @@ import spock.lang.Specification import javax.security.auth.login.LoginContext import static com.mongodb.AuthenticationMechanism.GSSAPI +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getLoginContextName import static com.mongodb.ClusterFixture.getPrimary import static com.mongodb.ClusterFixture.getServerApi @@ -54,7 +55,7 @@ class GSSAPIAuthenticatorSpecification extends Specification { .create(new ServerId(new ClusterId(), getPrimary())) when: - internalConnection.open() + internalConnection.open(OPERATION_CONTEXT) then: 1 * subjectProvider.getSubject() >> subject diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticationSpecification.groovy index 02dc14ed79b..7d76f19ff12 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticationSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticationSpecification.groovy @@ -33,6 +33,7 @@ import spock.lang.IgnoreIf import spock.lang.Specification import static com.mongodb.AuthenticationMechanism.PLAIN +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getClusterConnectionMode import static com.mongodb.ClusterFixture.getConnectionString import static com.mongodb.ClusterFixture.getCredential @@ -123,10 +124,10 @@ class PlainAuthenticationSpecification extends Specification { private static void openConnection(final InternalConnection connection, final boolean async) { if (async) { FutureResultCallback futureResultCallback = new FutureResultCallback() - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get(ClusterFixture.TIMEOUT, SECONDS) } else { - connection.open() + connection.open(OPERATION_CONTEXT) } } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/SocketStreamHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/SocketStreamHelperSpecification.groovy index 5a2492da109..66034235bd8 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/SocketStreamHelperSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/SocketStreamHelperSpecification.groovy @@ -30,6 +30,7 @@ import javax.net.ssl.SSLSocket import javax.net.ssl.SSLSocketFactory import java.lang.reflect.Method +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getPrimary import static java.util.concurrent.TimeUnit.MILLISECONDS import static java.util.concurrent.TimeUnit.SECONDS @@ -44,7 +45,8 @@ class SocketStreamHelperSpecification extends Specification { .build() when: - SocketStreamHelper.initialize(socket, getPrimary().getSocketAddress(), socketSettings, SslSettings.builder().build()) + SocketStreamHelper.initialize(OPERATION_CONTEXT, socket, getPrimary().getSocketAddress(), + socketSettings, SslSettings.builder().build()) then: socket.getTcpNoDelay() @@ -68,7 +70,7 @@ class SocketStreamHelperSpecification extends Specification { Socket socket = SocketFactory.default.createSocket() when: - SocketStreamHelper.initialize(socket, getPrimary().getSocketAddress(), + SocketStreamHelper.initialize(OPERATION_CONTEXT, socket, getPrimary().getSocketAddress(), SocketSettings.builder().build(), SslSettings.builder().build()) then: @@ -84,7 +86,8 @@ class SocketStreamHelperSpecification extends Specification { SSLSocket socket = SSLSocketFactory.default.createSocket() when: - SocketStreamHelper.initialize(socket, getPrimary().getSocketAddress(), SocketSettings.builder().build(), sslSettings) + SocketStreamHelper.initialize(OPERATION_CONTEXT, socket, getPrimary().getSocketAddress(), + SocketSettings.builder().build(), sslSettings) then: socket.getSSLParameters().endpointIdentificationAlgorithm == (sslSettings.invalidHostNameAllowed ? null : 'HTTPS') @@ -104,7 +107,8 @@ class SocketStreamHelperSpecification extends Specification { SSLSocket socket = SSLSocketFactory.default.createSocket() when: - SocketStreamHelper.initialize(socket, getPrimary().getSocketAddress(), SocketSettings.builder().build(), sslSettings) + SocketStreamHelper.initialize(OPERATION_CONTEXT, socket, getPrimary().getSocketAddress(), + SocketSettings.builder().build(), sslSettings) then: socket.getSSLParameters().getServerNames() == [new SNIHostName(getPrimary().getHost())] @@ -122,7 +126,7 @@ class SocketStreamHelperSpecification extends Specification { Socket socket = SocketFactory.default.createSocket() when: - SocketStreamHelper.initialize(socket, getPrimary().getSocketAddress(), SocketSettings.builder().build(), + SocketStreamHelper.initialize(OPERATION_CONTEXT, socket, getPrimary().getSocketAddress(), SocketSettings.builder().build(), SslSettings.builder().enabled(true).build()) then: diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/StreamSocketAddressSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/StreamSocketAddressSpecification.groovy index 260457fade0..7c94d1c4f97 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/StreamSocketAddressSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/StreamSocketAddressSpecification.groovy @@ -13,6 +13,7 @@ import util.spock.annotations.Slow import javax.net.SocketFactory import java.util.concurrent.TimeUnit +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.ClusterFixture.getSslSettings class StreamSocketAddressSpecification extends Specification { @@ -43,7 +44,7 @@ class StreamSocketAddressSpecification extends Specification { def socketStream = new SocketStream(serverAddress, socketSettings, sslSettings, socketFactory, bufferProvider) when: - socketStream.open() + socketStream.open(OPERATION_CONTEXT) then: !socket0.isConnected() @@ -80,7 +81,7 @@ class StreamSocketAddressSpecification extends Specification { def socketStream = new SocketStream(serverAddress, socketSettings, sslSettings, socketFactory, bufferProvider) when: - socketStream.open() + socketStream.open(OPERATION_CONTEXT) then: thrown(MongoSocketOpenException) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java index 3f1e866ac1c..995309eef15 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java @@ -572,13 +572,6 @@ public InternalConnection get(final OperationContext operationContext) { return result; } - @Override - public InternalConnection get(final OperationContext operationContext, final long timeout, final TimeUnit timeUnit) { - InternalConnection result = pool.get(operationContext, timeout, timeUnit); - updateConnectionIdLocalValueAdjustment(result); - return result; - } - @Override public void getAsync(final OperationContext operationContext, final SingleResultCallback callback) { pool.getAsync(operationContext, (result, problem) -> { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultConnectionPoolSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultConnectionPoolSpecification.groovy index 6dd745f5a2a..1e0b3eea858 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultConnectionPoolSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultConnectionPoolSpecification.groovy @@ -383,7 +383,7 @@ class DefaultConnectionPoolSpecification extends Specification { def connection = Mock(InternalConnection) connection.getDescription() >> new ConnectionDescription(SERVER_ID) connection.opened() >> false - connection.open() >> { throw new UncheckedIOException('expected failure', new IOException()) } + connection.open(OPERATION_CONTEXT) >> { throw new UncheckedIOException('expected failure', new IOException()) } connectionFactory.create(SERVER_ID, _) >> connection pool = new DefaultConnectionPool(SERVER_ID, connectionFactory, builder().addConnectionPoolListener(listener).build(), mockSdamProvider()) @@ -507,7 +507,7 @@ class DefaultConnectionPoolSpecification extends Specification { def connection = Mock(InternalConnection) connection.getDescription() >> new ConnectionDescription(SERVER_ID) connection.opened() >> false - connection.open() >> { throw new UncheckedIOException('expected failure', new IOException()) } + connection.open(OPERATION_CONTEXT) >> { throw new UncheckedIOException('expected failure', new IOException()) } connectionFactory.create(SERVER_ID, _) >> connection pool = new DefaultConnectionPool(SERVER_ID, connectionFactory, builder().addConnectionPoolListener(listener).build(), mockSdamProvider()) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy index d1790a8acb7..c61bbaa0481 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy @@ -79,7 +79,7 @@ class DefaultServerMonitorSpecification extends Specification { def internalConnectionFactory = Mock(InternalConnectionFactory) { create(_) >> { Mock(InternalConnection) { - open() >> { sleep(100) } + open(_) >> { sleep(100) } } } } @@ -143,7 +143,7 @@ class DefaultServerMonitorSpecification extends Specification { def internalConnectionFactory = Mock(InternalConnectionFactory) { create(_) >> { Mock(InternalConnection) { - open() >> { } + open(_) >> { } getBuffer(_) >> { int size -> new ByteBufNIO(ByteBuffer.allocate(size)) @@ -224,7 +224,7 @@ class DefaultServerMonitorSpecification extends Specification { def internalConnectionFactory = Mock(InternalConnectionFactory) { create(_) >> { Mock(InternalConnection) { - open() >> { } + open(_) >> { } getBuffer(_) >> { int size -> new ByteBufNIO(ByteBuffer.allocate(size)) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy index a584bafed8c..ef067536b44 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy @@ -114,7 +114,7 @@ class InternalStreamConnectionSpecification extends Specification { def getOpenedConnection() { def connection = getConnection() - connection.open() + connection.open(OPERATION_CONTEXT) connection } @@ -132,7 +132,7 @@ class InternalStreamConnectionSpecification extends Specification { .lastUpdateTimeNanos(connection.getInitialServerDescription().getLastUpdateTime(NANOSECONDS)) .build() when: - connection.open() + connection.open(OPERATION_CONTEXT) then: connection.opened() @@ -159,7 +159,7 @@ class InternalStreamConnectionSpecification extends Specification { .build() when: - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get() then: @@ -177,7 +177,7 @@ class InternalStreamConnectionSpecification extends Specification { failedInitializer, null) when: - connection.open() + connection.open(OPERATION_CONTEXT) then: thrown MongoInternalException @@ -195,7 +195,7 @@ class InternalStreamConnectionSpecification extends Specification { when: def futureResultCallback = new FutureResultCallback() - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get() then: diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java index 22f0ec927b4..83c276dd362 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java @@ -26,7 +26,6 @@ import org.bson.types.ObjectId; import java.util.List; -import java.util.concurrent.TimeUnit; public class TestConnectionPool implements ConnectionPool { @@ -103,12 +102,12 @@ public ServerDescription getInitialServerDescription() { } @Override - public void open() { + public void open(final OperationContext operationContext) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - public void openAsync(final SingleResultCallback callback) { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { callback.onResult(null, new UnsupportedOperationException("Not implemented yet")); } @@ -134,14 +133,6 @@ public int getGeneration() { }; } - @Override - public InternalConnection get(final OperationContext operationContext, final long timeout, final TimeUnit timeUnit) { - if (exceptionToThrow != null) { - throw exceptionToThrow; - } - return get(operationContext); - } - @Override public void getAsync(final OperationContext operationContext, final SingleResultCallback callback) { if (exceptionToThrow != null) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java index 8166b5e2283..6a20f96dd17 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java @@ -106,12 +106,12 @@ public ServerDescription getInitialServerDescription() { throw new UnsupportedOperationException(); } - public void open() { + public void open(final OperationContext operationContext) { opened = true; } @Override - public void openAsync(final SingleResultCallback callback) { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { opened = true; callback.onResult(null, null); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java index 01ef6766bca..55928589b52 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java @@ -67,12 +67,12 @@ public int getGeneration() { return generation; } - public void open() { + public void open(final OperationContext operationContext) { opened = true; } @Override - public void openAsync(final SingleResultCallback callback) { + public void openAsync(final OperationContext operationContext, final SingleResultCallback callback) { opened = true; callback.onResult(null, null); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/UsageTrackingConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/UsageTrackingConnectionSpecification.groovy index 0a328ce2f00..1ded249c66b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/UsageTrackingConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/UsageTrackingConnectionSpecification.groovy @@ -50,7 +50,7 @@ class UsageTrackingConnectionSpecification extends Specification { connection.openedAt == Long.MAX_VALUE when: - connection.open() + connection.open(OPERATION_CONTEXT) then: connection.openedAt <= System.currentTimeMillis() @@ -66,7 +66,7 @@ class UsageTrackingConnectionSpecification extends Specification { connection.openedAt == Long.MAX_VALUE when: - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get() then: @@ -81,7 +81,7 @@ class UsageTrackingConnectionSpecification extends Specification { connection.lastUsedAt == Long.MAX_VALUE when: - connection.open() + connection.open(OPERATION_CONTEXT) then: connection.lastUsedAt <= System.currentTimeMillis() @@ -97,7 +97,7 @@ class UsageTrackingConnectionSpecification extends Specification { connection.lastUsedAt == Long.MAX_VALUE when: - connection.openAsync(futureResultCallback) + connection.openAsync(OPERATION_CONTEXT, futureResultCallback) futureResultCallback.get() then: @@ -107,7 +107,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on sendMessage'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt when: @@ -122,7 +122,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on sendMessage asynchronously'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt def futureResultCallback = new FutureResultCallback() @@ -138,7 +138,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on receiveMessage'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt when: connection.receiveMessage(1) @@ -151,7 +151,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on receiveMessage asynchronously'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt def futureResultCallback = new FutureResultCallback() @@ -167,7 +167,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on sendAndReceive'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt when: @@ -183,7 +183,7 @@ class UsageTrackingConnectionSpecification extends Specification { def 'lastUsedAt should be set on sendAndReceive asynchronously'() { given: def connection = createConnection() - connection.open() + connection.open(OPERATION_CONTEXT) def openedLastUsedAt = connection.lastUsedAt def futureResultCallback = new FutureResultCallback() diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java index b01b63d4a64..c72fc5ce6fe 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java @@ -26,6 +26,7 @@ import com.mongodb.connection.TlsChannelStreamFactoryFactory; import com.mongodb.crypt.capi.MongoKeyDecryptor; import com.mongodb.internal.connection.AsynchronousChannelStream; +import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.lang.Nullable; @@ -74,7 +75,8 @@ Mono decryptKey(final MongoKeyDecryptor keyDecryptor) { return Mono.create(sink -> { Stream stream = streamFactory.create(serverAddress); - stream.openAsync(new AsyncCompletionHandler() { + OperationContext operationContext = OperationContext.todoOperationContext(); + stream.openAsync(operationContext, new AsyncCompletionHandler() { @Override public void completed(@Nullable final Void ignored) { streamWrite(stream, keyDecryptor, sink);