Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Unary Callables Deadline values respect the TotalTimeout in RetrySettings #1603

Merged
merged 143 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
143 commits
Select commit Hold shift + click to select a range
dca83d7
chore: Add retry test
lqiu96 Mar 31, 2023
45c0765
chore: Check the timeout for unary callables
lqiu96 Apr 3, 2023
7862762
chore: Fix tests
lqiu96 Apr 4, 2023
7f5622f
chore: Address code smell
lqiu96 Apr 4, 2023
94b5017
chore: Add tests for DEADLINE_EXCEEDED
lqiu96 Apr 4, 2023
608d44b
chore: Add tests for Server-side streaming
lqiu96 Apr 4, 2023
0c1782f
chore: Start the timeout after http request invocation
lqiu96 Apr 4, 2023
f75f67e
chore: Fix format issues
lqiu96 Apr 4, 2023
7c4c841
chore: Remove Instant calculation with System clock
lqiu96 Apr 5, 2023
3668093
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 5, 2023
d440b8e
chore: Add ITRetry test cases
lqiu96 Apr 5, 2023
bbf7f4f
chore: Fix the Retry showcase test
lqiu96 Apr 6, 2023
bee8c0e
chore: Fix the Retry showcase test
lqiu96 Apr 6, 2023
73882a7
chore: Convert duration to nanos
lqiu96 Apr 6, 2023
b0d83b9
chore: Set the readTimeout min to 20s
lqiu96 Apr 6, 2023
fc5cf13
chore: Add sucessful retry tests cases
lqiu96 Apr 6, 2023
2f1bc8a
chore: Add comments to timeout
lqiu96 Apr 6, 2023
ead0eb9
chore: Update the connect timeout
lqiu96 Apr 6, 2023
ca0770b
chore: Refactor timeout logic
lqiu96 Apr 7, 2023
21ed291
chore: Fix format issues
lqiu96 Apr 10, 2023
bb39513
chore: Add logic for deadlineScheduler
lqiu96 Apr 13, 2023
fab86be
chore: Fix format issues
lqiu96 Apr 13, 2023
95ef1a1
chore: Update logic
lqiu96 Apr 14, 2023
25190ec
chore: Update comment
lqiu96 Apr 14, 2023
f59de69
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 14, 2023
133af94
chore: Update showcase test
lqiu96 Apr 14, 2023
ff5108f
chore: Fix format issues
lqiu96 Apr 14, 2023
e800373
chore: Fix logic
lqiu96 Apr 14, 2023
d757c05
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 14, 2023
f139797
chore: Do not disconnect the connection
lqiu96 Apr 14, 2023
e1d12c5
chore: Disconnect after end
lqiu96 Apr 14, 2023
8ee7513
chore: Resolve steam close error
lqiu96 Apr 14, 2023
bed960e
chore: Fix disconnect logic
lqiu96 Apr 14, 2023
5c5eaec
chore: Fix disconnect logic
lqiu96 Apr 14, 2023
ab93117
chore: Update CI
lqiu96 Apr 14, 2023
f3d19b6
chore: Fix native test
lqiu96 Apr 14, 2023
1e38e5f
chore: Revert changes
lqiu96 Apr 17, 2023
5a10f4c
chore: try with rpc timeout 100ms
lqiu96 Apr 17, 2023
9b877ac
chore: Fix format issues
lqiu96 Apr 17, 2023
eff3513
chore: Re-run delivery loop with deadlineschedule priority
lqiu96 Apr 17, 2023
1d1dffa
chore: Check for timeoutExceeded
lqiu96 Apr 17, 2023
887f3bb
chore: Do not send message is time exceeded
lqiu96 Apr 17, 2023
102ab99
chore: Fix format issues
lqiu96 Apr 17, 2023
3f4b9a5
chore: Add timeout for tests
lqiu96 Apr 17, 2023
dffe194
chore: Fix format issues
lqiu96 Apr 17, 2023
47c8da6
chore: Refactor trailer logic
lqiu96 Apr 17, 2023
e5ccddd
chore: Refactor trailer logic
lqiu96 Apr 17, 2023
fdf6c63
chore: Rename variables
lqiu96 Apr 17, 2023
64b2c77
chore: Increase the wait to 1s
lqiu96 Apr 17, 2023
c5c2f54
chore: Fix format issues
lqiu96 Apr 17, 2023
d0893ea
chore: Set closed var as volatile
lqiu96 Apr 17, 2023
1281ab5
chore: Update logic for onClose
lqiu96 Apr 17, 2023
1ae5d3d
chore: Attempt with longer timeout
lqiu96 Apr 17, 2023
36e3788
chore: Empty commit
lqiu96 Apr 17, 2023
452fc97
chore: Fix format issues
lqiu96 Apr 17, 2023
0ad9442
chore: Trigger deliver loop instead of notifyListeners
lqiu96 Apr 17, 2023
0ad7a4d
chore: Remove variable
lqiu96 Apr 17, 2023
a769ee1
chore: Remove variable
lqiu96 Apr 17, 2023
65e9e67
chore: Fix close logic
lqiu96 Apr 17, 2023
0b926d5
chore: Revert graalvm ci
lqiu96 Apr 18, 2023
6b70a50
chore: Use 2s as delay
lqiu96 Apr 18, 2023
9af022a
chore: Update to 5s delay
lqiu96 Apr 18, 2023
66c757c
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 18, 2023
fc59f98
chore: Add comments for timeout method
lqiu96 Apr 18, 2023
b773f89
chore: Use deliver loop in timeout
lqiu96 Apr 18, 2023
b571b6e
chore: Run matrix jobs sequentially
lqiu96 Apr 18, 2023
f97e7c0
chore: Fix format issues
lqiu96 Apr 18, 2023
1aec63c
chore: Fix format issues
lqiu96 Apr 18, 2023
de5927d
chore: Increase the wait to 10s
lqiu96 Apr 18, 2023
6a337c5
chore: Use 110ms delay
lqiu96 Apr 19, 2023
8be44c8
chore: Set delay to be 30s
lqiu96 Apr 19, 2023
42dd7ed
chore: Fix format issues
lqiu96 Apr 19, 2023
1de756c
chore: Log the onClose message
lqiu96 Apr 19, 2023
8756e27
chore: Remove localRunnable
lqiu96 Apr 19, 2023
51df7ea
chore: Fix format issues
lqiu96 Apr 19, 2023
a9ff512
chore: Lower the retry amounts
lqiu96 Apr 19, 2023
8f1627e
chore: Lower the retry amounts
lqiu96 Apr 19, 2023
becbc25
chore: Fix shouldRetry logic
lqiu96 Apr 19, 2023
c1f609f
chore: Log results of shouldRetry
lqiu96 Apr 19, 2023
be1f9fb
chore: Ignore other retry tests
lqiu96 Apr 19, 2023
490ccc1
chore: Add more logging
lqiu96 Apr 19, 2023
3a9bdfd
chore: Fix shouldRetry logic
lqiu96 Apr 19, 2023
87a41dc
chore: Remove small optimization
lqiu96 Apr 19, 2023
da26a56
chore: Temp ignore tests
lqiu96 Apr 19, 2023
0978549
chore: Temp ignore tests
lqiu96 Apr 19, 2023
93c8fdc
chore: Add more logging
lqiu96 Apr 19, 2023
2a9d2df
chore: revert back to checking for negative duration
lqiu96 Apr 19, 2023
a517d95
chore: Revert ignored test
lqiu96 Apr 19, 2023
0f8bbc8
chore: Fix logging
lqiu96 Apr 19, 2023
c1f914f
chore: Log timeout
lqiu96 Apr 19, 2023
6bb2d04
chore: Set min RPC timeout to be 1ms
lqiu96 Apr 19, 2023
3da7937
chore: Update the retry algorithms
lqiu96 Apr 19, 2023
d28b87f
chore: Clean up the algoritms
lqiu96 Apr 19, 2023
7c316a1
chore: Uncomment out ITRetry tests
lqiu96 Apr 19, 2023
ae3c2ec
chore: Refactor the retryAlgorithms
lqiu96 Apr 19, 2023
3fb78f7
chore: Add more comments
lqiu96 Apr 19, 2023
8cd3e3b
chore: Add in the parallel execution for ITs
lqiu96 Apr 19, 2023
b8704ff
chore: Add LRO showcase tests
lqiu96 Apr 20, 2023
eb6635a
chore: Fix format
lqiu96 Apr 20, 2023
0b76faf
chore: Remove deadline getters
lqiu96 Apr 20, 2023
acc3ee1
chore: Remove sonar changes
lqiu96 Apr 20, 2023
8b6445e
chore: Fix algorithm test
lqiu96 Apr 20, 2023
d458167
chore: Log the flaky test
lqiu96 Apr 20, 2023
09e7ff2
chore: Fix format
lqiu96 Apr 20, 2023
c20ee95
chore: Check for rpcTimeout being zero or negative
lqiu96 Apr 20, 2023
6552dbe
chore: Fix tests
lqiu96 Apr 20, 2023
0454e02
chore: Fix format issues
lqiu96 Apr 20, 2023
a1dcfdd
chore: Remove unused code
lqiu96 Apr 20, 2023
f9afed4
chore: Update comment for RetryAlgorithm
lqiu96 Apr 20, 2023
1d5c00a
chore: Fix format issues
lqiu96 Apr 20, 2023
a7983d3
chore: Use millis for timeout
lqiu96 Apr 21, 2023
15448d2
chore: Await termination for clients
lqiu96 Apr 21, 2023
924f7a0
chore: Fix format issues
lqiu96 Apr 21, 2023
f4ba8af
chore: Update LRO first call timeout value
lqiu96 Apr 21, 2023
a305123
chore: Update LRO test names
lqiu96 Apr 21, 2023
6e35172
chore: Remove the parallel showcase tests
lqiu96 Apr 21, 2023
d4bc792
chore: Add showcase sequence tests for retries
lqiu96 Apr 21, 2023
ccdf1e2
chore: Add showcase sequence tests for retries
lqiu96 Apr 21, 2023
561ffa3
chore: Update retry test name
lqiu96 Apr 24, 2023
3607a30
chore: Fix typos
lqiu96 Apr 24, 2023
cb34160
chore: Fix server streaming callable test
lqiu96 Apr 25, 2023
cf5ba07
chore: Clean up tests
lqiu96 Apr 25, 2023
07ecd7a
chore: Remove retry tests
lqiu96 Apr 25, 2023
b4812e3
chore: Fix format issues
lqiu96 Apr 25, 2023
ed6b53c
chore: Address PR comments
lqiu96 May 1, 2023
2095745
chore: Update java.time.Duration import
lqiu96 May 1, 2023
a8e3153
chore: Update values for LRO showcase test
lqiu96 May 1, 2023
760f5fe
chore: Update values for LRO showcase test
lqiu96 May 1, 2023
d890dee
chore: Fix format issues
lqiu96 May 1, 2023
3086821
chore: Update variable name
lqiu96 May 3, 2023
084b592
chore: Convert shouldRetry logic to use milliseconds
lqiu96 May 3, 2023
145d084
chore: Remove jitter from tests
lqiu96 May 3, 2023
e9c1645
chore: Fix showcase test
lqiu96 May 3, 2023
600a2dc
chore: Log the attempt callable timeout
lqiu96 May 3, 2023
5d81e85
chore: Update LRO test case
lqiu96 May 3, 2023
c429d96
chore: Add logging
lqiu96 May 3, 2023
5527700
chore: Use millis for timeout
lqiu96 May 4, 2023
13881bb
chore: Address PR comments
lqiu96 May 5, 2023
c052a87
chore: Update to use TestClientInitializer class
lqiu96 May 5, 2023
069acc6
Merge branch 'main' into main-showcase_retries
lqiu96 May 5, 2023
6cc15fb
chore: Fix client initialize method names
lqiu96 May 5, 2023
e099ac1
chore: Add back public method
lqiu96 May 8, 2023
0a3cadf
chore: Add back public methods
lqiu96 May 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.auto.value.AutoValue;
import com.google.protobuf.TypeRegistry;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
import org.threeten.bp.Instant;

