Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Cancel the Timeout Task for HttpJson #2360

Merged
merged 31 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7f04a7f
fix: Cancel the Timeout Task for HttpJson
lqiu96 Jan 17, 2024
e6f7219
chore: Use HttpJsonClientCallImpl in main branch
lqiu96 Jan 17, 2024
8c466ec
chore: Use gRPC client for gRPC test
lqiu96 Jan 17, 2024
e226b4a
Revert "chore: Use HttpJsonClientCallImpl in main branch"
lqiu96 Jan 17, 2024
90c9c53
chore: Clean up ITClientShutdown
lqiu96 Jan 17, 2024
1899391
Merge remote-tracking branch 'origin/main' into cancel-timeout-task
lqiu96 Jan 29, 2024
da2d1db
chore: Add docs for terminating the client
lqiu96 Jan 29, 2024
108dd47
chore: Remove duplicate isTerminated check
lqiu96 Feb 5, 2024
b0886f1
chore: Fix type
lqiu96 Feb 6, 2024
5038710
chore: Add unit tests for HttpJsonClientCallsImpl
lqiu96 Feb 6, 2024
6871fe1
chore: Add countdownlatch
lqiu96 Feb 6, 2024
37abc91
chore: Add extremely large timeout
lqiu96 Feb 6, 2024
8e61ffb
chore: Use getTaskCount for number of scheduled tasks
lqiu96 Feb 6, 2024
f252de7
chore: Update showcase
lqiu96 Feb 6, 2024
358236c
chore: Create ScheduleExecutor in each test
lqiu96 Feb 7, 2024
83c7875
chore: Remove flaky assertions
lqiu96 Feb 7, 2024
1932a75
Merge remote-tracking branch 'origin/main' into cancel-timeout-task
lqiu96 Feb 7, 2024
df5c4a8
chore: Remove flaky getActiveTask() assertions
lqiu96 Feb 7, 2024
ee71349
Merge branch 'main' into cancel-timeout-task
lqiu96 Feb 7, 2024
0799b85
chore: Update tests
lqiu96 Feb 7, 2024
d6d084a
chore: Update docs
lqiu96 Feb 7, 2024
34f6e03
chore: Fix sonar bugs
lqiu96 Feb 7, 2024
50fab95
chore: Fix lint issues
lqiu96 Feb 7, 2024
37fab75
Merge branch 'main' into cancel-timeout-task
lqiu96 Feb 7, 2024
28c1861
chore: Add comment about future being null
lqiu96 Feb 8, 2024
83f9f80
chore: Increase test timeout
lqiu96 Feb 8, 2024
fbe59f0
Merge branch 'main' into cancel-timeout-task
lqiu96 Feb 8, 2024
25c5d0d
chore: Fix typo
lqiu96 Feb 8, 2024
4e58707
Merge branch 'main' into cancel-timeout-task
lqiu96 Feb 13, 2024
746c0c2
chore: Refactor showcase tests
lqiu96 Feb 13, 2024
7aa1f35
Merge branch 'main' into cancel-timeout-task
lqiu96 Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -121,6 +122,12 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
@GuardedBy("lock")
private volatile boolean closed;

// Store the timeout future created by the deadline schedule executor. The future
// can be cancelled if a response (either an error or valid payload) has been
// received before the timeout.
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
@GuardedBy("lock")
private ScheduledFuture<?> timeoutFuture;

HttpJsonClientCallImpl(
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor,
String endpoint,
Expand Down Expand Up @@ -167,16 +174,20 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
Preconditions.checkState(this.listener == null, "The call is already started");
this.listener = responseListener;
this.requestHeaders = requestHeaders;
}

// Use the timeout duration value instead of calculating the future Instant
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
Duration timeout = callOptions.getTimeout();
if (timeout != null) {
// The future timeout value is guaranteed to not be a negative value as the
// RetryAlgorithm will not retry
long timeoutMs = timeout.toMillis();
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
// Use the timeout duration value instead of calculating the future Instant
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
Duration timeout = callOptions.getTimeout();
if (timeout != null) {
// The future timeout value is guaranteed to not be a negative value as the
// RetryAlgorithm will not retry
long timeoutMs = timeout.toMillis();
// Assign the scheduled future so that it can be cancelled if the timeout task
// is not needed (response received prior to timeout)
timeoutFuture =
this.deadlineCancellationExecutor.schedule(
this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
}
}
}

Expand Down Expand Up @@ -430,6 +441,15 @@ private void close(
return;
}
closed = true;

// Cancel the timeout future if there is a timeout associated with the RPC
if (timeoutFuture != null) {
// The timeout method also invokes close(), but cancelling a completed task should no-op.
// Attempt to interrupt the future as the client should not wait for the timeout to complete
timeoutFuture.cancel(true);
timeoutFuture = null;
}

// Best effort task cancellation (to not be confused with task's thread interruption).
// If the task is in blocking I/O waiting for the server response, it will keep waiting for
// the response from the server, but once response is received the task will exit silently.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.showcase.v1beta1.it;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.common.collect.ImmutableSet;
import com.google.common.truth.Truth;
import com.google.showcase.v1beta1.BlockRequest;
import com.google.showcase.v1beta1.BlockResponse;
import com.google.showcase.v1beta1.EchoClient;
import com.google.showcase.v1beta1.EchoRequest;
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.threeten.bp.Duration;

