Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions config/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,14 @@
<!-- Ignoring await return; intended to be used in a loop -->
<Match>
<Class name="com.mongodb.internal.time.Timeout"/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these blocks be removed as there is no method attached?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring moved the issue into a lambda, and IIRC there is no way to match on a lambda due to tool limitations. This "match" applies to all methods in the class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good to know

<Method name="awaitOn"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
<Match>
<Class name="com.mongodb.internal.time.Timeout"/>
<Method name="awaitOn"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
</Match>
<Match>
<Class name="com.mongodb.internal.time.Timeout"/>
<Method name="awaitOn"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class MongoOperationTimeoutException extends MongoTimeoutException
* @param message the message
*/
public MongoOperationTimeoutException(final String message) {
// TODO (CSOT) - JAVA-5248 move all messages here as constants to ensure consistency?
super(message);
}

Expand Down
1 change: 1 addition & 0 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal;

import com.mongodb.MongoInterruptedException;
import com.mongodb.internal.function.CheckedSupplier;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down
122 changes: 66 additions & 56 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;
import org.bson.BsonDocument;
import org.bson.BsonInt64;

import java.util.Objects;
import java.util.Optional;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* Timeout Context.
Expand All @@ -46,14 +48,18 @@ public class TimeoutContext {
private Timeout computedServerSelectionTimeout;
private long minRoundTripTimeMS = 0;

public static MongoOperationTimeoutException createMongoTimeoutException() {
return createMongoTimeoutException("Remaining timeoutMS is less than the servers minimum round trip time.");
public static MongoOperationTimeoutException createMongoRoundTripTimeoutException() {
return createMongoTimeoutException("Remaining timeoutMS is less than the server's minimum round trip time.");
}

public static MongoOperationTimeoutException createMongoTimeoutException(final String message) {
return new MongoOperationTimeoutException(message);
}

public static void throwMongoTimeoutException(final String message) {
throw new MongoOperationTimeoutException(message);
}

public static MongoOperationTimeoutException createMongoTimeoutException(final Throwable cause) {
return createMongoTimeoutException("Operation timed out: " + cause.getMessage(), cause);
}
Expand Down Expand Up @@ -125,8 +131,8 @@ public boolean hasTimeoutMS() {
* @return true if the timeout has been set and it has expired
*/
public boolean hasExpired() {
// Use timeout.remaining instead of timeout.hasExpired that measures in nanoseconds.
return timeout != null && !timeout.isInfinite() && timeout.remaining(MILLISECONDS) <= 0;
// TODO-CSOT remove this method (do not inline, but use lambdas)
return Timeout.nullAsInfinite(timeout).run(NANOSECONDS, () -> false, (ns) -> false, () -> true);
}

/**
Expand All @@ -140,19 +146,9 @@ public TimeoutContext minRoundTripTimeMS(final long minRoundTripTimeMS) {
return this;
}

public Optional<MongoOperationTimeoutException> validateHasTimedOutForCommandExecution() {
if (hasTimedOutForCommandExecution()) {
return Optional.of(createMongoTimeoutException());
}
return Optional.empty();
}

private boolean hasTimedOutForCommandExecution() {
if (timeout == null || timeout.isInfinite()) {
return false;
}
long remaining = timeout.remaining(MILLISECONDS);
return remaining <= 0 || minRoundTripTimeMS > remaining;
@Nullable
public Timeout timeoutIncludingRoundTrip() {
return timeout == null ? null : timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS);
}

/**
Expand All @@ -162,14 +158,13 @@ private boolean hasTimedOutForCommandExecution() {
* @return timeout to use.
*/
public long timeoutOrAlternative(final long alternativeTimeoutMS) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeoutOrAlternative method should ideally be inlined, with getMaxCommitTimeMS, getReadTimeoutMS, and getWriteTimeoutMS encapsulating the result using lambdas, as was done in runMaxTimeMSTimeout. This requires more associated refactoring than seems appropriate for this PR.

Long timeoutMS = timeoutSettings.getTimeoutMS();
if (timeoutMS == null) {
return alternativeTimeoutMS;
} else if (timeoutMS == 0) {
return timeoutMS;
} else {
return timeoutRemainingMS();
}
Timeout t = timeout != null ? timeout : Timeout.expiresIn(alternativeTimeoutMS, MILLISECONDS);
return t.run(MILLISECONDS,
() -> 0L,
(ms) -> ms,
() -> {
throw createMongoTimeoutException("The operation timeout has expired.");
});
}

/**
Expand All @@ -178,17 +173,17 @@ public long timeoutOrAlternative(final long alternativeTimeoutMS) {
* @param alternativeTimeoutMS the alternative timeout
* @return the minimum value to use.
*/
// TODO (CSOT) JAVA-5385 used only by tests?
public long calculateMin(final long alternativeTimeoutMS) {
Long timeoutMS = timeoutSettings.getTimeoutMS();
if (timeoutMS == null) {
return alternativeTimeoutMS;
} else if (timeoutMS == 0) {
return alternativeTimeoutMS;
} else if (alternativeTimeoutMS == 0) {
return timeoutRemainingMS();
} else {
return Math.min(timeoutRemainingMS(), alternativeTimeoutMS);
}
Timeout minimum = Timeout.expiresIn(alternativeTimeoutMS, MILLISECONDS)
.orEarlier(Timeout.nullAsInfinite(timeout));

return minimum.run(MILLISECONDS,
() -> 0L,
(ms) -> ms,
() -> {
throw createMongoTimeoutException("The operation timeout has expired.");
});
}

public TimeoutSettings getTimeoutSettings() {
Expand All @@ -199,15 +194,31 @@ public long getMaxAwaitTimeMS() {
return timeoutSettings.getMaxAwaitTimeMS();
}

// TODO (CSOT) JAVA-5385 used only by tests?
public long getMaxTimeMS() {
long maxTimeMS = timeoutOrAlternative(timeoutSettings.getMaxTimeMS());
if (timeout == null || timeout.isInfinite()) {
return maxTimeMS;
}
if (minRoundTripTimeMS >= maxTimeMS) {
throw createMongoTimeoutException();
}
return maxTimeMS - minRoundTripTimeMS;
return getMaxTimeTimeout().run(MILLISECONDS,
() -> 0L,
(ms) -> ms,
() -> {
throw createMongoRoundTripTimeoutException();
});
}

private Timeout getMaxTimeTimeout() {
Timeout t = timeout != null
? timeout
: Timeout.expiresIn(timeoutSettings.getMaxTimeMS(), MILLISECONDS);
t = t.shortenBy(minRoundTripTimeMS, MILLISECONDS);
return t;
}

public void putMaxTimeMS(final BsonDocument command) {
getMaxTimeTimeout().run(MILLISECONDS,
() -> {},
(ms) -> command.put("maxTimeMS", new BsonInt64(ms)),
() -> {
throw createMongoRoundTripTimeoutException();
});
}

public Long getMaxCommitTimeMS() {
Expand All @@ -223,8 +234,10 @@ public long getWriteTimeoutMS() {
return timeoutOrAlternative(0);
}

public int getConnectTimeoutMs() {
return (int) calculateMin(getTimeoutSettings().getConnectTimeoutMS());
public Timeout createConnectTimeoutMs() {
// null timeout treated as infinite will be later than the other
return Timeout.expiresIn(getTimeoutSettings().getConnectTimeoutMS(), MILLISECONDS)
.orEarlier(Timeout.nullAsInfinite(timeout));
}

public void resetTimeout() {
Expand All @@ -236,9 +249,13 @@ public void resetTimeout() {
* Resets the timeout if this timeout context is being used by pool maintenance
*/
public void resetMaintenanceTimeout() {
if (isMaintenanceContext && timeout != null && !timeout.isInfinite()) {
timeout = calculateTimeout(timeoutSettings.getTimeoutMS());
if (!isMaintenanceContext) {
return;
}
timeout = Timeout.nullAsInfinite(timeout).run(NANOSECONDS,
() -> timeout,
(ms) -> calculateTimeout(timeoutSettings.getTimeoutMS()),
() -> calculateTimeout(timeoutSettings.getTimeoutMS()));
}

public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout) {
Expand All @@ -254,14 +271,6 @@ public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout)
return new TimeoutContext(timeoutSettings.withReadTimeoutMS(newReadTimeout > 0 ? newReadTimeout : Long.MAX_VALUE));
}

private long timeoutRemainingMS() {
assertNotNull(timeout);
if (timeout.hasExpired()) {
throw createMongoTimeoutException("The operation timeout has expired.");
}
return timeout.isInfinite() ? 0 : timeout.remaining(MILLISECONDS);
}

@Override
public String toString() {
return "TimeoutContext{"
Expand Down Expand Up @@ -294,6 +303,7 @@ public int hashCode() {

@Nullable
public static Timeout calculateTimeout(@Nullable final Long timeoutMS) {
// TODO-CSOT rename to startTimeout?
if (timeoutMS != null) {
return timeoutMS == 0 ? Timeout.infinite() : Timeout.expiresIn(timeoutMS, MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

/**
* This package contains cluster and connection event related classes
*/

@NonNullApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

/**
* This package contains cluster and connection event related classes
*/

@NonNullApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, computedServerSelectionTimeout);

if (!currentDescription.isCompatibleWithDriver()) {
throw createAndLogIncompatibleException(operationContext.getId(), serverSelector, currentDescription);
logAndThrowIncompatibleException(operationContext.getId(), serverSelector, currentDescription);
}
if (serverTuple != null) {
logServerSelectionSucceeded(clusterId, operationContext.getId(), serverTuple.getServerDescription().getAddress(),
serverSelector, currentDescription);
return serverTuple;
}
if (computedServerSelectionTimeout.hasExpired()) {
throw createAndLogTimeoutException(operationContext.getId(), serverSelector, currentDescription);
}
computedServerSelectionTimeout.ifExpired(() -> {
logAndThrowTimeoutException(operationContext.getId(), serverSelector, currentDescription);
});
if (!selectionWaitingLogged) {
logServerSelectionWaiting(clusterId, operationContext.getId(), computedServerSelectionTimeout, serverSelector, currentDescription);
selectionWaitingLogged = true;
Expand Down Expand Up @@ -249,8 +249,7 @@ private boolean handleServerSelectionRequest(
CountDownLatch prevPhase = request.phase;
request.phase = currentPhase;
if (!description.isCompatibleWithDriver()) {
request.onResult(null, createAndLogIncompatibleException(request.getOperationId(), request.originalSelector, description));
return true;
logAndThrowIncompatibleException(request.getOperationId(), request.originalSelector, description);
}

ServerTuple serverTuple = selectServer(request.compositeSelector, description, request.getTimeout());
Expand All @@ -266,12 +265,9 @@ private boolean handleServerSelectionRequest(
}
}

if (request.getTimeout().hasExpired()) {
request.onResult(null, createAndLogTimeoutException(request.getOperationId(),
request.originalSelector, description));
return true;
}

Timeout.ifExistsAndExpired(request.getTimeout(), () -> {
logAndThrowTimeoutException(request.getOperationId(), request.originalSelector, description);
});
return false;
} catch (Exception e) {
request.onResult(null, e);
Expand Down Expand Up @@ -338,13 +334,13 @@ protected ClusterableServer createServer(final ServerAddress serverAddress) {
return serverFactory.create(this, serverAddress);
}

private MongoIncompatibleDriverException createAndLogIncompatibleException(
private void logAndThrowIncompatibleException(
final long operationId,
final ServerSelector serverSelector,
final ClusterDescription clusterDescription) {
MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription);
logServerSelectionFailed(clusterId, operationId, exception, serverSelector, clusterDescription);
return exception;
throw exception;
}

private MongoIncompatibleDriverException createIncompatibleException(final ClusterDescription curDescription) {
Expand All @@ -366,15 +362,15 @@ private MongoIncompatibleDriverException createIncompatibleException(final Clust
return new MongoIncompatibleDriverException(message, curDescription);
}

private MongoException createAndLogTimeoutException(
private void logAndThrowTimeoutException(
final long operationId,
final ServerSelector serverSelector,
final ClusterDescription clusterDescription) {
MongoTimeoutException exception = new MongoTimeoutException(format(
"Timed out while waiting for a server that matches %s. Client view of cluster state is %s",
serverSelector, clusterDescription.getShortDescription()));
logServerSelectionFailed(clusterId, operationId, exception, serverSelector, clusterDescription);
return exception;
throw exception;
}

private static final class ServerSelectionRequest {
Expand Down Expand Up @@ -448,20 +444,21 @@ public void run() {
ClusterDescription curDescription = description;

Timeout timeout = Timeout.infinite();

boolean someWaitersNotSatisfied = false;
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
ServerSelectionRequest currentRequest = iter.next();
if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) {
iter.remove();
} else {
someWaitersNotSatisfied = true;
timeout = timeout
.orEarlier(currentRequest.getTimeout())
.orEarlier(startMinWaitHeartbeatTimeout());
}
}

// if there are any waiters that were not satisfied, connect
if (!timeout.isInfinite()) {
// TODO-CSOT because min heartbeat cannot be infinite, infinite is being used to mark the second branch
if (someWaitersNotSatisfied) {
connect();
}

Expand Down Expand Up @@ -508,7 +505,8 @@ private static void logServerSelectionWaiting(
asList(
new Entry(OPERATION, null),
new Entry(OPERATION_ID, operationId),
new Entry(REMAINING_TIME_MS, timeout.remainingOrNegativeForInfinite(MILLISECONDS)),
new Entry(REMAINING_TIME_MS, timeout.run(MILLISECONDS, () -> "infinite", (ms) -> ms, () -> 0L)),
// TODO-CSOT the above extracts values, but the alternative seems worse
new Entry(SELECTOR, serverSelector.toString()),
new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())),
"Waiting for server to become available for operation[ {}] with ID {}.[ Remaining time: {} ms.]"
Expand Down
Loading