Skip to content

Commit

Permalink
client contexts are cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 committed Aug 9, 2024
1 parent 9a29b39 commit 55e6b4a
Show file tree
Hide file tree
Showing 25 changed files with 320 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,17 @@ public AbstractClientOptionsBuilder contextCustomizer(
return this;
}

/**
* Sets the {@link ResponseTimeoutMode} which determines when a {@link #responseTimeout(Duration)}}
* will start to be scheduled.
* @see ResponseTimeoutMode
*/
@UnstableApi
public AbstractClientOptionsBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return option(ClientOptions.RESPONSE_TIMEOUT_MODE,
requireNonNull(responseTimeoutMode, "responseTimeoutMode"));
}

/**
* Builds {@link ClientOptions} with the given options and the
* {@linkplain ClientOptions#of() default options}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ final boolean tryInitialize() {
final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
scheduler.updateTask(newCancellationTask());
if (ctx.responseTimeoutMode() == ResponseTimeoutMode.REQUEST_WRITE) {
scheduler.start();
}
}
if (ctx.isCancelled()) {
// The previous cancellation task wraps the cause with an UnprocessedRequestException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,9 @@ public ClientBuilder contextCustomizer(
public ClientBuilder contextHook(Supplier<? extends AutoCloseable> contextHook) {
return (ClientBuilder) super.contextHook(contextHook);
}

@Override
public ClientBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return (ClientBuilder) super.responseTimeoutMode(responseTimeoutMode);
}
}
14 changes: 14 additions & 0 deletions core/src/main/java/com/linecorp/armeria/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public final class ClientOptions
public static final ClientOption<Supplier<? extends AutoCloseable>> CONTEXT_HOOK =
ClientOption.define("CONTEXT_HOOK", NOOP_CONTEXT_HOOK);

@UnstableApi
public static final ClientOption<ResponseTimeoutMode> RESPONSE_TIMEOUT_MODE =
ClientOption.define("RESPONSE_TIMEOUT_MODE", Flags.responseTimeoutMode());

private static final List<AsciiString> PROHIBITED_HEADER_NAMES = ImmutableList.of(
HttpHeaderNames.HTTP2_SETTINGS,
HttpHeaderNames.METHOD,
Expand Down Expand Up @@ -395,6 +399,16 @@ public Supplier<AutoCloseable> contextHook() {
return (Supplier<AutoCloseable>) get(CONTEXT_HOOK);
}

/**
* Returns the {@link ResponseTimeoutMode} which determines when a {@link #responseTimeoutMillis()}
* will start to be scheduled.
* @see ResponseTimeoutMode
*/
@UnstableApi
public ResponseTimeoutMode responseTimeoutMode() {
return get(RESPONSE_TIMEOUT_MODE);
}

/**
* Returns a new {@link ClientOptionsBuilder} created from this {@link ClientOptions}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,9 @@ public ClientOptionsBuilder contextCustomizer(
public ClientOptionsBuilder contextHook(Supplier<? extends AutoCloseable> contextHook) {
return (ClientOptionsBuilder) super.contextHook(contextHook);
}

@Override
public ClientOptionsBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return (ClientOptionsBuilder) super.responseTimeoutMode(responseTimeoutMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,14 @@ default boolean isTimedOut() {
@UnstableApi
ExchangeType exchangeType();

/**
* Returns the {@link ResponseTimeoutMode} which determines when a {@link #responseTimeoutMillis()}
* will start to be scheduled.
* @see ResponseTimeoutMode
*/
@UnstableApi
ResponseTimeoutMode responseTimeoutMode();

