Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ public boolean hasTimeoutMS() {
}

/**
* Checks the expiry of the timeout.
*
* @return true if the timeout has been set and it has expired
* Runs the runnable if the timeout is expired.
* @param onExpired the runnable to run
*/
public boolean hasExpired() {
// TODO (CSOT) this method leaks Timeout internals, should be removed (not inlined, but inverted using lambdas)
return Timeout.nullAsInfinite(timeout).call(NANOSECONDS, () -> false, (ns) -> false, () -> true);
public void onExpired(final Runnable onExpired) {
Timeout.nullAsInfinite(timeout).onExpired(onExpired);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public final class RetryState {

private final LoopState loopState;
private final int attempts;
@Nullable
private final TimeoutContext timeoutContext;
private final boolean retryUntilTimeoutThrowsException;
@Nullable
private Throwable previouslyChosenException;

Expand All @@ -63,7 +62,7 @@ public final class RetryState {
* If a timeout is not specified in the {@link TimeoutContext#hasTimeoutMS()}, the specified {@code retries} param acts as a fallback
* bound. Otherwise, retries are unbounded until the timeout is reached.
* <p>
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow(Throwable, BiFunction, BiPredicate, boolean)} method,
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
* which can be used to stop retrying based on a custom condition additionally to {@code retires} and {@link TimeoutContext}.
* </p>
*
Expand All @@ -87,7 +86,7 @@ public static RetryState withNonRetryableState() {
* Creates a {@link RetryState} that does not limit the number of retries.
* The number of attempts is limited iff {@link TimeoutContext#hasTimeoutMS()} is true and timeout has expired.
* <p>
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow(Throwable, BiFunction, BiPredicate, boolean)} method,
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
* which can be used to stop retrying based on a custom condition additionally to {@code retires} and {@link TimeoutContext}.
* </p>
*
Expand All @@ -107,7 +106,7 @@ private RetryState(final int retries, @Nullable final TimeoutContext timeoutCont
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
this.timeoutContext = timeoutContext;
this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
}

/**
Expand Down Expand Up @@ -198,16 +197,16 @@ private void doAdvanceOrThrow(final Throwable attemptException,
* A MongoOperationTimeoutException indicates that the operation timed out, either during command execution or server selection.
* The timeout for server selection is determined by the computedServerSelectionMS = min(serverSelectionTimeoutMS, timeoutMS).
*
* The isLastAttempt() method checks if the timeoutMS has expired, which could be greater than the computedServerSelectionMS.
* Therefore, it's important to check if the exception is an instance of MongoOperationTimeoutException to detect a timeout.
* It is important to check if the exception is an instance of MongoOperationTimeoutException to detect a timeout.
*/
if (isLastAttempt() || attemptException instanceof MongoOperationTimeoutException) {
previouslyChosenException = newlyChosenException;
/*
* The function of isLastIteration() is to indicate if retrying has been explicitly halted. Such a stop is not interpreted as
* The function of isLastIteration() is to indicate if retrying has
* been explicitly halted. Such a stop is not interpreted as
* a timeout exception but as a deliberate cessation of retry attempts.
*/
if (hasTimeoutMs() && !loopState.isLastIteration()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for removing these is that it is safe to behave as if a timeout has not expired (even if it has), because it will be detected and thrown by an ensuing blocking operation.

This relies on the assumption that all blocking operations will throw when given an expired timeout (this should be the case, because all blocking operations should now be wrapped in Timeout's branching "call" methods).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests were failing. The logic related to timeout exceptions in RetryState is necessary, but complicated to extract. I've restored it, but the isExpired check is still removed (so this still relies on the ensuing blocking operation throwing a timeout exception).

if (retryUntilTimeoutThrowsException && !loopState.isLastIteration()) {
previouslyChosenException = createMongoTimeoutException(
"Retry attempt exceeded the timeout limit.",
previouslyChosenException);
Expand Down Expand Up @@ -381,16 +380,12 @@ public boolean isLastAttempt() {
if (loopState.isLastIteration()){
return true;
}
if (hasTimeoutMs()) {
return assertNotNull(timeoutContext).hasExpired();
if (retryUntilTimeoutThrowsException) {
return false;
}
return attempt() == attempts - 1;
}

private boolean hasTimeoutMs() {
return timeoutContext != null && timeoutContext.hasTimeoutMS();
}

/**
* A 0-based attempt number.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,24 +771,22 @@ private void updateSessionContext(final SessionContext sessionContext, final Res
}

private void throwTranslatedWriteException(final Throwable e, final OperationContext operationContext) {
throw translateWriteException(e, operationContext);
}

private MongoException translateWriteException(final Throwable e, final OperationContext operationContext) {
if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasExpired()) {
return createMongoTimeoutException(e);
if (e instanceof MongoSocketWriteTimeoutException) {
operationContext.getTimeoutContext().onExpired(() -> {
throw createMongoTimeoutException(e);
});
}

if (e instanceof MongoException) {
return (MongoException) e;
throw (MongoException) e;
}
Optional<MongoInterruptedException> interruptedException = translateInterruptedException(e, "Interrupted while sending message");
if (interruptedException.isPresent()) {
return interruptedException.get();
throw interruptedException.get();
} else if (e instanceof IOException) {
return new MongoSocketWriteException("Exception sending message", getServerAddress(), e);
throw new MongoSocketWriteException("Exception sending message", getServerAddress(), e);
} else {
return new MongoInternalException("Unexpected exception", e);
throw new MongoInternalException("Unexpected exception", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ public ByteBuf getBuffer(final int size) {
public void write(final List<ByteBuf> buffers, final OperationContext operationContext) throws IOException {
for (final ByteBuf cur : buffers) {
outputStream.write(cur.array(), 0, cur.limit());
if (operationContext.getTimeoutContext().hasExpired()) {
operationContext.getTimeoutContext().onExpired(() -> {
throwMongoTimeoutException("Socket write exceeded the timeout limit.");
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set<S
public static final class BulkWriteTracker {
private int attempt;
private final int attempts;
private final TimeoutContext timeoutContext;
private final boolean retryUntilTimeoutThrowsException;
@Nullable
private final BulkWriteBatch batch;

Expand Down Expand Up @@ -475,12 +475,12 @@ private BulkWriteTracker(final boolean retry, @Nullable final BulkWriteBatch bat
attempt = 0;
attempts = retry ? RetryState.RETRIES + 1 : 1;
this.batch = batch;
this.timeoutContext = timeoutContext;
this.retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS();;
}

boolean lastAttempt() {
if (timeoutContext.hasTimeoutMS()){
return timeoutContext.hasExpired();
if (retryUntilTimeoutThrowsException){
return false;
}
return attempt == attempts - 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.mongodb.internal;

import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -36,6 +38,7 @@
import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_MAX_TIME_AND_AWAIT_TIME;
import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_TIMEOUT;
import static com.mongodb.ClusterFixture.sleep;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -182,9 +185,13 @@ void testExpired() {
new TimeoutContext(TIMEOUT_SETTINGS.withTimeoutMS(9999999L));
TimeoutContext noTimeout = new TimeoutContext(TIMEOUT_SETTINGS);
sleep(100);
assertFalse(noTimeout.hasExpired());
assertFalse(longTimeout.hasExpired());
assertTrue(smallTimeout.hasExpired());
assertFalse(hasExpired(noTimeout.getTimeout()));
assertFalse(hasExpired(longTimeout.getTimeout()));
assertTrue(hasExpired(smallTimeout.getTimeout()));
}

private static boolean hasExpired(@Nullable final Timeout timeout) {
return Timeout.nullAsInfinite(timeout).call(NANOSECONDS, () -> false, (ns) -> false, () -> true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.function.LoopState.AttachmentKey;
import com.mongodb.internal.operation.retry.AttachmentKeys;
import org.junit.Ignore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -40,7 +41,6 @@
import static org.junit.jupiter.api.Named.named;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

final class RetryStateTest {
private static final TimeoutContext TIMEOUT_CONTEXT_NO_GLOBAL_TIMEOUT = new TimeoutContext(new TimeoutSettings(0L, 0L,
Expand Down Expand Up @@ -97,17 +97,6 @@ void unlimitedAttemptsAndAdvance(final TimeoutContext timeoutContext) {
);
}

@Test
void unlimitedAttemptsAndAdvanceWithTimeoutException() {
RetryState retryState = new RetryState(TIMEOUT_CONTEXT_EXPIRED_GLOBAL_TIMEOUT);
assertAll(
() -> assertTrue(retryState.isFirstAttempt()),
() -> assertEquals(0, retryState.attempt()),
() -> assertEquals(0, retryState.attempts())
);
Assertions.assertThrows(MongoOperationTimeoutException.class, () -> advance(retryState));
}

@Test
void limitedAttemptsAndAdvance() {
RetryState retryState = RetryState.withNonRetryableState();
Expand Down Expand Up @@ -169,14 +158,6 @@ void breakAndThrowIfRetryAndFalse(final TimeoutContext timeoutContext) {
assertFalse(retryState.isLastAttempt());
}

@Test
void breakAndThrowIfRetryAndFalseWithExpiredTimeout() {
RetryState retryState = new RetryState(TIMEOUT_CONTEXT_EXPIRED_GLOBAL_TIMEOUT);
retryState.breakAndThrowIfRetryAnd(() -> false);
assertTrue(retryState.isLastAttempt());
assertThrows(MongoOperationTimeoutException.class, () -> advance(retryState));
}

@ParameterizedTest
@MethodSource({"infiniteTimeout", "noTimeout"})
void breakAndThrowIfRetryAndTrue() {
Expand All @@ -189,10 +170,6 @@ void breakAndThrowIfRetryAndTrue() {
@Test
void breakAndThrowIfRetryAndTrueWithExpiredTimeout() {
TimeoutContext tContextMock = mock(TimeoutContext.class);
when(tContextMock.hasTimeoutMS()).thenReturn(true);
when(tContextMock.hasExpired())
.thenReturn(false)
.thenReturn(true);

RetryState retryState = new RetryState(tContextMock);
advance(retryState);
Expand Down Expand Up @@ -354,6 +331,7 @@ void advanceOrThrowPredicateThrowsAfterFirstAttempt(final TimeoutContext timeout
}));
}

@Ignore // TODO (CSOT) update this
@Test
void advanceOrThrowPredicateThrowsTimeoutAfterFirstAttempt() {
RetryState retryState = new RetryState(TIMEOUT_CONTEXT_EXPIRED_GLOBAL_TIMEOUT);
Expand Down Expand Up @@ -439,6 +417,7 @@ void advanceOrThrowTransformAfterFirstAttempt(final TimeoutContext timeoutContex
}));
}

@Ignore // TODO (CSOT) update this
@Test
void advanceOrThrowTransformThrowsTimeoutExceptionAfterFirstAttempt() {
Copy link
Member

@vbabanin vbabanin Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test asserts the behavior of error transformation along with timeout handling. Despite the removal of timeout checks within the RetryState, we should ensure that the logic related to error transformation is still tested.

The same applies to advanceOrThrowPredicateThrowsTimeoutAfterFirstAttempt.

Copy link
Member

@vbabanin vbabanin Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed on the call with @katcharov, those tests will be ignored and updated later.

RetryState retryState = new RetryState(TIMEOUT_CONTEXT_EXPIRED_GLOBAL_TIMEOUT);
Expand Down