/** Options for an http-json call, including deadline and credentials. */
Expand All @@ -45,6 +46,9 @@ public abstract class HttpJsonCallOptions {
@Nullable
public abstract Instant getDeadline();
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved

@Nullable
public abstract Duration getTimeout();

@Nullable
public abstract Credentials getCredentials();

Expand All @@ -69,6 +73,11 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {
builder.setDeadline(newDeadline);
}

Duration newTimeout = inputOptions.getTimeout();
if (newTimeout != null) {
builder.setTimeout(newTimeout);
}

Credentials newCredentials = inputOptions.getCredentials();
if (newCredentials != null) {
builder.setCredentials(newCredentials);
Expand All @@ -86,6 +95,8 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {
public abstract static class Builder {
public abstract Builder setDeadline(Instant value);

public abstract Builder setTimeout(Duration value);

public abstract Builder setCredentials(Credentials value);

public abstract Builder setTypeRegistry(TypeRegistry value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType;
import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener;
import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult;
import com.google.api.gax.rpc.StatusCode;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -42,8 +43,12 @@
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* This class serves as main implementation of {@link HttpJsonClientCall} for REST transport and is
Expand Down Expand Up @@ -88,6 +93,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
private final ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor;
private final HttpTransport httpTransport;
private final Executor executor;
private final ScheduledExecutorService deadlineCancellationExecutor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to reuse the executor above? I know it's not a ScheduledExecutorService at this point, but the what's being passed in is from here, which is indeed a ScheduledExecutorService, so we should be able to change the type and reuse it. I did a quick search and I don't think most of the change would be breaking, there is one public builder method that could be considered breaking.
The reason we may want to reuse the existing executor is that we can leverage the existing client lifecycles for shutting down the executor, otherwise we need to explicitly shut down this new deadlineCancellationExecutor somewhere to prevent from resource leaking, which is missing from this PR currently. Reusing the existing executor may also have other issues like the ThreadPool is not large enough so new requests may have to wait for new thread. The bottom line is that we need to be careful with anything related to executors, so we have enough resource for large number of concurrent requests and we don't leak resource accidentally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, makes sense. I'll take another look into how grpc handles the multiple executors. I'd imagine they would also possibly run into this issue.


//
// Request-specific data (provided by client code) before we get a response.
Expand Down Expand Up @@ -128,6 +134,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
this.httpTransport = httpTransport;
this.executor = executor;
this.closed = false;
this.deadlineCancellationExecutor = Executors.newScheduledThreadPool(1);
}

@Override
Expand Down Expand Up @@ -161,6 +168,36 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
this.listener = responseListener;
this.requestHeaders = requestHeaders;
}

// Use the timeout duration value instead of calculating the future Instant
Duration timeout = callOptions.getTimeout();
if (timeout != null) {
// If the future timeout amount has been calculated as a negative value,
// we cancel the call immediately as the deadline has already been exceeded.
// The RetryAlgorithm should for this value and not run schedule a run
// if this is negative.
long timeoutNanos = 0;
if (!timeout.isNegative()) {
timeoutNanos = timeout.toNanos();
}
this.deadlineCancellationExecutor.schedule(
this::closeAndNotifyListeners, timeoutNanos, TimeUnit.NANOSECONDS);
}
}

// No need to trigger the deliver() loop again as we have already closed the runnable
// task and added the OnCloseNotificationTask. We notify the FutureListener that the
// there is a timeout exception from this RPC call (DEADLINE_EXCEEDED)
private void closeAndNotifyListeners() {
synchronized (lock) {
close(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment in close method makes me worried that we may still have to wait for the server to fully send the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the cancellation in the runnable doesn't seem like it's actually cancel's the runnable. I'll look into the possibility of being able to actually disconnect the connection instead of waiting for the socket timeout.

StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(),
"Deadline exceeded",
new HttpJsonStatusRuntimeException(
StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), "Deadline exceeded", null),
true);
}
notifyListeners();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import com.google.api.gax.rpc.ApiCallContext;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;

/**
* {@code HttpJsonClientCalls} creates a new {@code HttpJsonClientCAll} from the given call context.
Expand All @@ -50,12 +48,16 @@ public static <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newC

HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault().nullToSelf(context);

// Try to convert the timeout into a deadline and use it if it occurs before the actual deadline
// Use the context's timeout instead of calculating a future deadline with the System clock.
// The timeout value is calculated from TimedAttemptSettings which accounts for the
// TotalTimeout value set in the RetrySettings.
if (httpJsonContext.getTimeout() != null) {
@Nonnull Instant newDeadline = Instant.now().plus(httpJsonContext.getTimeout());
HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions();
if (callOptions.getDeadline() == null || newDeadline.isBefore(callOptions.getDeadline())) {
callOptions = callOptions.toBuilder().setDeadline(newDeadline).build();
// HttpJsonChannel expects the HttpJsonCallOptions and we store the timeout duration
// inside the HttpJsonCallOptions
if (callOptions.getTimeout() == null
|| httpJsonContext.getTimeout().compareTo(callOptions.getTimeout()) < 0) {
callOptions = callOptions.toBuilder().setTimeout(httpJsonContext.getTimeout()).build();
httpJsonContext = httpJsonContext.withCallOptions(callOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/** A runnable object that creates and executes an HTTP request. */
class HttpRequestRunnable<RequestT, ResponseT> implements Runnable {
Expand Down Expand Up @@ -191,13 +190,27 @@ HttpRequest createHttpRequest() throws IOException {

HttpRequest httpRequest = buildRequest(requestFactory, url, jsonHttpContent);

Instant deadline = httpJsonCallOptions.getDeadline();
if (deadline != null) {
long readTimeout = Duration.between(Instant.now(), deadline).toMillis();
Duration timeout = httpJsonCallOptions.getTimeout();
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
if (timeout != null) {
long timeoutMs = timeout.toMillis();

// Read timeout is the timeout between reading two data packets and not total timeout
// HttpJsonClientCallsImpl implements a deadlineCancellationExecutor to cancel the
// RPC when it exceeds the RPC timeout
if (httpRequest.getReadTimeout() > 0
&& httpRequest.getReadTimeout() < readTimeout
&& readTimeout < Integer.MAX_VALUE) {
httpRequest.setReadTimeout((int) readTimeout);
&& httpRequest.getReadTimeout() < timeoutMs
&& timeoutMs < Integer.MAX_VALUE) {
httpRequest.setReadTimeout((int) timeoutMs);
}

// Connect timeout is the time allowed for establishing the connection.
// This is updated to match the RPC timeout as we do not want a shorter
// connect timeout to preemptively throw a ConnectExcepetion before
// we've reached the RPC timeout
if (httpRequest.getConnectTimeout() > 0
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
&& httpRequest.getConnectTimeout() < timeoutMs
&& timeoutMs < Integer.MAX_VALUE) {
httpRequest.setConnectTimeout((int) timeoutMs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ public void testSuccessfulMultipleResponsesForUnaryCall()
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));

Field request = createTestMessage(2);
Field expectedResponse = createTestMessage(2);
Expand Down Expand Up @@ -199,7 +202,10 @@ public void testErrorMultipleResponsesForUnaryCall()
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));

Field request = createTestMessage(2);
Field expectedResponse = createTestMessage(2);
Expand Down Expand Up @@ -228,7 +234,10 @@ public void testErrorUnaryResponse() throws InterruptedException {
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));

ApiException exception =
ApiExceptionFactory.createException(
Expand Down Expand Up @@ -257,7 +266,10 @@ public void testErrorNullContentSuccessfulResponse() throws InterruptedException
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));

MOCK_SERVICE.addNullResponse();

Expand All @@ -283,7 +295,10 @@ public void testErrorNullContentFailedResponse() throws InterruptedException {
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));
MOCK_SERVICE.addNullResponse(400);

try {
Expand All @@ -306,7 +321,10 @@ public void testErrorNon2xxOr4xxResponse() throws InterruptedException {
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel);
HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault()
.withChannel(channel)
.withTimeout(Duration.ofSeconds(30));

ApiException exception =
ApiExceptionFactory.createException(
Expand All @@ -323,6 +341,34 @@ public void testErrorNon2xxOr4xxResponse() throws InterruptedException {
}
}

/**
* Expectation is that an RPC that exceeds the Timeout value set will receive a DEADLINE_EXCEEDED
* response back. In this test, the call has a timeout value that is smaller than the time it
* takes for the mock service to return a response.
*
* @throws InterruptedException
*/
@Test
public void testDeadlineExceededResponse() throws InterruptedException {
HttpJsonDirectCallable<Field, Field> callable =
new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR);

HttpJsonCallContext callContext =
HttpJsonCallContext.createDefault().withChannel(channel).withTimeout(Duration.ofSeconds(3));

Field response = createTestMessage(10);
MOCK_SERVICE.addResponse(response, Duration.ofSeconds(5));

try {
callable.futureCall(createTestMessage(10), callContext).get();
Assert.fail("No exception raised");
} catch (ExecutionException e) {
HttpJsonStatusRuntimeException respExp = (HttpJsonStatusRuntimeException) e.getCause();
assertThat(respExp.getStatusCode()).isEqualTo(504);
assertThat(respExp.getMessage()).isEqualTo("Deadline exceeded");
}
}

private Field createTestMessage(int number) {
return Field.newBuilder() // "echo" service
.setName("john/imTheBestField")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.api.gax.httpjson.testing.MockHttpService;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class HttpJsonDirectServerStreamingCallableTest {
Expand Down Expand Up @@ -144,7 +146,9 @@ public void setUp() {
clientContext =
ClientContext.newBuilder()
.setTransportChannel(HttpJsonTransportChannel.create(channel))
.setDefaultCallContext(HttpJsonCallContext.of(channel, HttpJsonCallOptions.DEFAULT))
.setDefaultCallContext(
HttpJsonCallContext.of(channel, HttpJsonCallOptions.DEFAULT)
.withTimeout(Duration.ofSeconds(3)))
.build();
streamingCallSettings = ServerStreamingCallSettings.<Color, Money>newBuilder().build();
streamingCallable =
Expand Down Expand Up @@ -326,6 +330,26 @@ public void testBlockingServerStreaming() {
Truth.assertThat(responseData).containsExactly(expected);
}

// This test ensures that the server-side streaming does not exceed the timeout value
@Test
public void testDeadlineExceededServerStreaming() throws InterruptedException {
MOCK_SERVICE.addResponse(
new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE}, Duration.ofSeconds(5));
Color request = Color.newBuilder().setRed(0.5f).build();
CountDownLatch latch = new CountDownLatch(1);
MoneyObserver moneyObserver = new MoneyObserver(false, latch);

streamingCallable.call(request, moneyObserver);

moneyObserver.controller.request(2);
// Set the latch's await time to above the context's timeout value to ensure that
// the latch has been released.
Truth.assertThat(latch.await(5000, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.error).isInstanceOf(DeadlineExceededException.class);
Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("Deadline exceeded");
}

static class MoneyObserver extends StateCheckingResponseObserver<Money> {
private final boolean autoFlowControl;
private final CountDownLatch latch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.threeten.bp.Duration;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Mocks an HTTPTransport. Expected responses and exceptions can be added to a queue from which this
Expand Down Expand Up @@ -87,6 +88,11 @@ public synchronized void addResponse(Object response) {
responseHandlers.add(new MessageResponseFactory(endpoint, serviceMethodDescriptors, response));
}

public synchronized void addResponse(Object response, Duration delay) {
responseHandlers.add(
new MessageResponseFactory(endpoint, serviceMethodDescriptors, response, delay));
}

/** Add an expected null response (empty HTTP response body) with a custom status code. */
public synchronized void addNullResponse(int statusCode) {
responseHandlers.add(
Expand Down Expand Up @@ -182,16 +188,33 @@ private static class MessageResponseFactory implements HttpResponseFactory {
private final List<ApiMethodDescriptor> serviceMethodDescriptors;
private final Object response;
private final String endpoint;
private final Duration delay;

public MessageResponseFactory(
String endpoint, List<ApiMethodDescriptor> serviceMethodDescriptors, Object response) {
this(endpoint, serviceMethodDescriptors, response, Duration.ofNanos(0));
}

public MessageResponseFactory(
String endpoint,
List<ApiMethodDescriptor> serviceMethodDescriptors,
Object response,
Duration delay) {
this.endpoint = endpoint;
this.serviceMethodDescriptors = ImmutableList.copyOf(serviceMethodDescriptors);
this.response = response;
this.delay = delay;
}

@Override
public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String fullTargetUrl) {
// We use Thread.sleep to mimic a long server response. Most tests should not
// require a sleep and can return a response immediately.
try {
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse();

String relativePath = getRelativePath(fullTargetUrl);
Expand Down
Loading