Skip to content

Commit

Permalink
fix: Unary Callables Deadline values respect the TotalTimeout in Retr…
Browse files Browse the repository at this point in the history
…ySettings (#1603)

* chore: Add retry test

* chore: Check the timeout for unary callables

* chore: Fix tests

* chore: Address code smell

* chore: Add tests for DEADLINE_EXCEEDED

* chore: Add tests for Server-side streaming

* chore: Start the timeout after http request invocation

* chore: Fix format issues

* chore: Remove Instant calculation with System clock

* chore: Add ITRetry test cases

* chore: Fix the Retry showcase test

* chore: Fix the Retry showcase test

* chore: Convert duration to nanos

* chore: Set the readTimeout min to 20s

* chore: Add successful retry test cases

* chore: Add comments to timeout

* chore: Update the connect timeout

* chore: Refactor timeout logic

* chore: Fix format issues

* chore: Add logic for deadlineScheduler

* chore: Fix format issues

* chore: Update logic

* chore: Update comment

* chore: Update showcase test

* chore: Fix format issues

* chore: Fix logic

* chore: Do not disconnect the connection

* chore: Disconnect after end

* chore: Resolve stream close error

* chore: Fix disconnect logic

* chore: Fix disconnect logic

* chore: Update CI

* chore: Fix native test

* chore: Revert changes

* chore: try with rpc timeout 100ms

* chore: Fix format issues

* chore: Re-run delivery loop with deadlineschedule priority

* chore: Check for timeoutExceeded

* chore: Do not send message if time exceeded

* chore: Fix format issues

* chore: Add timeout for tests

* chore: Fix format issues

* chore: Refactor trailer logic

* chore: Refactor trailer logic

* chore: Rename variables

* chore: Increase the wait to 1s

* chore: Fix format issues

* chore: Set closed var as volatile

* chore: Update logic for onClose

* chore: Attempt with longer timeout

* chore: Empty commit

* chore: Fix format issues

* chore: Trigger deliver loop instead of notifyListeners

* chore: Remove variable

* chore: Remove variable

* chore: Fix close logic

* chore: Revert graalvm ci

* chore: Use 2s as delay

* chore: Update to 5s delay

* chore: Add comments for timeout method

* chore: Use deliver loop in timeout

* chore: Run matrix jobs sequentially

* chore: Fix format issues

* chore: Fix format issues

* chore: Increase the wait to 10s

* chore: Use 110ms delay

* chore: Set delay to be 30s

* chore: Fix format issues

* chore: Log the onClose message

* chore: Remove localRunnable

* chore: Fix format issues

* chore: Lower the retry amounts

* chore: Lower the retry amounts

* chore: Fix shouldRetry logic

* chore: Log results of shouldRetry

* chore: Ignore other retry tests

* chore: Add more logging

* chore: Fix shouldRetry logic

* chore: Remove small optimization

* chore: Temp ignore tests

* chore: Temp ignore tests

* chore: Add more logging

* chore: revert back to checking for negative duration

* chore: Revert ignored test

* chore: Fix logging

* chore: Log timeout

* chore: Set min RPC timeout to be 1ms

* chore: Update the retry algorithms

* chore: Clean up the algorithms

* chore: Uncomment out ITRetry tests

* chore: Refactor the retryAlgorithms

* chore: Add more comments

* chore: Add in the parallel execution for ITs

* chore: Add LRO showcase tests

* chore: Fix format

* chore: Remove deadline getters

* chore: Remove sonar changes

* chore: Fix algorithm test

* chore: Log the flaky test

* chore: Fix format

* chore: Check for rpcTimeout being zero or negative

* chore: Fix tests

* chore: Fix format issues

* chore: Remove unused code

* chore: Update comment for RetryAlgorithm

* chore: Fix format issues

* chore: Use millis for timeout

* chore: Await termination for clients

* chore: Fix format issues

* chore: Update LRO first call timeout value

* chore: Update LRO test names

* chore: Remove the parallel showcase tests

* chore: Add showcase sequence tests for retries

* chore: Add showcase sequence tests for retries

* chore: Update retry test name

* chore: Fix typos

* chore: Fix server streaming callable test

* chore: Clean up tests

* chore: Remove retry tests

* chore: Fix format issues

* chore: Address PR comments

* chore: Update java.time.Duration import

* chore: Update values for LRO showcase test

* chore: Update values for LRO showcase test

* chore: Fix format issues

* chore: Update variable name

* chore: Convert shouldRetry logic to use milliseconds

* chore: Remove jitter from tests

* chore: Fix showcase test

* chore: Log the attempt callable timeout

* chore: Update LRO test case

* chore: Add logging

* chore: Use millis for timeout

* chore: Address PR comments

* chore: Update to use TestClientInitializer class

* chore: Fix client initialize method names

* chore: Add back public method

* chore: Add back public methods
  • Loading branch information
lqiu96 authored May 8, 2023
1 parent 6107ff3 commit d2fe520
Show file tree
Hide file tree
Showing 17 changed files with 958 additions and 68 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/sonar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,4 @@ jobs:
-Dsonar.projectKey=googleapis_gapic-generator-java_unit_tests \
-Dsonar.organization=googleapis \
-Dsonar.host.url=https://sonarcloud.io \
-Dsonar.projectName=java_showcase_unit_tests
-Dsonar.projectName=java_showcase_unit_tests
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.protobuf.TypeRegistry;
import java.time.Duration;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

Expand All @@ -42,6 +43,9 @@
public abstract class HttpJsonCallOptions {
public static final HttpJsonCallOptions DEFAULT = newBuilder().build();

@Nullable
public abstract Duration getTimeout();

@Nullable
public abstract Instant getDeadline();

Expand Down Expand Up @@ -69,6 +73,13 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {
builder.setDeadline(newDeadline);
}

if (inputOptions.getTimeout() != null) {
Duration newTimeout = java.time.Duration.ofMillis(inputOptions.getTimeout().toMillis());
if (newTimeout != null) {
builder.setTimeout(newTimeout);
}
}

Credentials newCredentials = inputOptions.getCredentials();
if (newCredentials != null) {
builder.setCredentials(newCredentials);
Expand All @@ -84,6 +95,8 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTimeout(java.time.Duration value);

public abstract Builder setDeadline(Instant value);

public abstract Builder setCredentials(Credentials value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType;
import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener;
import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult;
import com.google.api.gax.rpc.StatusCode;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -88,6 +92,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
private final ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor;
private final HttpTransport httpTransport;
private final Executor executor;
private final ScheduledExecutorService deadlineCancellationExecutor;

//
// Request-specific data (provided by client code) before we get a response.
Expand All @@ -114,19 +119,21 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
private ProtoMessageJsonStreamIterator responseStreamIterator;

@GuardedBy("lock")
private boolean closed;
private volatile boolean closed;

HttpJsonClientCallImpl(
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor,
String endpoint,
HttpJsonCallOptions callOptions,
HttpTransport httpTransport,
Executor executor) {
Executor executor,
ScheduledExecutorService deadlineCancellationExecutor) {
this.methodDescriptor = methodDescriptor;
this.endpoint = endpoint;
this.callOptions = callOptions;
this.httpTransport = httpTransport;
this.executor = executor;
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.closed = false;
}

Expand Down Expand Up @@ -161,6 +168,38 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
this.listener = responseListener;
this.requestHeaders = requestHeaders;
}

// Use the timeout duration value instead of calculating the future Instant
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
Duration timeout = callOptions.getTimeout();
if (timeout != null) {
// The future timeout value is guaranteed to not be a negative value as the
// RetryAlgorithm will not retry
long timeoutMs = timeout.toMillis();
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
}
}

// Deadline handler scheduled by start() on the deadlineCancellationExecutor when the call
// options carry an RPC timeout. It closes this call with DEADLINE_EXCEEDED so that the
// listener is notified of the timeout. For retrying RPCs, this status is reported for every
// attempt that exceeds its timeout; the RetryAlgorithm is expected to look at both the
// elapsed time and the status code when deciding whether to make another attempt.
private void timeout() {
  final int deadlineExceededStatus = StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode();
  final String deadlineExceededMessage = "Deadline exceeded";

  // The deadline scheduler races with the response coming back from the server. Whoever
  // obtains the lock first wins: if the call is already closed, close() is a no-op; otherwise
  // terminating immediately drops every pending notification and queues the
  // DEADLINE_EXCEEDED onClose event in their place.
  synchronized (lock) {
    HttpJsonStatusRuntimeException deadlineExceeded =
        new HttpJsonStatusRuntimeException(deadlineExceededStatus, deadlineExceededMessage, null);
    close(deadlineExceededStatus, deadlineExceededMessage, deadlineExceeded, true);
  }

  // Kick the delivery loop (outside the lock) so the queued onClose notification is
  // dispatched; this does nothing extra if a delivery loop is already running.
  deliver();
}

@Override
Expand Down Expand Up @@ -260,9 +299,10 @@ private void deliver() {
throw new InterruptedException("Message delivery has been interrupted");
}

// All listeners must be called under delivery loop (but outside the lock) to ensure that no
// two notifications come simultaneously from two different threads and that we do not go
// indefinitely deep in the stack if delivery logic is called recursively via listeners.
// All listeners must be called under delivery loop (but outside the lock) to ensure that
// no two notifications come simultaneously from two different threads and that we do not
// go indefinitely deep in the stack if delivery logic is called recursively via
// listeners.
notifyListeners();

// The synchronized block around message reading and cancellation notification processing
Expand Down Expand Up @@ -302,7 +342,7 @@ private void deliver() {
inDelivery = false;
break;
} else {
// We still have some stuff in notiticationTasksQueue so continue the loop, most
// We still have some stuff in notificationTasksQueue so continue the loop, most
// likely we will finally terminate on the next cycle.
continue;
}
Expand All @@ -319,8 +359,8 @@ private void deliver() {
// can do in such an unlikely situation (otherwise we would stay forever in the delivery
// loop).
synchronized (lock) {
// Close the call immediately marking it cancelled. If already closed close() will have no
// effect.
// Close the call immediately marking it cancelled. If already closed, close() will have
// no effect.
close(ex.getStatusCode(), ex.getMessage(), ex, true);
}
}
Expand Down Expand Up @@ -352,7 +392,7 @@ private boolean consumeMessageFromStream() throws IOException {
boolean allMessagesConsumed;
Reader responseReader;
if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) {
// Lazily initialize responseStreamIterator in case if it is a server steraming response
// Lazily initialize responseStreamIterator in case if it is a server streaming response
if (responseStreamIterator == null) {
responseStreamIterator =
new ProtoMessageJsonStreamIterator(
Expand Down Expand Up @@ -384,7 +424,7 @@ private boolean consumeMessageFromStream() throws IOException {

@GuardedBy("lock")
private void close(
int statusCode, String message, Throwable cause, boolean terminateImmediatelly) {
int statusCode, String message, Throwable cause, boolean terminateImmediately) {
try {
if (closed) {
return;
Expand All @@ -399,12 +439,12 @@ private void close(
requestRunnable = null;
}

HttpJsonMetadata.Builder meatadaBuilder = HttpJsonMetadata.newBuilder();
HttpJsonMetadata.Builder metadataBuilder = HttpJsonMetadata.newBuilder();
if (runnableResult != null && runnableResult.getTrailers() != null) {
meatadaBuilder = runnableResult.getTrailers().toBuilder();
metadataBuilder = runnableResult.getTrailers().toBuilder();
}
meatadaBuilder.setException(cause);
meatadaBuilder.setStatusMessage(message);
metadataBuilder.setException(cause);
metadataBuilder.setStatusMessage(message);
if (responseStreamIterator != null) {
responseStreamIterator.close();
}
Expand All @@ -415,19 +455,19 @@ private void close(
// onClose() suppresses all other pending notifications.
// there should be no place in the code which inserts something in this queue before checking
// the `closed` flag under the lock and refusing to insert anything if `closed == true`.
if (terminateImmediatelly) {
if (terminateImmediately) {
// This usually means we are cancelling the call before processing the response in full.
// It may happen if a user explicitly cancels the call or in response to an unexpected
// exception either from server or a call listener execution.
pendingNotifications.clear();
}

pendingNotifications.offer(
new OnCloseNotificationTask<>(listener, statusCode, meatadaBuilder.build()));
new OnCloseNotificationTask<>(listener, statusCode, metadataBuilder.build()));

} catch (Throwable e) {
// suppress stream closing exceptions in favor of the actual call closing cause. This method
// should not throw, otherwise we may stuck in an infinite loop of exception processing.
// should not throw, otherwise we may be stuck in an infinite loop of exception processing.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;
import org.threeten.bp.Duration;

/**
* {@code HttpJsonClientCalls} creates a new {@code HttpJsonClientCall} from the given call context.
Expand All @@ -50,12 +49,25 @@ public static <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newC

HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault().nullToSelf(context);

// Try to convert the timeout into a deadline and use it if it occurs before the actual deadline
// Use the context's timeout instead of calculating a future deadline with the System clock.
// The timeout value is calculated from TimedAttemptSettings which accounts for the
// TotalTimeout value set in the RetrySettings.
if (httpJsonContext.getTimeout() != null) {
@Nonnull Instant newDeadline = Instant.now().plus(httpJsonContext.getTimeout());
HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions();
if (callOptions.getDeadline() == null || newDeadline.isBefore(callOptions.getDeadline())) {
callOptions = callOptions.toBuilder().setDeadline(newDeadline).build();
// HttpJsonChannel expects the HttpJsonCallOptions and we store the timeout duration
// inside the HttpJsonCallOptions
// Note: There is a manual conversion between threetenbp's Duration and java.time.Duration.
// This is temporary, as we plan to migrate to java.time.Duration.
if (callOptions.getTimeout() == null
|| httpJsonContext
.getTimeout()
.compareTo(Duration.ofMillis(callOptions.getTimeout().toMillis()))
< 0) {
callOptions =
callOptions
.toBuilder()
.setTimeout(java.time.Duration.ofMillis(httpJsonContext.getTimeout().toMillis()))
.build();
httpJsonContext = httpJsonContext.withCallOptions(callOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/** A runnable object that creates and executes an HTTP request. */
class HttpRequestRunnable<RequestT, ResponseT> implements Runnable {
Expand Down Expand Up @@ -100,24 +99,22 @@ void cancel() {

@Override
public void run() {
HttpResponse httpResponse = null;
RunnableResult.Builder result = RunnableResult.builder();
HttpJsonMetadata.Builder trailers = HttpJsonMetadata.newBuilder();
HttpRequest httpRequest = null;
HttpResponse httpResponse = null;
try {
// Check if already cancelled before even creating a request
if (cancelled) {
return;
}
httpRequest = createHttpRequest();
HttpRequest httpRequest = createHttpRequest();
// Check if already cancelled before sending the request;
if (cancelled) {
return;
}

httpResponse = httpRequest.execute();

// Check if already cancelled before sending the request;
// Check if already cancelled before trying to construct and read the response
if (cancelled) {
httpResponse.disconnect();
return;
Expand Down Expand Up @@ -145,6 +142,9 @@ public void run() {
}
trailers.setException(e);
} finally {
// If cancelled, `close()` in HttpJsonClientCallImpl has already been invoked
// and returned a DEADLINE_EXCEEDED error back so there is no need to set
// a result back.
if (!cancelled) {
resultListener.setResult(result.setTrailers(trailers.build()).build());
}
Expand Down Expand Up @@ -191,16 +191,6 @@ HttpRequest createHttpRequest() throws IOException {

HttpRequest httpRequest = buildRequest(requestFactory, url, jsonHttpContent);

Instant deadline = httpJsonCallOptions.getDeadline();
if (deadline != null) {
long readTimeout = Duration.between(Instant.now(), deadline).toMillis();
if (httpRequest.getReadTimeout() > 0
&& httpRequest.getReadTimeout() < readTimeout
&& readTimeout < Integer.MAX_VALUE) {
httpRequest.setReadTimeout((int) readTimeout);
}
}

for (Map.Entry<String, Object> entry : headers.getHeaders().entrySet()) {
HttpHeadersUtils.setHeader(
httpRequest.getHeaders(), entry.getKey(), (String) entry.getValue());
Expand Down Expand Up @@ -243,9 +233,35 @@ private HttpRequest buildRequest(
HttpHeadersUtils.setHeader(
httpRequest.getHeaders(), "X-HTTP-Method-Override", originalHttpMethod);
}

Duration timeout = httpJsonCallOptions.getTimeout();
if (timeout != null) {
long timeoutMs = timeout.toMillis();

// Read timeout is the timeout between reading two data packets and not total timeout
// HttpJsonClientCallImpl uses a deadlineCancellationExecutor to cancel the
// RPC when it exceeds the RPC timeout
if (shouldUpdateTimeout(httpRequest.getReadTimeout(), timeoutMs)) {
httpRequest.setReadTimeout((int) timeoutMs);
}

// Connect timeout is the time allowed for establishing the connection.
// This is updated to match the RPC timeout as we do not want a shorter
// connect timeout to preemptively throw a ConnectException before
// we've reached the RPC timeout
if (shouldUpdateTimeout(httpRequest.getConnectTimeout(), timeoutMs)) {
httpRequest.setConnectTimeout((int) timeoutMs);
}
}
return httpRequest;
}

// Decides whether a timeout currently configured on the HTTP request should be replaced by
// the RPC timeout. The replacement only ever lengthens a timeout: it is allowed when the
// current value is positive, strictly smaller than the new value, and the new value still
// fits in an int (callers cast it to int before applying it).
// NOTE(review): a non-positive current timeout is left untouched, presumably because the
// HTTP client treats 0 as "no timeout" - confirm against the HttpRequest documentation.
private boolean shouldUpdateTimeout(int currentTimeoutMs, long newTimeoutMs) {
  if (currentTimeoutMs <= 0) {
    return false;
  }
  if (newTimeoutMs >= Integer.MAX_VALUE) {
    return false;
  }
  return newTimeoutMs > currentTimeoutMs;
}

// This will be frequently executed, so avoiding using regexps if not necessary.
private String normalizeEndpoint(String rawEndpoint) {
String normalized = rawEndpoint;
Expand Down
Loading

0 comments on commit d2fe520

Please sign in to comment.