public class ITClientShutdown {

// Test to ensure the client can close + terminate properly
@Test
public void testGrpc_closeClient() throws Exception {
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();
grpcClient.close();
// 500ms buffer time to properly terminate the client
grpcClient.awaitTermination(500, TimeUnit.MILLISECONDS);
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
Truth.assertThat(grpcClient.isShutdown()).isTrue();
Truth.assertThat(grpcClient.isTerminated()).isTrue();
}

// Test to ensure the client can close + terminate properly
@Test
public void testHttpJson_closeClient() throws Exception {
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();
httpjsonClient.close();
// 500ms buffer time to properly terminate the client
httpjsonClient.awaitTermination(500, TimeUnit.MILLISECONDS);
Truth.assertThat(httpjsonClient.isShutdown()).isTrue();
Truth.assertThat(httpjsonClient.isTerminated()).isTrue();
}

// Test to ensure hte client can close + terminate after a quick RPC invocation
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testGrpc_rpcInvoked_closeClient() throws Exception {
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();

grpcClient.echo(EchoRequest.newBuilder().setContent("Test").build());

grpcClient.close();
// 1s buffer time to properly terminate the client after RPC is invoked
grpcClient.awaitTermination(1, TimeUnit.SECONDS);
Truth.assertThat(grpcClient.isShutdown()).isTrue();
Truth.assertThat(grpcClient.isTerminated()).isTrue();
}

// Test to ensure hte client can close + terminate after a quick RPC invocation
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testHttpJson_rpcInvoked_closeClient() throws Exception {
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();

httpjsonClient.echo(EchoRequest.newBuilder().setContent("Test").build());

httpjsonClient.close();
// 1s buffer time to properly terminate the client after RPC is invoked
httpjsonClient.awaitTermination(1, TimeUnit.SECONDS);
Truth.assertThat(httpjsonClient.isShutdown()).isTrue();
Truth.assertThat(httpjsonClient.isTerminated()).isTrue();
}

// This test is to ensure that the client is able to close + terminate any resources
// once a response has been received. Set a max test duration of 15s to ensure that
// the test does not continue on forever.
@Test(timeout = 15000L)
public void testGrpc_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
throws Exception {
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
// invocation should time out in 15s, but the client will receive a response in 2s.
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
// received so the client can properly terminate.
RetrySettings defaultRetrySettings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(15000L))
.setMaxRpcTimeout(Duration.ofMillis(15000L))
.setTotalTimeout(Duration.ofMillis(15000L))
.setMaxAttempts(1)
.build();
EchoClient grpcClient =
TestClientInitializer.createGrpcEchoClientCustomBlockSettings(
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));

BlockRequest blockRequest =
BlockRequest.newBuilder()
.setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_2sDelay"))
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
.build();

long start = System.currentTimeMillis();
BlockResponse response = grpcClient.block(blockRequest);
Truth.assertThat(response.getContent()).isEqualTo("gRPCBlockContent_2sDelay");

// Intentionally do not run grpcClient.awaitTermination(...) as this test will
// check that everything is properly terminated after close() is called.
grpcClient.close();

busyWaitUntilClientTermination(grpcClient);
long end = System.currentTimeMillis();

Truth.assertThat(grpcClient.isShutdown()).isTrue();

// Check the termination time. If all the tasks/ resources are closed successfully,
// the termination time should only take about 2s (time to receive a response) + time
// to close the client. Check that this takes less than 5s (2s request time + 3s
// buffer time).
long terminationTime = end - start;
Truth.assertThat(terminationTime).isLessThan(5000L);
}

// This test is to ensure that the client is able to close + terminate any resources
// once a response has been received. Set a max test duration of 15s to ensure that
// the test does not continue on forever.
@Test(timeout = 15000L)
public void testHttpJson_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
throws Exception {
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
// invocation should time out in 15s, but the client will receive a response in 2s.
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
// received so the client can properly terminate.
RetrySettings defaultRetrySettings =
RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofMillis(15000L))
.setMaxRpcTimeout(Duration.ofMillis(15000L))
.setTotalTimeout(Duration.ofMillis(15000L))
.setMaxAttempts(1)
.build();
EchoClient httpjsonClient =
TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings(
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));

BlockRequest blockRequest =
BlockRequest.newBuilder()
.setSuccess(BlockResponse.newBuilder().setContent("httpjsonBlockContent_2sDelay"))
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
.build();

long start = System.currentTimeMillis();
BlockResponse response = httpjsonClient.block(blockRequest);
Truth.assertThat(response.getContent()).isEqualTo("httpjsonBlockContent_2sDelay");

// Intentionally do not run grpcClient.awaitTermination(...) as this test will
// check that everything is properly terminated after close() is called.
httpjsonClient.close();

busyWaitUntilClientTermination(httpjsonClient);
long end = System.currentTimeMillis();

Truth.assertThat(httpjsonClient.isShutdown()).isTrue();

// Check the termination time. If all the tasks/ resources are closed successfully,
// the termination time should only take about 2s (time to receive a response) + time
// to close the client. Check that this takes less than 5s (2s request time + 3s
// buffer time).
long terminationTime = end - start;
Truth.assertThat(terminationTime).isLessThan(5000L);
}

// Loop until the client has terminated successfully. For tests that use this,
// try to ensure there is a timeout associated, otherwise this may run forever.
// Future enhancement: Use awaitility instead of busy waiting
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
private static void busyWaitUntilClientTermination(EchoClient client)
throws InterruptedException {
while (!client.isTerminated()) {
Thread.sleep(500L);
}
}
}
Loading