@Override
default ClientRequestContext unwrap() {
return (ClientRequestContext) RequestContext.super.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public ExchangeType exchangeType() {
return unwrap().exchangeType();
}

@Override
public ResponseTimeoutMode responseTimeoutMode() {
return unwrap().responseTimeoutMode();
}

@Override
public void hook(Supplier<? extends AutoCloseable> contextHook) {
unwrap().hook(contextHook);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ void initTimeout() {
final CancellationScheduler responseCancellationScheduler =
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.updateTask(newCancellationTask());
responseCancellationScheduler.start();
if (ctx.responseTimeoutMode() == ResponseTimeoutMode.RESPONSE_READ) {
responseCancellationScheduler.start();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* Specifies when to start scheduling a response timeout. Note that this value does not affect calculation
* of the response timeout, but determines when the response timeout will affect the ongoing request.
*
* <p>For example, assume the following scenario with a responseTimeout of 2 seconds:
* <pre>{@code
* |---request start---decorators(3s)---connection acquisition(2s)---request write/response read---|
* }</pre>
* <ul>
* <li>
* If {@link ResponseTimeoutMode#RESPONSE_READ} is used, the request will go through the decorators,
* acquire a connection, and write the request. Once the response header is read, the timeout task
* will trigger immediately since 5 seconds has passed which exceeds the responseTimeout of 2 seconds.
* </li>
* <li>
* If {@link ResponseTimeoutMode#REQUEST_START} is used, the timeout task will be scheduled
* immediately on request start. The timeout task will trigger while the request goes through
* the decorator, and the request will fail before acquiring a new connection.
* </li>
* </ul>
*/
@UnstableApi
public enum ResponseTimeoutMode {

/**
* The response timeout is scheduled when the request first starts to execute. More specifically,
* the scheduling will take place when the request starts to go through the decorator chain.
*/
REQUEST_START,

/**
* The response timeout is scheduled when the request is first written on the wire.
*/
REQUEST_WRITE,

/**
* The response timeout is scheduled either when the response bytes are first read, or when the client
* finishes writing the request.
*/
RESPONSE_READ,
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,9 @@ public RestClientBuilder contextCustomizer(
public RestClientBuilder contextHook(Supplier<? extends AutoCloseable> contextHook) {
return (RestClientBuilder) super.contextHook(contextHook);
}

@Override
public RestClientBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return (RestClientBuilder) super.responseTimeoutMode(responseTimeoutMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,9 @@ public WebClientBuilder contextCustomizer(
Consumer<? super ClientRequestContext> contextCustomizer) {
return (WebClientBuilder) super.contextCustomizer(contextCustomizer);
}

@Override
public WebClientBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return (WebClientBuilder) super.responseTimeoutMode(responseTimeoutMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.linecorp.armeria.client.DecoratingRpcClientFunction;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.RpcClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
Expand Down Expand Up @@ -390,4 +391,9 @@ public WebSocketClientBuilder contextCustomizer(
public WebSocketClientBuilder contextHook(Supplier<? extends AutoCloseable> contextHook) {
return (WebSocketClientBuilder) super.contextHook(contextHook);
}

@Override
public WebSocketClientBuilder responseTimeoutMode(ResponseTimeoutMode responseTimeoutMode) {
return (WebSocketClientBuilder) super.responseTimeoutMode(responseTimeoutMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;

import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.common.util.Sampler;
import com.linecorp.armeria.common.util.TlsEngineType;
import com.linecorp.armeria.common.util.TransportType;
Expand Down Expand Up @@ -100,6 +101,7 @@ final class DefaultFlagsProvider implements FlagsProvider {
static final String DNS_CACHE_SPEC = "maximumSize=4096";
static final long DEFAULT_UNLOGGED_EXCEPTIONS_REPORT_INTERVAL_MILLIS = 10000;
static final long DEFAULT_HTTP1_CONNECTION_CLOSE_DELAY_MILLIS = 3000;
static final ResponseTimeoutMode DEFAULT_RESPONSE_TIMEOUT_MODE = ResponseTimeoutMode.RESPONSE_READ;

private DefaultFlagsProvider() {}

Expand Down Expand Up @@ -511,4 +513,9 @@ public DistributionStatisticConfig distributionStatisticConfig() {
public Long defaultHttp1ConnectionCloseDelayMillis() {
return DEFAULT_HTTP1_CONNECTION_CLOSE_DELAY_MILLIS;
}

@Override
public ResponseTimeoutMode responseTimeoutMode() {
return DEFAULT_RESPONSE_TIMEOUT_MODE;
}
}
18 changes: 18 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.client.DnsResolverGroupBuilder;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.client.retry.RetryingClient;
import com.linecorp.armeria.client.retry.RetryingRpcClient;
Expand Down Expand Up @@ -439,6 +440,9 @@ private static boolean validateTransportType(TransportType transportType, String
getValue(FlagsProvider::defaultHttp1ConnectionCloseDelayMillis,
"defaultHttp1ConnectionCloseDelayMillis", value -> value >= 0);

private static final ResponseTimeoutMode RESPONSE_TIMEOUT_MODE =
getValue(FlagsProvider::responseTimeoutMode, "responseTimeoutMode");

/**
* Returns the specification of the {@link Sampler} that determines whether to retain the stack
* trace of the exceptions that are thrown frequently by Armeria. A sampled exception will have the stack
Expand Down Expand Up @@ -1642,6 +1646,20 @@ public static long defaultHttp1ConnectionCloseDelayMillis() {
return DEFAULT_HTTP1_CONNECTION_CLOSE_DELAY_MILLIS;
}

/**
* Returns the {@link ResponseTimeoutMode} which determines when a response timeout
* will start to be scheduled.
*
* <p>The default value of this flag is RESPONSE_READ. Specify the
* {@code -Dcom.linecorp.armeria.responseTimeoutMode=ResponseTimeoutMode} JVM option to
* override the default value.
* @see ResponseTimeoutMode
*/
@UnstableApi
public static ResponseTimeoutMode responseTimeoutMode() {
return RESPONSE_TIMEOUT_MODE;
}

@Nullable
private static String nullableCaffeineSpec(Function<FlagsProvider, String> method, String flagName) {
return caffeineSpec(method, flagName, true);
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/FlagsProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.client.ClientBuilder;
import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.client.DnsResolverGroupBuilder;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.client.retry.RetryingClient;
import com.linecorp.armeria.client.retry.RetryingRpcClient;
Expand Down Expand Up @@ -1232,4 +1233,19 @@ default DistributionStatisticConfig distributionStatisticConfig() {
default Long defaultHttp1ConnectionCloseDelayMillis() {
return null;
}

/**
* Returns the {@link ResponseTimeoutMode} which determines when a response timeout
* will start to be scheduled.
*
* <p>The default value of this flag is RESPONSE_READ. Specify the
* {@code -Dcom.linecorp.armeria.responseTimeoutMode=ResponseTimeoutMode} JVM option to
* override the default value.
* @see ResponseTimeoutMode
*/
@Nullable
@UnstableApi
default ResponseTimeoutMode responseTimeoutMode() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;

import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.InetAddressPredicates;
import com.linecorp.armeria.common.util.Sampler;
Expand Down Expand Up @@ -598,6 +599,12 @@ public Long defaultUnloggedExceptionsReportIntervalMillis() {
return getLong("defaultUnloggedExceptionsReportIntervalMillis");
}

@Override
@Nullable
public ResponseTimeoutMode responseTimeoutMode() {
return getAndParse("responseTimeoutMode", ResponseTimeoutMode::valueOf);
}

@Nullable
private static Long getLong(String name) {
return getAndParse(name, Long::parseLong);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ private void updateFlags(int flags) {
private static void completeSatisfiedFutures(RequestLogFuture[] satisfiedFutures, RequestLog log,
RequestContext ctx) {
if (!ctx.eventLoop().inEventLoop()) {
ctx.eventLoop().execute(() -> completeSatisfiedFutures(satisfiedFutures, log, ctx));
ctx.eventLoop().withoutContext().execute(
() -> completeSatisfiedFutures(satisfiedFutures, log, ctx));
return;
}
for (RequestLogFuture f : satisfiedFutures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.RequestOptions;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.UnprocessedRequestException;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.AttributesGetters;
Expand Down Expand Up @@ -569,8 +570,12 @@ private void initializeResponseCancellationScheduler() {
log.endResponse(cause);
}
};
responseCancellationScheduler.init(eventLoop().withoutContext());
responseCancellationScheduler.updateTask(cancellationTask);
if (options.responseTimeoutMode() == ResponseTimeoutMode.REQUEST_START) {
responseCancellationScheduler.initAndStart(eventLoop().withoutContext(), cancellationTask);
} else {
responseCancellationScheduler.init(eventLoop().withoutContext());
responseCancellationScheduler.updateTask(cancellationTask);
}
}

@Nullable
Expand Down Expand Up @@ -1053,4 +1058,9 @@ public CompletableFuture<Void> initiateConnectionShutdown() {
});
return completableFuture;
}

@Override
public ResponseTimeoutMode responseTimeoutMode() {
return options.responseTimeoutMode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ void cancel_beforeDelegate(TestInfo testInfo) {
.hasRootCause(t);
assertThat(connListener.opened()).isEqualTo(0);
assertThat(requests).doesNotContain(testInfo.getDisplayName());
// don't validate the thread since we haven't started with event loop scheduling yet
validateCallbackChecks(null);
}
}

Expand Down Expand Up @@ -149,6 +151,8 @@ void cancel_beforeConnection(TestInfo testInfo) {
.hasCauseInstanceOf(UnprocessedRequestException.class)
.hasRootCause(t);
assertThat(requests).doesNotContain(testInfo.getDisplayName());
// don't validate the thread since we haven't started with event loop scheduling yet
validateCallbackChecks(null);
}
}

Expand Down
Loading

0 comments on commit 55e6b4a

Please sign in to comment.